diff --git a/README.md b/README.md index 0c4c93a..58f63e0 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,7 @@ try: with EEGConnector() as device: info = device.get_device_data() print(f"Połączono z: {info.name}") + print(f"Kanały: {info.channel_names}") # Akwizycja 5 sekund danych eeg_data = device.get_output(duration=5.0) @@ -70,6 +71,59 @@ finally: close() ``` +### BrainAccess — skalowanie jednostek + +SDK BrainAccess zwraca surowe wartości ADC. Parametr `scale` pozwala przeliczać je na µV bezpośrednio przy streamowaniu: + +```python +from bridge.eeg.brainaccess import BrainaccessDevice + +device = BrainaccessDevice(scale=1 / 1000) # ADC → µV +device.connect() +for chunk in device.stream(): + # chunk jest już w µV + ... +device.disconnect() +``` + +### Nagrywanie i odtwarzanie + +**Nagrywanie live** z urządzenia: + +```python +from bridge.eeg.recorder import EEGRecorder +from bridge.eeg.brainaccess import BrainaccessDevice + +device = BrainaccessDevice(scale=1 / 1000) +with EEGRecorder(device, filename="session.npz", sfreq=250, ch_names=["C3", "C4", "Cz"]) as rec: + for chunk in rec.stream(): + pass # dane są buforowane i zapisywane automatycznie przy wyjściu +``` + +**Zapis gotowych danych** (gdy pętla streamowania jest zarządzana ręcznie): + +```python +from bridge.eeg.recorder import save_recording +import numpy as np + +data: np.ndarray # (n_channels, n_samples) +save_recording(data, path="session.npz", sfreq=250, ch_names=["C3", "C4", "Cz"]) +``` + +**Odtwarzanie** nagrania zamiast prawdziwego urządzenia: + +```python +from bridge.eeg.file.device import FileDevice + +device = FileDevice("session.npz") +device.connect() +for chunk in device.stream(): + ... # identyczne API jak z live urządzeniem +device.disconnect() +``` + +Pliki `.npz` zapisane przez `EEGRecorder` i `save_recording` są w pełni wymienne z `FileDevice`. + ## Rozwój Projektu ### Konfiguracja Środowiska diff --git a/bridge/eeg/brainaccess/device.py b/bridge/eeg/brainaccess/device.py index 4148fee..a3491c2 100644 --- a/bridge/eeg/brainaccess/device.py +++ b/bridge/eeg/brainaccess/device.py @@ -26,7 +26,7 @@ class BrainaccessDevice(EEGDevice): - def __init__(self, logger: Logger | None = None) -> None: + def __init__(self, scale: float = 1.0, logger: Logger | None = None) -> None: self._eeg: EEG = acquisition.EEG() self._manager: EEGManager | None = None self._cap: dict[int, str] | None = None @@ -34,6 +34,7 @@ def __init__(self, logger: Logger | None = None) -> None: self._device_name: str | None = None self._stream_queue: Queue[EEGArray] = Queue() self._is_streaming: bool = False + self._scale: float = scale super().__init__(logger or getLogger(__name__)) @@ -173,7 +174,7 @@ def stream(self) -> Generator[EEGArray, None, None]: while self._is_streaming: try: chunk = self._stream_queue.get(timeout=1.0) - yield chunk + yield chunk * self._scale if self._scale != 1.0 else chunk except Empty: continue finally: @@ -224,4 +225,5 @@ def get_device_data(self) -> DeviceData: manufacturer=BRAINACCESS_MANUFACTURER, electrodes_num=len(self._cap) if self._cap else None, sample_rate=self._manager.get_sample_frequency() if self._manager else None, + channel_names=tuple(self._cap.values()) if self._cap else None, ) diff --git a/bridge/eeg/core/device_data.py b/bridge/eeg/core/device_data.py index 10a982f..2abbd80 100644 --- a/bridge/eeg/core/device_data.py +++ b/bridge/eeg/core/device_data.py @@ -10,6 +10,7 @@ class DeviceData: manufacturer: str | None = None electrodes_num: int | None = None sample_rate: int | None = None + channel_names: tuple[str, ...] | None = None @dataclass(frozen=True, slots=True, kw_only=True) diff --git a/bridge/eeg/file/device.py b/bridge/eeg/file/device.py index 4fc8ed2..6b515a1 100644 --- a/bridge/eeg/file/device.py +++ b/bridge/eeg/file/device.py @@ -16,9 +16,10 @@ def __init__( ) -> None: super().__init__(logger or getLogger(__name__)) self._path: Final[Path] = Path(file_path) - self._sfreq: Final[float] = sfreq + self._sfreq: float = sfreq self._chunk_size: Final[int] = chunk_size self._data: np.ndarray[Any, Any] | None = None + self._ch_names: tuple[str, ...] | None = None self._is_connected: bool = False def connect(self) -> None: @@ -27,6 +28,10 @@ def connect(self) -> None: with np.load(self._path) as loader: self._data = loader["data"] + if "sfreq" in loader: + self._sfreq = float(loader["sfreq"]) + if "ch_names" in loader: + self._ch_names = tuple(str(n) for n in loader["ch_names"]) self._is_connected = True if self._data is None or self._data.size == 0: @@ -58,4 +63,11 @@ def stream(self) -> Generator[EEGArray, None, None]: yield self._data[:, start : start + self._chunk_size].astype(np.float64) def get_device_data(self) -> DeviceData: - return DeviceData(name=self._path.name, manufacturer="BinarySim", sample_rate=int(self._sfreq)) + n_ch = self._data.shape[0] if self._data is not None else None + return DeviceData( + name=self._path.name, + manufacturer="BinarySim", + sample_rate=int(self._sfreq), + electrodes_num=n_ch, + channel_names=self._ch_names, + ) diff --git a/bridge/eeg/recorder.py b/bridge/eeg/recorder.py index c386485..3a95571 100644 --- a/bridge/eeg/recorder.py +++ b/bridge/eeg/recorder.py @@ -9,6 +9,30 @@ from .core.device_data import RecordingFrame +def save_recording( + data: np.ndarray[Any, Any], + path: str | Path, + sfreq: float | None = None, + ch_names: list[str] | None = None, + logger: Logger | None = None, +) -> None: + _log = logger or getLogger(__name__) + dest = Path(path) + dest.parent.mkdir(parents=True, exist_ok=True) + + arrays: dict[str, Any] = { + "timestamps": np.array([time.time()]), + "data": data, + } + if sfreq is not None: + arrays["sfreq"] = np.float64(sfreq) + if ch_names is not None: + arrays["ch_names"] = np.array(ch_names, dtype=str) + + np.savez_compressed(dest, **arrays) + _log.info("Saved recording to %s", dest) + + class EEGRecorder: """Rejestrator EEG wykorzystujący wysokowydajny format binarny NumPy.""" @@ -16,6 +40,9 @@ def __init__( self, device: EEGDevice, filename: str, + output_dir: str | Path = "recordings", + sfreq: float | None = None, + ch_names: list[str] | None = None, logger: Logger | None = None, autosave: bool = True, connect_device: bool = True, @@ -23,6 +50,9 @@ def __init__( self._logger: Final[Logger] = logger or getLogger(__name__) self._device: Final[EEGDevice] = device self._filename: Final[str] = filename + self._output_dir: Final[Path] = Path(output_dir) + self._sfreq: float | None = sfreq + self._ch_names: list[str] | None = ch_names self._autosave: Final[bool] = autosave self._connect_device: Final[bool] = connect_device self._frames: list[RecordingFrame] = [] @@ -51,14 +81,19 @@ def save(self) -> None: return try: - output_dir: Final[Path] = Path("recordings") - output_dir.mkdir(exist_ok=True) - file_path: Final[Path] = output_dir / self._filename + self._output_dir.mkdir(parents=True, exist_ok=True) + file_path: Final[Path] = self._output_dir / self._filename timestamps: Final[np.ndarray[Any, Any]] = np.array([f.timestamp for f in self._frames]) data_blocks: Final[np.ndarray[Any, Any]] = np.concatenate([f.data for f in self._frames], axis=1) - np.savez_compressed(file_path, timestamps=timestamps, data=data_blocks) + arrays: dict[str, Any] = {"timestamps": timestamps, "data": data_blocks} + if self._sfreq is not None: + arrays["sfreq"] = np.float64(self._sfreq) + if self._ch_names is not None: + arrays["ch_names"] = np.array(self._ch_names, dtype=str) + + np.savez_compressed(file_path, **arrays) self._logger.info("Saved session to binary file: %s", file_path)