diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index b55abdf..0992db0 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -7,11 +7,12 @@ import threading import time from pathlib import Path -from typing import ClassVar +from typing import Any, ClassVar import cv2 import numpy as np +from ...config import CameraTriggerSettings from ..base import CameraBackend, SupportLevel, register_backend from ..factory import DetectedCamera from .utils import gentl_discovery as cti_finder @@ -75,6 +76,11 @@ class GenTLCameraBackend(CameraBackend): _CTI_FILES_SOURCE_AUTO: ClassVar[str] = "auto" _CTI_FILES_SOURCE_USER: ClassVar[str] = "user" + # Keep individual Harvester.fetch() calls short enough that controller + # shutdown can stop worker threads promptly. Hardware-trigger waits are + # handled by repeated polling in SingleCameraWorker. + _MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT: ClassVar[float] = 1.0 + def __init__(self, settings): super().__init__(settings) @@ -105,6 +111,35 @@ def __init__(self, settings): self._gain = self._positive_float(ns.get("gain", props.get("gain"))) self._timeout: float = float(ns.get("timeout", props.get("timeout", 2.0))) + raw_trigger = ns.get("trigger", props.get("trigger")) + raw_trigger_strict = isinstance(raw_trigger, dict) and bool(raw_trigger.get("strict", False)) + + try: + self._trigger = CameraTriggerSettings.from_any(raw_trigger) + except Exception as exc: + if raw_trigger_strict: + raise ValueError(f"Strict mode failure - Invalid GenTL trigger configuration: {exc}") from exc + + LOG.warning( + "Invalid GenTL trigger config; falling back to trigger role=off: %s. " + "Enable strict mode to force this to raise.", + exc, + ) + self._trigger = CameraTriggerSettings() + + trigger_timeout = self._positive_float(self._trigger_attr(self._trigger, "timeout", None)) + if trigger_timeout is not None: + role = str(self._trigger_attr(self._trigger, "role", "off") or "off").strip().lower() + + if role in {"external", "follower"}: + # Do not let a long hardware-trigger wait block shutdown. + # SingleCameraWorker treats these fetch timeouts as expected + # polling misses while waits_for_hardware_trigger is true. + self._timeout = min(float(trigger_timeout), self._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + else: + # For non-trigger-waiting modes, preserve legacy behavior. + self._timeout = float(trigger_timeout) + self._requested_resolution: tuple[int, int] | None = self._get_requested_resolution_or_none() self._actual_width: int | None = None @@ -154,6 +189,7 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: "set_gain": SupportLevel.SUPPORTED, "device_discovery": SupportLevel.SUPPORTED, "stable_identity": SupportLevel.SUPPORTED, + "hardware_trigger": SupportLevel.BEST_EFFORT, } # ------------------------------------------------------------------ @@ -420,11 +456,12 @@ def open(self) -> None: self._device_label = self._resolve_device_label(node_map) self._configure_pixel_format(node_map) - self._configure_trigger(node_map) self._configure_resolution(node_map) self._configure_exposure(node_map) self._configure_gain(node_map) self._configure_frame_rate(node_map) + self._configure_trigger(node_map) # keep low in the list + self._ensure_settings_ns()["trigger_actual"] = self._trigger_to_dict(self._trigger) self._read_telemetry(node_map) self._persist_device_metadata(selected_info, selected_serial) @@ -449,6 +486,11 @@ def open(self) -> None: f"Failed to open GenTL camera.\n\nLoaded CTIs: {loaded}\nFailed CTIs: {failed}\nReason: {exc}" ) from exc + @property + def waits_for_hardware_trigger(self) -> bool: + role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() + return role in {"external", "follower"} + def read(self) -> tuple[np.ndarray, float]: if self._acquirer is None: raise RuntimeError("GenTL image acquirer not initialised") @@ -470,6 +512,8 @@ def read(self) -> tuple[np.ndarray, float]: except ValueError: frame = array.copy() except HarvesterTimeoutError as exc: + if self.waits_for_hardware_trigger: + raise TimeoutError(str(exc) + " (GenTL timeout; waiting for hardware trigger?)") from exc raise TimeoutError(str(exc) + " (GenTL timeout)") from exc frame = self._convert_frame(frame) @@ -502,6 +546,12 @@ def close(self) -> None: except Exception: pass + try: + node_map = self._acquirer.remote_device.node_map + self._call_with_optional_lock(self._restore_trigger_idle, node_map) + except Exception: + pass + try: destroy = getattr(self._acquirer, "destroy", None) if destroy is not None: @@ -949,6 +999,107 @@ def _create_acquirer(self, serial: str | None, index: int): # ------------------------------------------------------------------ # Camera configuration helpers # ------------------------------------------------------------------ + @staticmethod + def _node(node_map, name: str): + try: + return getattr(node_map, name) + except Exception: + return None + + @staticmethod + def _node_symbolics(node) -> list[str]: + try: + return list(getattr(node, "symbolics", []) or []) + except Exception: + return [] + + def _set_enum_node(self, node_map, name: str, value: str, *, strict: bool = False) -> bool: + node = self._node(node_map, name) + if node is None: + if strict: + raise RuntimeError(f"GenICam node '{name}' is not available") + LOG.debug("GenICam node '%s' is not available; skipping", name) + return False + + symbolics = self._node_symbolics(node) + if symbolics and value not in symbolics: + if strict: + raise RuntimeError(f"GenICam node '{name}' does not support '{value}'. Available: {symbolics}") + LOG.warning("GenICam node '%s' does not support '%s'. Available: %s", name, value, symbolics) + return False + + try: + node.value = value + return True + except Exception as exc: + if strict: + raise RuntimeError(f"Failed to set GenICam node '{name}' to '{value}': {exc}") from exc + LOG.warning("Failed to set GenICam node '%s' to '%s': %s", name, value, exc) + return False + + @staticmethod + def _trigger_attr(trigger, name: str, default=None): + if isinstance(trigger, dict): + return trigger.get(name, default) + return getattr(trigger, name, default) + + @staticmethod + def _trigger_to_dict(trigger) -> dict[str, Any]: + if trigger is None: + return {} + if isinstance(trigger, dict): + return dict(trigger) + if hasattr(trigger, "model_dump"): + try: + return trigger.model_dump(exclude_none=True) + except Exception: + pass + return {} + + def _resolve_trigger_source(self, node_map, requested: str, *, strict: bool) -> tuple[str, bool]: + """Resolve TriggerSource against the camera-supported GenICam enum values. + + Model-level default is "auto"; this backend maps it to the first preferred + source supported by the actual camera. + """ + requested = str(requested or "auto").strip() + node = self._node(node_map, "TriggerSource") + available = self._node_symbolics(node) + + if not available: + if strict: + raise RuntimeError("GenICam node 'TriggerSource' is not available or has no symbolics") + LOG.warning("GenICam node 'TriggerSource' is not available; disabling trigger input.") + return requested, False + + if requested in available: + return requested, True + + if requested.lower() == "auto": + for candidate in ("Line0", "Line1", "Line2", "Any"): + if candidate in available: + LOG.info( + "GenTL TriggerSource auto-selected '%s'. Available: %s", + candidate, + available, + ) + return candidate, True + + LOG.warning( + "Could not auto-select a GenTL TriggerSource. Available: %s", + available, + ) + return requested, False + + if strict: + raise RuntimeError(f"GenICam node 'TriggerSource' does not support '{requested}'. Available: {available}") + + LOG.warning( + "GenTL TriggerSource '%s' is not available. Available: %s", + requested, + available, + ) + return requested, False def _configure_pixel_format(self, node_map) -> None: try: @@ -1004,12 +1155,149 @@ def _configure_pixel_format(self, node_map) -> None: LOG.warning("Failed to configure pixel format '%s': %s", self._pixel_format, e) def _configure_trigger(self, node_map) -> None: + cfg = self._trigger + role = str(self._trigger_attr(cfg, "role", "off") or "off").strip().lower() + strict = bool(self._trigger_attr(cfg, "strict", False)) + + if role in {"off", "disabled"}: + self._configure_trigger_off(node_map, strict=strict) + return + + if role in {"external", "follower"}: + self._configure_trigger_input(node_map, cfg, strict=strict) + return + + if role == "master": + self._configure_trigger_master(node_map, cfg, strict=strict) + return + + if strict: + raise RuntimeError(f"Unsupported GenTL trigger role: {role!r}") + + LOG.warning("Unsupported GenTL trigger role '%s'; disabling trigger.", role) + self._configure_trigger_off(node_map, strict=False) + + def _configure_trigger_off(self, node_map, *, strict: bool = False) -> None: + self._set_enum_node(node_map, "TriggerMode", "Off", strict=strict) + + def _configure_trigger_input(self, node_map, cfg, *, strict: bool = False) -> None: + role = str(self._trigger_attr(cfg, "role", "external") or "external").strip().lower() + selector = str(self._trigger_attr(cfg, "selector", "FrameStart") or "FrameStart") + source = str(self._trigger_attr(cfg, "source", "Line0") or "Line0") + activation = str(self._trigger_attr(cfg, "activation", "RisingEdge") or "RisingEdge") + + # Disable trigger while changing trigger-related nodes. + self._set_enum_node(node_map, "TriggerMode", "Off", strict=False) + + selector_ok = self._set_enum_node(node_map, "TriggerSelector", selector, strict=strict) + source, source_resolved = self._resolve_trigger_source(node_map, source, strict=strict) + source_ok = source_resolved and self._set_enum_node(node_map, "TriggerSource", source, strict=strict) + activation_ok = self._set_enum_node(node_map, "TriggerActivation", activation, strict=strict) + + # TriggerSelector and TriggerSource are required routing nodes. + # If either failed in non-strict mode, do not arm TriggerMode=On. + # Otherwise the camera may wait on a previous/default input line. + if not (selector_ok and source_ok): + LOG.warning( + "Could not apply GenTL trigger input routing " + "(selector_ok=%s, source_ok=%s); disabling trigger. " + "requested role=%s selector=%s source=%s activation=%s", + selector_ok, + source_ok, + role, + selector, + source, + activation, + ) + self._configure_trigger_off(node_map, strict=False) + self._trigger = CameraTriggerSettings() + return + + if not activation_ok: + LOG.warning( + "Could not apply GenTL TriggerActivation=%s; using camera default/current activation.", + activation, + ) + + self._set_enum_node(node_map, "AcquisitionMode", "Continuous", strict=False) + + if not self._set_enum_node(node_map, "TriggerMode", "On", strict=strict): + LOG.warning("Could not enable GenTL TriggerMode=On; disabling trigger.") + self._configure_trigger_off(node_map, strict=False) + self._trigger = CameraTriggerSettings() + return + + LOG.info( + "GenTL trigger input configured: role=%s selector=%s source=%s activation=%s activation_ok=%s", + role, + selector, + source, + activation, + activation_ok, + ) + + def _configure_trigger_master(self, node_map, cfg, *, strict: bool = False) -> None: + output_line = str(self._trigger_attr(cfg, "output_line", "Line2") or "Line2") + output_source = str(self._trigger_attr(cfg, "output_source", "ExposureActive") or "ExposureActive") + + # Master camera runs freerun and exposes an output signal. + self._configure_trigger_off(node_map, strict=False) + + line_selected = self._set_enum_node( + node_map, + "LineSelector", + output_line, + strict=strict, + ) + + # In non-strict mode, do not continue configuring output behavior if the + # requested line could not be selected. Otherwise we may accidentally drive + # whichever GPIO line the camera had selected previously/defaulted to. + if not line_selected: + LOG.warning( + "Could not select GenTL output line '%s'; skipping trigger output configuration.", + output_line, + ) + return + + mode_ok = self._set_enum_node(node_map, "LineMode", "Output", strict=strict) + source_ok = self._set_enum_node(node_map, "LineSource", output_source, strict=strict) + + if not (mode_ok and source_ok): + LOG.warning( + "GenTL trigger master output configuration incomplete (LineMode ok=%s, LineSource ok=%s).", + mode_ok, + source_ok, + ) + return + + LOG.info( + "GenTL trigger master configured: output_line=%s output_source=%s", + output_line, + output_source, + ) + + def _restore_trigger_idle(self, node_map) -> None: + """Best-effort restore to a safe non-triggering state after acquisition stops. + + Important: + - This should be called after acquirer.stop(), not while acquisition is active. + - It is intentionally non-strict because shutdown should not fail if a node + is missing or read-only. + """ + role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() + try: - trigger_mode = getattr(node_map, "TriggerMode", None) - if trigger_mode is not None and "Off" in getattr(trigger_mode, "symbolics", []): - trigger_mode.value = "Off" - except Exception as e: - LOG.warning("Failed to disable trigger mode: %s", e) + if role in {"external", "follower"}: + self._set_enum_node(node_map, "TriggerMode", "Off", strict=False) + + elif role == "master": + # Stop driving output if the camera exposes these nodes. + self._set_enum_node(node_map, "LineSource", "Off", strict=False) + self._set_enum_node(node_map, "LineMode", "Input", strict=False) + + except Exception: + LOG.debug("Best-effort GenTL trigger restore failed", exc_info=True) def _configure_resolution(self, node_map) -> None: if self._requested_resolution is None: diff --git a/dlclivegui/cameras/base.py b/dlclivegui/cameras/base.py index bc91ce9..fefedd1 100644 --- a/dlclivegui/cameras/base.py +++ b/dlclivegui/cameras/base.py @@ -70,6 +70,7 @@ class SupportLevel(str, Enum): "set_gain": SupportLevel.UNSUPPORTED, "device_discovery": SupportLevel.UNSUPPORTED, "stable_identity": SupportLevel.UNSUPPORTED, + "hardware_trigger": SupportLevel.UNSUPPORTED, } diff --git a/dlclivegui/config.py b/dlclivegui/config.py index 6d9e1de..c7a9f43 100644 --- a/dlclivegui/config.py +++ b/dlclivegui/config.py @@ -12,6 +12,11 @@ TileLayout = Literal["auto", "2x2", "1x4", "4x1"] Precision = Literal["FP32", "FP16"] ModelType = Literal["pytorch", "tensorflow"] +TriggerRole = Literal["off", "external", "master", "follower"] +TriggerActivation = Literal["RisingEdge", "FallingEdge", "AnyEdge", "LevelHigh", "LevelLow"] + +SINGLE_CAMERA_WORKER_DO_LOG_TIMING = False +MULTI_CAMERA_WORKER_DO_LOG_TIMING = True class CameraSettings(BaseModel): @@ -38,6 +43,27 @@ class CameraSettings(BaseModel): enabled: bool = True properties: dict[str, Any] = Field(default_factory=dict) + def pretty(self) -> str: + crop = ( + "none" + if self.get_crop_region() is None + else f"({self.crop_x0}, {self.crop_y0}) -> ({self.crop_x1 or 'edge'}, {self.crop_y1 or 'edge'})" + ) + return ( + f"CameraSettings[\n" + f" name={self.name!r}, index={self.index}, backend={self.backend!r}, enabled={self.enabled}\n" + f" fps={self.fps}, size={self.width or 'auto'}x{self.height or 'auto'}, " + f"exposure={self.exposure or 'auto'}, gain={self.gain or 'auto'}\n" + f" rotation={self.rotation}, crop={crop}\n" + f"]" + ) + + def __str__(self) -> str: + return self.pretty() + + def __repr__(self) -> str: + return self.pretty() + @field_validator("fps", mode="before") @classmethod def _coerce_fps(cls, v): @@ -168,6 +194,137 @@ def check_diff(old: CameraSettings, new: CameraSettings) -> dict: pass return out + def backend_options(self, backend: str | None = None) -> dict[str, Any]: + key = backend or self.backend + props = self.properties if isinstance(self.properties, dict) else {} + ns = props.get(str(key).lower(), {}) + return ns if isinstance(ns, dict) else {} + + def get_trigger_settings(self, backend: str | None = None) -> CameraTriggerSettings: + ns = self.backend_options(backend) + return CameraTriggerSettings.from_any(ns.get("trigger")) + + def set_trigger_settings(self, trigger: CameraTriggerSettings, backend: str | None = None) -> None: + key = backend or self.backend + if not isinstance(self.properties, dict): + self.properties = {} + ns = self.properties.setdefault(str(key).lower(), {}) + if not isinstance(ns, dict): + ns = {} + self.properties[str(key).lower()] = ns + ns["trigger"] = trigger.to_properties() + + def with_save_defaults(self) -> CameraSettings: + out = self.model_copy(deep=True) + + backend = (out.backend or "").lower() + if backend != "gentl": + return out + + if not isinstance(out.properties, dict): + out.properties = {} + + ns = out.properties.setdefault("gentl", {}) + if not isinstance(ns, dict): + ns = {} + out.properties["gentl"] = ns + + ns.setdefault("trigger", CameraTriggerSettings().to_properties()) + + return out + + +class CameraTriggerSettings(BaseModel): + """ + Generic hardware-trigger settings. + + Backend-specific code may ignore fields that are unsupported by a given + camera/SDK. For GenTL, these map to common GenICam nodes such as: + TriggerMode, TriggerSelector, TriggerSource, TriggerActivation, + LineSelector, LineMode, and LineSource. + """ + + role: TriggerRole = "off" + + # Input trigger config: external/follower + selector: str = "FrameStart" + source: str = "auto" + activation: TriggerActivation | str = "RisingEdge" + + # Output config: master + output_line: str = "Line2" + output_source: str = "ExposureActive" + + # Runtime behavior + timeout: float | None = None + strict: bool = False + + @field_validator("role", mode="before") + @classmethod + def _coerce_role(cls, v): + if v is None: + return "off" + + s = str(v).strip().lower() + aliases = { + "": "off", + "none": "off", + "false": "off", + "disabled": "off", + "disable": "off", + "off": "off", + "true": "external", + "on": "external", + "trigger": "external", + "triggered": "external", + "external": "external", + "follower": "follower", + "slave": "follower", + "master": "master", + "main": "master", + } + return aliases.get(s, s) + + @field_validator("timeout", mode="before") + @classmethod + def _coerce_timeout(cls, v): + if v in (None, ""): + return None + try: + fv = float(v) + except Exception: + return None + return fv if fv > 0 else None + + @field_validator("source", mode="before") + @classmethod + def _coerce_source(cls, v): + if v is None: + return "auto" + + s = str(v).strip() + if not s: + return "auto" + + aliases = { + "default": "auto", + "automatic": "auto", + "device": "auto", + "camera": "auto", + } + return aliases.get(s.lower(), s) + + @classmethod + def from_any(cls, value) -> CameraTriggerSettings: + if isinstance(value, cls): + return value + if isinstance(value, dict): + return cls(**value) + return cls() + + def to_properties(self) -> dict[str, Any]: + return self.model_dump(exclude_none=True) + class MultiCameraSettings(BaseModel): cameras: list[CameraSettings] = Field(default_factory=list) @@ -206,12 +363,19 @@ def from_dict(cls, data: dict[str, Any]) -> MultiCameraSettings: return cls(cameras=cameras, max_cameras=max_cameras, tile_layout=tile_layout) def to_dict(self) -> dict[str, Any]: + out = self.with_save_defaults() return { - "cameras": [cam.model_dump() for cam in self.cameras], - "max_cameras": self.max_cameras, - "tile_layout": self.tile_layout, + "cameras": [cam.model_dump() for cam in out.cameras], + "max_cameras": out.max_cameras, + "tile_layout": out.tile_layout, } + def with_save_defaults(self) -> MultiCameraSettings: + """Return a copy with save defaults applied to all cameras.""" + out = self.model_copy(deep=True) + out.cameras = [cam.with_save_defaults() for cam in out.cameras] + return out + class DynamicCropModel(BaseModel): enabled: bool = False @@ -377,10 +541,13 @@ def from_dict(cls, data: dict[str, Any]) -> ApplicationSettings: ) def to_dict(self) -> dict[str, Any]: + camera = self.camera.with_save_defaults() + multi_camera = self.multi_camera.with_save_defaults() + return { "version": self.version, - "camera": self.camera.model_dump(), - "multi_camera": self.multi_camera.to_dict(), + "camera": camera.model_dump(), + "multi_camera": multi_camera.to_dict(), "dlc": self.dlc.model_dump(), "recording": self.recording.model_dump(), "bbox": self.bbox.model_dump(), diff --git a/dlclivegui/gui/camera_config/camera_config_dialog.py b/dlclivegui/gui/camera_config/camera_config_dialog.py index 4c94fb7..444f273 100644 --- a/dlclivegui/gui/camera_config/camera_config_dialog.py +++ b/dlclivegui/gui/camera_config/camera_config_dialog.py @@ -18,9 +18,10 @@ ) from ...cameras.factory import CameraFactory, DetectedCamera, apply_detected_identity, camera_identity_key -from ...config import CameraSettings, MultiCameraSettings +from ...config import CameraSettings, CameraTriggerSettings, MultiCameraSettings from .loaders import CameraLoadWorker, CameraProbeWorker, CameraScanState, DetectCamerasWorker from .preview import PreviewSession, PreviewState, apply_crop, apply_rotation, resize_to_fit, to_display_pixmap +from .trigger_config_dialog import TriggerConfigDialog from .ui_blocks import setup_camera_config_dialog_ui LOGGER = logging.getLogger(__name__) @@ -328,6 +329,7 @@ def _connect_signals(self) -> None: self.active_cameras_list.currentRowChanged.connect(self._on_active_camera_selected) self.available_cameras_list.currentRowChanged.connect(self._on_available_camera_selected) self.available_cameras_list.itemDoubleClicked.connect(self._on_available_camera_double_clicked) + self.trigger_settings_btn.clicked.connect(self._open_trigger_settings_dialog) self.apply_settings_btn.clicked.connect(self._apply_camera_settings) self.reset_settings_btn.clicked.connect(self._reset_selected_camera) self.preview_btn.clicked.connect(self._toggle_preview) @@ -451,11 +453,24 @@ def _refresh_camera_labels(self) -> None: finally: cam_list.blockSignals(False) + def _trigger_role_for_label(self, cam: CameraSettings) -> str: + backend = (cam.backend or "").lower() + props = cam.properties if isinstance(cam.properties, dict) else {} + ns = props.get(backend, {}) if isinstance(props.get(backend), dict) else {} + trigger = ns.get("trigger", {}) + if not isinstance(trigger, dict): + return "off" + return str(trigger.get("role", "off") or "off").lower() + def _format_camera_label(self, cam: CameraSettings, index: int = -1) -> str: status = "✓" if cam.enabled else "○" this_id = f"{(cam.backend or '').lower()}:{cam.index}" dlc_indicator = " [DLC]" if this_id == self._dlc_camera_id and cam.enabled else "" - return f"{status} {cam.name} [{cam.backend}:{cam.index}]{dlc_indicator}" + + trigger_role = self._trigger_role_for_label(cam) + trigger_indicator = "" if trigger_role in {"off", "disabled"} else f" [{trigger_role}]" + + return f"{status} {cam.name} [{cam.backend}:{cam.index}]{trigger_indicator}{dlc_indicator}" def _selected_detected_camera(self) -> DetectedCamera | None: row = self.available_cameras_list.currentRow() @@ -514,6 +529,9 @@ def apply(widget, feature: str, label: str, *, allow_best_effort: bool = True): apply(self.cam_exposure, "set_exposure", "Exposure") apply(self.cam_gain, "set_gain", "Gain") + # Hardware trigger / sync + apply(self.trigger_settings_btn, "hardware_trigger", "Hardware trigger") + def _set_preview_button_loading(self, loading: bool) -> None: if loading: self.preview_btn.setText("Cancel Loading") @@ -800,6 +818,21 @@ def _on_active_camera_selected(self, row: int) -> None: self._load_camera_to_form(cam) self._start_probe_for_camera(cam, apply_to_requested=False) + def _ensure_default_trigger_config(self, cam: CameraSettings) -> None: + backend = (cam.backend or "").lower() + if backend != "gentl": + return + + if not isinstance(cam.properties, dict): + cam.properties = {} + + ns = cam.properties.setdefault("gentl", {}) + if not isinstance(ns, dict): + ns = {} + cam.properties["gentl"] = ns + + ns.setdefault("trigger", CameraTriggerSettings().model_dump(exclude_none=True)) + def _add_selected_camera(self) -> None: if not self._commit_pending_edits(reason="before adding a new camera"): return @@ -850,6 +883,7 @@ def _add_selected_camera(self) -> None: properties={}, ) apply_detected_identity(new_cam, detected, backend) + self._ensure_default_trigger_config(new_cam) self._working_settings.cameras.append(new_cam) new_index = len(self._working_settings.cameras) - 1 new_item = QListWidgetItem(self._format_camera_label(new_cam, new_index)) @@ -969,6 +1003,7 @@ def _load_camera_to_form(self, cam: CameraSettings) -> None: self.cam_crop_y0.setValue(cam.crop_y0) self.cam_crop_x1.setValue(cam.crop_x1) self.cam_crop_y1.setValue(cam.crop_y1) + self._ensure_default_trigger_config(cam) self.apply_settings_btn.setEnabled(True) self._set_detected_labels(cam) finally: @@ -1029,6 +1064,39 @@ def _enabled_count_with(self, row: int, new_enabled: bool) -> int: count += 1 return count + def _open_trigger_settings_dialog(self) -> None: + """Open per-camera hardware trigger settings dialog.""" + if self._current_edit_index is None: + return + + row = self._current_edit_index + if row < 0 or row >= len(self._working_settings.cameras): + return + + # Commit normal camera edits first so we do not lose pending UI changes. + if not self._commit_pending_edits(reason="before opening trigger settings"): + return + + cam = self._working_settings.cameras[row] + + dlg = TriggerConfigDialog(cam, self) + if dlg.exec() != QDialog.Accepted: + return + + updated = dlg.camera_settings + + self._working_settings.cameras[row] = updated + self._update_active_list_item(row, updated) + self._load_camera_to_form(updated) + + # Trigger changes require reopening the camera preview/backend. + if self._is_preview_live(): + self._append_status("[Trigger] Restarting preview to apply trigger settings.") + self._request_preview_restart(updated, reason="trigger-settings") + + self.apply_settings_btn.setEnabled(False) + self._set_apply_dirty(False) + def _apply_camera_settings(self) -> bool: try: for sb in ( @@ -1085,9 +1153,7 @@ def _apply_camera_settings(self) -> bool: old_settings = current_model restart = False - should_consider_restart = self._preview.state == PreviewState.ACTIVE and isinstance( - old_settings, CameraSettings - ) + should_consider_restart = self._is_preview_live() and isinstance(old_settings, CameraSettings) if should_consider_restart: restart = self._should_restart_preview(old_settings, new_model) @@ -1099,7 +1165,7 @@ def _apply_camera_settings(self) -> bool: new_model.index, ) - if self._preview.state == PreviewState.ACTIVE and restart: + if self._is_preview_live() and restart: self._append_status("[Apply] Restarting preview to apply camera settings changes.") self._request_preview_restart(new_model, reason="apply-settings") @@ -1597,6 +1663,13 @@ def _bump_epoch(self) -> int: self._preview.epoch += 1 return self._preview.epoch + def _trigger_dict_for_cam(self, cam: CameraSettings) -> dict: + backend = (cam.backend or "").lower() + props = cam.properties if isinstance(cam.properties, dict) else {} + ns = props.get(backend, {}) if isinstance(props.get(backend), dict) else {} + trigger = ns.get("trigger", {}) + return trigger if isinstance(trigger, dict) else {} + def _should_restart_preview(self, old: CameraSettings, new: CameraSettings) -> bool: """ Fast UX policy: @@ -1612,6 +1685,9 @@ def _should_restart_preview(self, old: CameraSettings, new: CameraSettings) -> b except Exception: return True # safest: restart + if self._trigger_dict_for_cam(old) != self._trigger_dict_for_cam(new): + return True + # No restart needed if only rotation/crop/enabled changed return False @@ -1756,9 +1832,10 @@ def _on_loader_finished(self, e: int) -> None: self._preview.restart_scheduled = False self._preview.loader = None - if pending and self._preview.state == PreviewState.IDLE: + if pending and self._preview.state in (PreviewState.IDLE, PreviewState.ACTIVE): LOGGER.debug("[Loader] finished with pending restart for backend=%s idx=%s", pending.backend, pending.index) self._begin_preview_load(pending, reason="pending-restart-after-finish") + return # UI sync is already handled in _begin_preview_load self._sync_preview_ui() diff --git a/dlclivegui/gui/camera_config/trigger_config_dialog.py b/dlclivegui/gui/camera_config/trigger_config_dialog.py new file mode 100644 index 0000000..d1efa03 --- /dev/null +++ b/dlclivegui/gui/camera_config/trigger_config_dialog.py @@ -0,0 +1,186 @@ +# dlclivegui/gui/camera_config/trigger_config_dialog.py +from __future__ import annotations + +from PySide6.QtWidgets import ( + QCheckBox, + QComboBox, + QDialog, + QDialogButtonBox, + QDoubleSpinBox, + QFormLayout, + QGroupBox, + QLabel, + QLineEdit, + QMessageBox, + QVBoxLayout, + QWidget, +) + +from ...config import CameraSettings, CameraTriggerSettings + + +def _backend_namespace(cam: CameraSettings) -> dict: + backend = (cam.backend or "").lower() + if not isinstance(cam.properties, dict): + cam.properties = {} + ns = cam.properties.setdefault(backend, {}) + if not isinstance(ns, dict): + ns = {} + cam.properties[backend] = ns + return ns + + +class TriggerConfigDialog(QDialog): + """Small dialog for editing per-camera hardware trigger settings.""" + + def __init__(self, cam: CameraSettings, parent: QWidget | None = None): + super().__init__(parent) + self.setWindowTitle("Configure trigger mode") + self.setMinimumWidth(420) + + self._cam = cam.model_copy(deep=True) + + ns = _backend_namespace(self._cam) + try: + self._trigger = CameraTriggerSettings.from_any(ns.get("trigger")) + except Exception: + self._trigger = CameraTriggerSettings() + + self._setup_ui() + self._load_from_trigger(self._trigger) + self._sync_role_ui() + + @property + def camera_settings(self) -> CameraSettings: + return self._cam + + def _setup_ui(self) -> None: + root = QVBoxLayout(self) + + info = QLabel( + "Configure hardware trigger settings for this camera.\n" + "Use 'auto' for trigger source unless you know the exact GenICam line name. " + "In strict mode, unsupported trigger nodes fail camera open." + ) + info.setWordWrap(True) + root.addWidget(info) + + group = QGroupBox("Hardware Trigger") + form = QFormLayout(group) + + self.role_combo = QComboBox() + self.role_combo.addItem("Off / Free-run", "off") + self.role_combo.addItem("External trigger", "external") + self.role_combo.addItem("Follower", "follower") + self.role_combo.addItem("Master", "master") + form.addRow("Role:", self.role_combo) + + self.selector_edit = QLineEdit() + self.selector_edit.setPlaceholderText("FrameStart") + form.addRow("Trigger selector:", self.selector_edit) + + self.source_edit = QLineEdit() + self.source_edit.setPlaceholderText("auto, Line0, Software, ...") + form.addRow("Trigger source:", self.source_edit) + + self.activation_combo = QComboBox() + for value in ("RisingEdge", "FallingEdge", "AnyEdge", "LevelHigh", "LevelLow"): + self.activation_combo.addItem(value, value) + form.addRow("Activation:", self.activation_combo) + + self.output_line_edit = QLineEdit() + self.output_line_edit.setPlaceholderText("Line2") + form.addRow("Output line:", self.output_line_edit) + + self.output_source_edit = QLineEdit() + self.output_source_edit.setPlaceholderText("ExposureActive") + form.addRow("Output source:", self.output_source_edit) + + self.timeout_spin = QDoubleSpinBox() + self.timeout_spin.setRange(0.0, 3600.0) + self.timeout_spin.setDecimals(3) + self.timeout_spin.setSingleStep(0.1) + self.timeout_spin.setSpecialValueText("Default") + self.timeout_spin.setToolTip( + "Fetch poll timeout in seconds. The backend may cap individual fetches to keep preview shutdown responsive." + ) + form.addRow("Read timeout:", self.timeout_spin) + + self.strict_checkbox = QCheckBox("Strict mode") + self.strict_checkbox.setToolTip("If enabled, missing/unsupported GenICam trigger nodes fail camera open.") + form.addRow(self.strict_checkbox) + + root.addWidget(group) + + buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) + buttons.accepted.connect(self._accept) + buttons.rejected.connect(self.reject) + root.addWidget(buttons) + + self.role_combo.currentIndexChanged.connect(self._sync_role_ui) + + def _load_from_trigger(self, trigger: CameraTriggerSettings) -> None: + role = str(getattr(trigger, "role", "off") or "off").lower() + idx = self.role_combo.findData(role) + self.role_combo.setCurrentIndex(idx if idx >= 0 else 0) + + self.selector_edit.setText(str(getattr(trigger, "selector", "FrameStart") or "FrameStart")) + self.source_edit.setText(str(getattr(trigger, "source", "auto") or "auto")) + + activation = str(getattr(trigger, "activation", "RisingEdge") or "RisingEdge") + idx = self.activation_combo.findData(activation) + self.activation_combo.setCurrentIndex(idx if idx >= 0 else 0) + + self.output_line_edit.setText(str(getattr(trigger, "output_line", "Line2") or "Line2")) + self.output_source_edit.setText(str(getattr(trigger, "output_source", "ExposureActive") or "ExposureActive")) + + timeout = getattr(trigger, "timeout", None) + self.timeout_spin.setValue(float(timeout) if timeout else 0.0) + + self.strict_checkbox.setChecked(bool(getattr(trigger, "strict", False))) + + def _sync_role_ui(self) -> None: + role = str(self.role_combo.currentData() or "off") + + input_enabled = role in {"external", "follower"} + output_enabled = role == "master" + + self.selector_edit.setEnabled(input_enabled) + self.source_edit.setEnabled(input_enabled) + self.activation_combo.setEnabled(input_enabled) + + self.output_line_edit.setEnabled(output_enabled) + self.output_source_edit.setEnabled(output_enabled) + + # Timeout is mostly useful for external/follower, but harmless for any role. + self.timeout_spin.setEnabled(role in {"external", "follower"}) + + def _accept(self) -> None: + role = str(self.role_combo.currentData() or "off") + + payload = { + "role": role, + "selector": self.selector_edit.text().strip() or "FrameStart", + "source": self.source_edit.text().strip() or "auto", + "activation": str(self.activation_combo.currentData() or "RisingEdge"), + "output_line": self.output_line_edit.text().strip() or "Line2", + "output_source": self.output_source_edit.text().strip() or "ExposureActive", + "strict": bool(self.strict_checkbox.isChecked()), + } + + timeout = float(self.timeout_spin.value()) + if role in {"external", "follower"} and timeout > 0: + payload["timeout"] = timeout + elif role == "off": + payload["timeout"] = None # ensure timeout is cleared when disabling trigger + + try: + trigger = CameraTriggerSettings.from_any(payload) + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to apply trigger settings: {e}") + return + + ns = _backend_namespace(self._cam) + ns["trigger"] = trigger.to_properties() + + self.accept() diff --git a/dlclivegui/gui/camera_config/ui_blocks.py b/dlclivegui/gui/camera_config/ui_blocks.py index 86e4f19..07a8025 100644 --- a/dlclivegui/gui/camera_config/ui_blocks.py +++ b/dlclivegui/gui/camera_config/ui_blocks.py @@ -354,6 +354,13 @@ def build_settings_group(dlg: CameraConfigDialog) -> QGroupBox: dlg.settings_form.addRow("Crop:", crop_widget) + # --- Trigger settings button --- + dlg.trigger_settings_btn = QPushButton("Trigger Settings…") + dlg.trigger_settings_btn.setIcon(dlg.style().standardIcon(QStyle.StandardPixmap.SP_FileDialogDetailedView)) + dlg.trigger_settings_btn.setEnabled(False) + dlg.trigger_settings_btn.setToolTip("Configure hardware trigger / GPIO sync settings for this camera.") + dlg.settings_form.addRow("Sync:", dlg.trigger_settings_btn) + # Apply/Reset buttons row dlg.apply_settings_btn = QPushButton("Apply Settings") dlg.apply_settings_btn.setIcon(dlg.style().standardIcon(QStyle.StandardPixmap.SP_DialogApplyButton)) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 0e3beb0..c16964b 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -854,15 +854,23 @@ def _apply_config(self, config: ApplicationSettings) -> None: # Update recording path preview self._update_recording_path_preview() - def _current_config(self) -> ApplicationSettings: - # Get the first camera from multi-camera config for backward compatibility - active_cameras = self._config.multi_camera.get_active_cameras() - camera = active_cameras[0] if active_cameras else CameraSettings() + def _current_config(self, *, allow_empty_model_path=False) -> ApplicationSettings: + multi_camera = self._config.multi_camera + active_cameras = multi_camera.get_active_cameras() + camera = ( + active_cameras[0].model_copy(deep=True) + if active_cameras + else ( + multi_camera.cameras[0].model_copy(deep=True) + if multi_camera.cameras + else self._config.camera.model_copy(deep=True) + ) + ) return ApplicationSettings( camera=camera, - multi_camera=self._config.multi_camera, - dlc=self._dlc_settings_from_ui(), + multi_camera=multi_camera, + dlc=self._dlc_settings_from_ui(allow_empty_model_path=allow_empty_model_path), recording=self._recording_settings_from_ui(), bbox=self._bbox_settings_from_ui(), visualization=self._visualization_settings_from_ui(), @@ -874,14 +882,29 @@ def _parse_json(self, value: str) -> dict: return {} return json.loads(text) - def _dlc_settings_from_ui(self) -> DLCProcessorSettings: + def _dlc_settings_from_ui(self, *, allow_empty_model_path=False) -> DLCProcessorSettings: model_path = self.model_path_edit.text().strip() if Path(model_path).exists() and Path(model_path).suffix == ".pb": # IMPORTANT NOTE: DLClive expects a directory for TensorFlow models, # so if user selects a .pb file, we should pass the parent directory to DLCLive model_path = str(Path(model_path).parent) - if model_path == "": + + existing_dlc = ( # explicitly init from default if unset + self._config.dlc.model_copy(deep=True) + if getattr(self._config, "dlc", None) is not None + else DEFAULT_CONFIG.dlc.model_copy(deep=True) + ) + if not model_path: + if allow_empty_model_path: + # Preserve all existing DLC settings and only clear the model path. + return existing_dlc.model_copy( + update={ + "model_path": "", + } + ) + raise ValueError("Model path cannot be empty. Please enter a valid path to a DLCLive model file.") + try: model_bknd = DLCLiveProcessor.get_model_backend(model_path) except Exception as e: @@ -890,15 +913,13 @@ def _dlc_settings_from_ui(self) -> DLCProcessorSettings: "Please ensure the model file is valid and has an appropriate extension " "(.pt, .pth for PyTorch or model directory for TensorFlow)." ) from e - return DLCProcessorSettings( - model_path=model_path, - model_directory=self._config.dlc.model_directory, # Preserve from config - device=self._config.dlc.device, # Preserve from config - dynamic=self._config.dlc.dynamic, # Preserve from config - resize=self._config.dlc.resize, # Preserve from config - precision=self._config.dlc.precision, # Preserve from config - model_type=model_bknd, - # additional_options=self._parse_json(self.additional_options_edit.toPlainText()), + + # Preserve all unchanged DLC settings and only update values derived from the UI. + return existing_dlc.model_copy( + update={ + "model_path": model_path, + "model_type": model_bknd, + } ) def _recording_settings_from_ui(self) -> RecordingSettings: @@ -965,7 +986,7 @@ def _action_save_config_as(self) -> None: def _save_config_to_path(self, path: Path) -> None: try: - config = self._current_config() + config = self._current_config(allow_empty_model_path=True) config.save(path) self._settings_store.set_last_config_path(str(path)) self._settings_store.save_full_config_snapshot(config) @@ -1265,8 +1286,10 @@ def _refresh_dlc_camera_list_running(self) -> None: """Populate the inference camera dropdown from currently running cameras.""" self.dlc_camera_combo.blockSignals(True) self.dlc_camera_combo.clear() - for cam_id in sorted(self._running_cams_ids): - self.dlc_camera_combo.addItem(self._label_for_cam_id(cam_id), cam_id) + for cam in self._config.multi_camera.get_active_cameras(): + cam_id = get_camera_id(cam) + if cam_id in self._running_cams_ids: + self.dlc_camera_combo.addItem(self._label_for_cam_id(cam_id), cam_id) # Keep current selection if still present, else select first running if self._inference_camera_id in self._running_cams_ids: @@ -1368,7 +1391,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None: # Determine DLC camera (first active camera) selected_id = self._inference_camera_id - available_ids = sorted(frame_data.frames.keys()) + available_ids = list(frame_data.frames.keys()) if selected_id in frame_data.frames: dlc_cam_id = selected_id else: @@ -1611,7 +1634,7 @@ def _stop_preview(self) -> None: def _configure_dlc(self) -> bool: try: settings = self._dlc_settings_from_ui() - except (ValueError, json.JSONDecodeError) as exc: + except (ValueError, RuntimeError, json.JSONDecodeError) as exc: self._show_error(f"Invalid DLCLive settings: {exc}") return False if not settings.model_path: diff --git a/dlclivegui/main.py b/dlclivegui/main.py index eace8bd..9e38a08 100644 --- a/dlclivegui/main.py +++ b/dlclivegui/main.py @@ -27,32 +27,70 @@ def _maybe_allow_keyboard_interrupt(app: QApplication) -> None: """ - Gracefully handle Ctrl+C (SIGINT) by closing the main window and quitting Qt. + Gracefully handle Ctrl+C/SIGTERM by closing the main window and quitting Qt. + + Notes: + - The small timer keeps Python signal handling responsive while Qt owns the event loop. + - First Ctrl+C tries graceful cleanup via closeEvent(). + - Second Ctrl+C exits immediately with code 130. """ + quitting = {"requested": False} def _request_quit() -> None: + if quitting["requested"]: + return + + quitting["requested"] = True logging.info("Keyboard interrupt received, closing application...") + win = getattr(app, "_main_window", None) + if win is not None: - # Trigger your existing closeEvent cleanup (camera stop, threads, timers, etc.) - win.close() - else: - app.quit() + try: + # Trigger existing closeEvent cleanup: + # camera stop, controller shutdown, timers, DLC shutdown, etc. + win.close() + except Exception: + logging.exception("Error while closing main window after Ctrl+C") + + # Explicitly ask Qt to leave app.exec(). + # Do this even after win.close(), because closeEvent cleanup can be async + # and relying only on quitOnLastWindowClosed can be fragile. + QTimer.singleShot(0, app.quit) + + def _force_exit() -> None: + logging.warning("Second interrupt received, forcing process exit.") + os._exit(130) def _sigint_handler(_signum, _frame) -> None: + if quitting["requested"]: + _force_exit() QTimer.singleShot(0, _request_quit) signal.signal(signal.SIGINT, _sigint_handler) - # Keepalive timer to allow Python to handle signals while Qt is running. - sig_timer = QTimer() - sig_timer.setInterval(100) # 50–200ms typical; keep low overhead + # Ctrl+Break on Windows. + if hasattr(signal, "SIGBREAK"): + signal.signal(signal.SIGBREAK, _sigint_handler) + + # Useful when process is terminated from shells/process managers. + if hasattr(signal, "SIGTERM"): + signal.signal(signal.SIGTERM, _sigint_handler) + + # Parent the timer to app so Qt owns its lifetime. + sig_timer = QTimer(app) + sig_timer.setInterval(100) sig_timer.timeout.connect(lambda: None) sig_timer.start() - if hasattr(app, "_sig_timer"): - app._sig_timer.stop() # Stop any existing timer to avoid duplicates - app._sig_timer = sig_timer # Store on app to keep it alive and allow cleanup on exit + old_timer = getattr(app, "_sig_timer", None) + if old_timer is not None: + try: + old_timer.stop() + except Exception: + pass + + app._sig_timer = sig_timer def configure_logging(debug: bool = False) -> None: diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 7058d59..014cccc 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -4,7 +4,6 @@ import copy import logging -import time from dataclasses import dataclass from threading import Event, Lock @@ -18,7 +17,8 @@ from dlclivegui.cameras.factory import camera_identity_key # from dlclivegui.config import CameraSettings -from dlclivegui.config import CameraSettings +from dlclivegui.config import MULTI_CAMERA_WORKER_DO_LOG_TIMING, SINGLE_CAMERA_WORKER_DO_LOG_TIMING, CameraSettings +from dlclivegui.utils.stats import WorkerTimingStats LOGGER = logging.getLogger(__name__) @@ -49,6 +49,12 @@ def __init__(self, camera_id: str, settings: CameraSettings): self._backend: CameraBackend | None = None self._max_consecutive_errors = 5 self._retry_delay = 0.1 + self._trigger_timeout_delay = 0.05 + + # Performance logs + self._timing = WorkerTimingStats( + camera_id, logger=LOGGER, log_interval=1.0, enabled=SINGLE_CAMERA_WORKER_DO_LOG_TIMING + ) @Slot() def run(self) -> None: @@ -85,7 +91,8 @@ def run(self) -> None: while not self._stop_event.is_set(): try: - frame, timestamp = self._backend.read() + with self._timing.measure("Single.read"): + frame, timestamp = self._backend.read() if frame is None or frame.size == 0: consecutive_errors += 1 if consecutive_errors >= self._max_consecutive_errors: @@ -93,20 +100,57 @@ def run(self) -> None: self._camera_id, "Too many empty frames.\nWas the device disconnected ?" ) break - time.sleep(self._retry_delay) + if self._stop_event.wait(self._retry_delay): + break continue consecutive_errors = 0 - self.frame_captured.emit(self._camera_id, frame, timestamp) + with self._timing.measure("Single.emit.frame_captured"): + self.frame_captured.emit(self._camera_id, frame, timestamp) + + self._timing.note_frame() + self._timing.maybe_log() + + except TimeoutError as exc: + self._timing.note_timeout() + self._timing.maybe_log() + if self._stop_event.is_set(): + break + + # In hardware-trigger mode, a timeout usually means: + # "no trigger pulse arrived during this poll interval". + # This is expected and should not count as a camera failure. + if bool(getattr(self._backend, "waits_for_hardware_trigger", False)): + LOGGER.debug( + "[Worker %s] waiting for hardware trigger: %s", + self._camera_id, + exc, + ) + consecutive_errors = 0 + + if self._stop_event.wait(self._trigger_timeout_delay): + break # Stop event set during wait + continue + + consecutive_errors += 1 + if consecutive_errors >= self._max_consecutive_errors: + self.error_occurred.emit(self._camera_id, f"Camera read timeout: {exc}") + break + if self._stop_event.wait(self._retry_delay): + break + continue except Exception as exc: + self._timing.note_error() + self._timing.maybe_log() consecutive_errors += 1 if self._stop_event.is_set(): break if consecutive_errors >= self._max_consecutive_errors: self.error_occurred.emit(self._camera_id, f"Camera read error: {exc}") break - time.sleep(self._retry_delay) + if self._stop_event.wait(self._retry_delay): + break continue # Cleanup @@ -138,6 +182,46 @@ def get_camera_id(settings: CameraSettings) -> str: return f"{backend}:index:{int(settings.index)}" +def _trigger_role_from_settings(settings: CameraSettings) -> str: + backend = (settings.backend or "").lower() + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(backend, {}) if isinstance(props.get(backend), dict) else {} + + trigger = ns.get("trigger", {}) + if not isinstance(trigger, dict): + return "off" + + role = str(trigger.get("role", "off") or "off").strip().lower() + + # Match CameraTriggerSettings aliases enough for controller ordering. + if role in {"true", "on", "trigger", "triggered"}: + return "external" + if role in {"slave"}: + return "follower" + if role in {"main"}: + return "master" + if role in {"false", "none", "disabled", "disable", ""}: + return "off" + + return role + + +def _camera_start_priority(settings: CameraSettings) -> int: + """Start trigger-waiting cameras before trigger-generating cameras. + + Priority: + 0: external/follower cameras, which should be armed first + 1: normal/free-run cameras + 2: master cameras, which may generate trigger pulses + """ + role = _trigger_role_from_settings(settings) + if role in {"external", "follower"}: + return 0 + if role == "master": + return 2 + return 1 + + class MultiCameraController(QObject): """Controller for managing multiple cameras simultaneously.""" @@ -162,9 +246,13 @@ def __init__(self): self._frame_lock = Lock() self._running = False self._started_cameras: set = set() + self._camera_display_order: list[str] = [] self._failed_cameras: dict[str, str] = {} # camera_id -> error message self._expected_cameras: int = 0 # Number of cameras we're trying to start + # Performance logs + self._timing_per_cam: dict[str, WorkerTimingStats] = {} + def is_running(self) -> bool: """Check if any camera is currently running.""" return self._running and len(self._started_cameras) > 0 @@ -173,13 +261,37 @@ def get_active_count(self) -> int: """Get the number of active cameras.""" return len(self._started_cameras) + def _timing_for_camera(self, camera_id: str) -> WorkerTimingStats: + if not MULTI_CAMERA_WORKER_DO_LOG_TIMING: + return WorkerTimingStats(camera_id, enabled=False) + timing = self._timing_per_cam.get(camera_id) + if timing is None: + timing = WorkerTimingStats( + f"Controller {camera_id}", + logger=LOGGER, + log_interval=1.0, + enabled=MULTI_CAMERA_WORKER_DO_LOG_TIMING, + ) + self._timing_per_cam[camera_id] = timing + return timing + def start(self, camera_settings: list[CameraSettings]) -> None: """Start multiple cameras; accepts dataclasses, pydantic models, or dicts.""" if self._running: LOGGER.warning("Multi-camera controller already running") return - active_settings = [s for s in camera_settings if s.enabled][: self.MAX_CAMERAS] + active_settings_user_order = [s for s in camera_settings if s.enabled][: self.MAX_CAMERAS] + if not active_settings_user_order: + LOGGER.warning("No active cameras to start") + return + + # Display/tile order follows the user-configured camera order. + self._camera_display_order = [get_camera_id(s) for s in active_settings_user_order] + + # Startup order may differ for trigger safety: + # followers/external first, master last. + active_settings = sorted(active_settings_user_order, key=_camera_start_priority) if not active_settings: LOGGER.warning("No active cameras to start") return @@ -271,35 +383,59 @@ def stop(self, wait: bool = True) -> None: self._settings.clear() self._started_cameras.clear() self._failed_cameras.clear() + self._camera_display_order.clear() self._expected_cameras = 0 self.all_stopped.emit() def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float) -> None: """Handle a frame from one camera.""" # Apply rotation if configured - settings = self._settings.get(camera_id) - if settings and settings.rotation: - frame = MultiCameraController.apply_rotation(frame, settings.rotation) - - # Apply cropping if configured - if settings: - crop_region = settings.get_crop_region() - if crop_region: - frame = MultiCameraController.apply_crop(frame, crop_region) - - with self._frame_lock: - self._frames[camera_id] = frame - self._timestamps[camera_id] = timestamp - - # Emit frame data without tiling (tiling done in GUI for performance) - if self._frames: - frame_data = MultiFrameData( - frames=dict(self._frames), - timestamps=dict(self._timestamps), - source_camera_id=camera_id, # Track which camera triggered this - tiled_frame=None, - ) - self.frame_ready.emit(frame_data) + timing = self._timing_for_camera(camera_id) + + with timing.measure("Multi.slot.total"): + settings = self._settings.get(camera_id) + with timing.measure("Multi.slot.apply_transforms"): + if settings and settings.rotation: + frame = MultiCameraController.apply_rotation(frame, settings.rotation) + + # Apply cropping if configured + if settings: + crop_region = settings.get_crop_region() + if crop_region: + frame = MultiCameraController.apply_crop(frame, crop_region) + with timing.measure("Multi.update_latest"): + with self._frame_lock: + self._frames[camera_id] = frame + self._timestamps[camera_id] = timestamp + + # Emit frame data without tiling (tiling done in GUI for performance) + if self._frames: + ordered_frames: dict[str, np.ndarray] = {} + ordered_timestamps: dict[str, float] = {} + + for cam_id in self._camera_display_order: + if cam_id in self._frames: + ordered_frames[cam_id] = self._frames[cam_id] + if cam_id in self._timestamps: + ordered_timestamps[cam_id] = self._timestamps[cam_id] + + # Any unexpected/legacy IDs, appended deterministically. + for cam_id in self._frames: + if cam_id not in ordered_frames: + ordered_frames[cam_id] = self._frames[cam_id] + for cam_id in self._timestamps: + if cam_id not in ordered_timestamps: + ordered_timestamps[cam_id] = self._timestamps[cam_id] + + frame_data = MultiFrameData( + frames=ordered_frames, + timestamps=ordered_timestamps, + source_camera_id=camera_id, + tiled_frame=None, + ) + self.frame_ready.emit(frame_data) + timing.note_frame() + timing.maybe_log() @staticmethod def apply_rotation(frame: np.ndarray, degrees: int) -> np.ndarray: diff --git a/dlclivegui/utils/display.py b/dlclivegui/utils/display.py index 0eac657..01b0be0 100644 --- a/dlclivegui/utils/display.py +++ b/dlclivegui/utils/display.py @@ -38,10 +38,10 @@ def compute_tiling_geometry( """Compute consistent tiling geometry for both tiling and overlay transforms. Returns: - (sorted_cam_ids, rows, cols, tile_w, tile_h) + (cam_ids, rows, cols, tile_w, tile_h) Notes: - - We intentionally base tile aspect on the first frame in sorted_cam_ids, + - We intentionally base tile aspect on the first frame in cam_ids, because create_tiled_frame uses the same ordering. This guarantees that compute_tile_info() and create_tiled_frame() agree on tile_w/tile_h. - If frames have different aspect ratios, they will be resized (possibly distorted) @@ -50,7 +50,7 @@ def compute_tiling_geometry( if not frames: return ([], 1, 1, 640, 480) - cam_ids = sorted(frames.keys()) + cam_ids = list(frames.keys()) frames_list = [frames[cid] for cid in cam_ids] num_frames = len(frames_list) @@ -63,7 +63,7 @@ def compute_tiling_geometry( max_w, max_h = max_canvas - # Reference aspect is based on the first frame in sorted order (matches tiler). + # Reference aspect is based on the first frame in display order (matches tiler). h0, w0 = frames_list[0].shape[:2] frame_aspect = (w0 / h0) if h0 > 0 else 1.0 @@ -135,7 +135,7 @@ def compute_tile_info( Critical robustness fix: - Tile dimensions are computed from the same reference used by create_tiled_frame() - (first frame in sorted order), so offsets/scales match the actual tiling. + (first frame in display order), so offsets/scales match the actual tiling. """ if not frames: return (0, 0), (1.0, 1.0) diff --git a/dlclivegui/utils/stats.py b/dlclivegui/utils/stats.py index 23e9d57..38e3798 100644 --- a/dlclivegui/utils/stats.py +++ b/dlclivegui/utils/stats.py @@ -1,10 +1,113 @@ # dlclivegui/utils/stats.py from __future__ import annotations +import logging +import time + from dlclivegui.services.dlc_processor import ProcessorStats from dlclivegui.services.video_recorder import RecorderStats +class WorkerTimingStats: + """Tiny timing accumulator for camera worker performance diagnostics. + + Usage: + with stats.measure("read"): + frame, ts = backend.read() + + Logs aggregate timings once per log_interval seconds. + """ + + def __init__( + self, camera_id: str, *, logger: logging.Logger | None = None, log_interval: float = 1.0, enabled: bool = True + ): + self.camera_id = camera_id + self.log_interval = float(log_interval) + self.enabled = bool(enabled) + self.logger = logger or logging.getLogger(__name__) + if self.enabled: # force logger to proper level + if not self.logger.isEnabledFor(logging.DEBUG): + self.logger.setLevel(logging.DEBUG) + + self._last_log = time.perf_counter() + self._frames = 0 + self._timeouts = 0 + self._errors = 0 + self._totals: dict[str, float] = {} + self._counts: dict[str, int] = {} + + class _Measure: + def __init__(self, parent: WorkerTimingStats, name: str): + self.parent = parent + self.name = name + self.t0 = 0.0 + + def __enter__(self): + if self.parent.enabled: + self.t0 = time.perf_counter() + return self + + def __exit__(self, exc_type, exc, tb): + if not self.parent.enabled: + return False + + dt = time.perf_counter() - self.t0 + self.parent._totals[self.name] = self.parent._totals.get(self.name, 0.0) + dt + self.parent._counts[self.name] = self.parent._counts.get(self.name, 0) + 1 + return False + + def measure(self, name: str): + return self._Measure(self, name) + + def note_frame(self) -> None: + if self.enabled: + self._frames += 1 + + def note_timeout(self) -> None: + if self.enabled: + self._timeouts += 1 + + def note_error(self) -> None: + if self.enabled: + self._errors += 1 + + def maybe_log(self) -> None: + if not self.enabled: + return + + now = time.perf_counter() + elapsed = now - self._last_log + if elapsed < self.log_interval: + return + + fps = self._frames / max(elapsed, 1e-9) + + parts = [ + f"[Worker {self.camera_id}]", + f"fps={fps:.1f}", + f"frames={self._frames}", + ] + + if self._timeouts: + parts.append(f"timeouts={self._timeouts}") + if self._errors: + parts.append(f"errors={self._errors}") + + for name in sorted(self._totals): + count = max(self._counts.get(name, 0), 1) + avg_ms = 1000.0 * self._totals[name] / count + parts.append(f"avg_{name}_ms={avg_ms:.3f}") + + self.logger.debug(" ".join(parts)) + + self._last_log = now + self._frames = 0 + self._timeouts = 0 + self._errors = 0 + self._totals.clear() + self._counts.clear() + + def format_recorder_stats(stats: RecorderStats) -> str: latency_ms = stats.last_latency * 1000.0 avg_ms = stats.average_latency * 1000.0 diff --git a/tests/cameras/backends/conftest.py b/tests/cameras/backends/conftest.py index 9459c35..f0a0b6b 100644 --- a/tests/cameras/backends/conftest.py +++ b/tests/cameras/backends/conftest.py @@ -599,6 +599,18 @@ def __init__( self.GainAuto = _FakeNode("Off") self.Gain = _FakeNode(float(gain)) + # Trigger input nodes + self.AcquisitionMode = _FakeNode("Continuous", symbolics=["Continuous", "SingleFrame"]) + self.TriggerSelector = _FakeNode("FrameStart", symbolics=["FrameStart"]) + self.TriggerMode = _FakeNode("Off", symbolics=["Off", "On"]) + self.TriggerSource = _FakeNode("Line1", symbolics=["Line0", "Line1", "Software"]) + self.TriggerActivation = _FakeNode("RisingEdge", symbolics=["RisingEdge", "FallingEdge"]) + + # GPIO output nodes for master/follower setups + self.LineSelector = _FakeNode("Line0", symbolics=["Line0", "Line1", "Line2"]) + self.LineMode = _FakeNode("Input", symbolics=["Input", "Output"]) + self.LineSource = _FakeNode("Off", symbolics=["Off", "ExposureActive", "AcquisitionActive"]) + class _FakeRemoteDevice: def __init__(self, node_map: _FakeNodeMap): diff --git a/tests/cameras/backends/test_gentl_trigger.py b/tests/cameras/backends/test_gentl_trigger.py new file mode 100644 index 0000000..57339a1 --- /dev/null +++ b/tests/cameras/backends/test_gentl_trigger.py @@ -0,0 +1,460 @@ +# tests/cameras/backends/test_gentl_trigger.py +from __future__ import annotations + +import pytest + +# --------------------------------------------------------------------- +# GenTL hardware trigger configuration +# --------------------------------------------------------------------- + + +def _gentl_trigger_settings(gentl_settings_factory, trigger: dict, **kwargs): + """Build CameraSettings with a GenTL trigger block.""" + return gentl_settings_factory(properties={"gentl": {"trigger": trigger}}, **kwargs) + + +def test_gentl_capabilities_advertise_hardware_trigger_best_effort(patch_gentl_sdk): + gb = patch_gentl_sdk + + caps = gb.GenTLCameraBackend.static_capabilities() + + assert caps.get("hardware_trigger") == gb.SupportLevel.BEST_EFFORT + + +def test_trigger_default_off_configures_trigger_mode_off(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + + ns = settings.properties.get("gentl", {}) + assert ns.get("trigger_actual", {}).get("role") == "off" + + be.close() + + +def test_trigger_explicit_off_configures_trigger_mode_off(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings(gentl_settings_factory, {"role": "off"}) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + assert settings.properties["gentl"]["trigger_actual"]["role"] == "off" + + be.close() + + +def test_trigger_external_configures_input_line_and_timeout(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "selector": "FrameStart", + "source": "Line0", + "activation": "RisingEdge", + "timeout": gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerSelector.value == "FrameStart" + assert nm.TriggerSource.value == "Line0" + assert nm.TriggerActivation.value == "RisingEdge" + assert nm.TriggerMode.value == "On" + assert be.waits_for_hardware_trigger is True + assert be._timeout == pytest.approx(gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + + ns = settings.properties["gentl"] + assert ns["trigger_actual"]["role"] == "external" + assert ns["trigger_actual"]["source"] == "Line0" + + be.close() + + +def test_trigger_follower_configures_input_line(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "follower", + "selector": "FrameStart", + "source": "Line1", + "activation": "FallingEdge", + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerSelector.value == "FrameStart" + assert nm.TriggerSource.value == "Line1" + assert nm.TriggerActivation.value == "FallingEdge" + assert nm.TriggerMode.value == "On" + + ns = settings.properties["gentl"] + assert ns["trigger_actual"]["role"] == "follower" + + be.close() + + +def test_trigger_master_configures_output_line_and_keeps_trigger_off(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "output_line": "Line2", + "output_source": "ExposureActive", + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + assert nm.LineSelector.value == "Line2" + assert nm.LineMode.value == "Output" + assert nm.LineSource.value == "ExposureActive" + + ns = settings.properties["gentl"] + assert ns["trigger_actual"]["role"] == "master" + assert ns["trigger_actual"]["output_line"] == "Line2" + + be.close() + + +def test_trigger_invalid_source_non_strict_disables_trigger(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "source": "LineDoesNotExist", + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + # Source was unsupported, so the fake node should retain its default. + assert nm.TriggerSource.value == "Line1" + + # Safety behavior: do not arm TriggerMode on the previous/default source. + assert nm.TriggerMode.value == "Off" + + # Controller should not treat timeouts as expected trigger waits. + assert be.waits_for_hardware_trigger is False + + # trigger_actual is persisted after _configure_trigger(); since we reset + # self._trigger to off, the effective trigger state is off. + actual = settings.properties["gentl"]["trigger_actual"] + assert actual["role"] == "off" + + be.close() + + +def test_trigger_invalid_source_strict_raises_and_cleans_up(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "source": "LineDoesNotExist", + "strict": True, + }, + ) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + assert be._harvester is None + assert be._shared_entry is None + assert be._acquirer is None + + +def test_trigger_invalid_master_output_source_non_strict_does_not_crash(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "output_line": "Line2", + "output_source": "NotARealLineSource", + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + assert nm.LineSelector.value == "Line2" + assert nm.LineMode.value == "Output" + # Unsupported source should not be applied in non-strict mode. + assert nm.LineSource.value == "Off" + + be.close() + + +def test_trigger_invalid_master_output_source_strict_raises_and_cleans_up(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "output_line": "Line2", + "output_source": "NotARealLineSource", + "strict": True, + }, + ) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + assert be._harvester is None + assert be._shared_entry is None + assert be._acquirer is None + + +def test_trigger_alias_on_maps_to_external(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "on", + "source": "Line1", + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "On" + assert nm.TriggerSource.value == "Line1" + assert settings.properties["gentl"]["trigger_actual"]["role"] == "external" + + be.close() + + +def test_trigger_timeout_is_capped_for_hardware_trigger_fetch_polling( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + expected_fetch_timeout = gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "timeout": 7.5, + }, + ) + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + # Hardware-trigger fetch calls are intentionally capped so stop(wait=True) + # is not blocked by a long user trigger timeout. + assert be._timeout == pytest.approx(expected_fetch_timeout) + + # Fake acquisition is started, so read should pass and record the capped timeout. + frame, _ = be.read() + assert frame is not None + assert be._acquirer.fetch_calls[-1] == pytest.approx(expected_fetch_timeout) + + # The requested trigger timeout is still preserved in persisted trigger_actual. + actual = settings.properties["gentl"]["trigger_actual"] + assert actual["timeout"] == pytest.approx(7.5) + + finally: + be.close() + + +def test_trigger_timeout_error_mentions_hardware_trigger_when_waiting( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "timeout": 3.0, + }, + ) + # fast_start keeps acquisition stopped; fake fetch then raises timeout. + # This lets us assert the backend timeout message without hardware. + settings.properties["gentl"]["fast_start"] = True + + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + assert be._timeout == pytest.approx(gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + + with pytest.raises(TimeoutError) as ei: + be.read() + + msg = str(ei.value).lower() + assert "gentl timeout" in msg + assert "hardware trigger" in msg or "trigger" in msg + + finally: + be.close() + + +def test_trigger_actual_is_persisted_for_debugging(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "follower", + "source": "Line1", + "activation": "FallingEdge", + "timeout": 9.0, + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + # Requested timeout remains in trigger_actual for debugging/config visibility. + actual = settings.properties["gentl"].get("trigger_actual") + assert isinstance(actual, dict) + assert actual["role"] == "follower" + assert actual["source"] == "Line1" + assert actual["activation"] == "FallingEdge" + assert actual["timeout"] == pytest.approx(9.0) + + # But each blocking Harvester.fetch() call is capped for responsive shutdown. + assert be._timeout == pytest.approx(gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + + finally: + be.close() + + +def test_trigger_invalid_selector_non_strict_disables_trigger(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "selector": "NotARealSelector", + "source": "Line1", + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + # Selector was unsupported, so the fake node should retain its default. + assert nm.TriggerSelector.value == "FrameStart" + + # Source may have been applied, but trigger must not be armed because + # the required selector routing failed. + assert nm.TriggerSource.value == "Line1" + assert nm.TriggerMode.value == "Off" + assert be.waits_for_hardware_trigger is False + + actual = settings.properties["gentl"]["trigger_actual"] + assert actual["role"] == "off" + + be.close() + + +def test_trigger_timeout_not_capped_for_master_mode(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "timeout": 7.5, + }, + ) + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + # Master is free-running / trigger-generating, not waiting for hardware input. + assert be.waits_for_hardware_trigger is False + assert be._timeout == pytest.approx(7.5) + + finally: + be.close() + + +def test_resolve_trigger_source_auto_selects_supported_line( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class Node: + symbolics = ["Line1", "Software", "Any"] + + class NodeMap: + TriggerSource = Node() + + source, ok = be._resolve_trigger_source(NodeMap(), "auto", strict=False) + + assert ok is True + assert source == "Line1" + + +def test_resolve_trigger_source_strict_raises_for_unsupported_explicit_line( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class Node: + symbolics = ["Line1", "Software", "Any"] + + class NodeMap: + TriggerSource = Node() + + with pytest.raises(RuntimeError, match="TriggerSource.*Line0"): + be._resolve_trigger_source(NodeMap(), "Line0", strict=True) diff --git a/tests/gui/test_app_entrypoint.py b/tests/gui/test_app_entrypoint.py index 0a68bb2..b6b1c60 100644 --- a/tests/gui/test_app_entrypoint.py +++ b/tests/gui/test_app_entrypoint.py @@ -31,6 +31,7 @@ def set_use_splash_false(monkeypatch): @pytest.mark.gui def test_main_with_splash(monkeypatch, set_use_splash_true): appmod = _import_fresh() + monkeypatch.setattr(appmod, "_maybe_allow_keyboard_interrupt", MagicMock(name="_maybe_allow_keyboard_interrupt")) # --- Patch Qt app & icon in the entry module's namespace --- QApplication_cls = MagicMock(name="QApplication") @@ -101,6 +102,7 @@ def immediate_single_shot(ms, fn): @pytest.mark.gui def test_main_without_splash(monkeypatch, set_use_splash_false): appmod = _import_fresh() + monkeypatch.setattr(appmod, "_maybe_allow_keyboard_interrupt", MagicMock(name="_maybe_allow_keyboard_interrupt")) # Patch Qt app creation & window icon QApplication_cls = MagicMock(name="QApplication") diff --git a/tests/services/test_multicam_controller.py b/tests/services/test_multicam_controller.py index eca5021..1f64dcf 100644 --- a/tests/services/test_multicam_controller.py +++ b/tests/services/test_multicam_controller.py @@ -5,7 +5,12 @@ # from dlclivegui.config import CameraSettings from dlclivegui.config import CameraSettings -from dlclivegui.services.multi_camera_controller import MultiCameraController, get_camera_id +from dlclivegui.services.multi_camera_controller import ( + MultiCameraController, + _camera_start_priority, + _trigger_role_from_settings, + get_camera_id, +) @pytest.mark.unit @@ -95,3 +100,297 @@ def _create(_settings): # Expect initialization_failed with the camera id with qtbot.waitSignals([mc.initialization_failed, mc.all_stopped], timeout=2000) as _: mc.start([cam]) + + +@pytest.mark.unit +def test_get_camera_id_prefers_stable_device_id(): + cam = CameraSettings( + name="GenTL Cam", + backend="gentl", + index=0, + properties={ + "gentl": { + "device_id": "serial:30220469", + "serial_number": "30220469", + } + }, + ).apply_defaults() + + assert get_camera_id(cam) == "gentl:serial:30220469" + + +@pytest.mark.unit +def test_get_camera_id_falls_back_to_index_without_stable_identity(): + cam = CameraSettings( + name="Cam", + backend="opencv", + index=2, + ).apply_defaults() + + assert get_camera_id(cam) == "opencv:index:2" + + +@pytest.mark.unit +@pytest.mark.parametrize( + ("role", "expected"), + [ + ("off", "off"), + ("disabled", "off"), + ("on", "external"), + ("triggered", "external"), + ("external", "external"), + ("follower", "follower"), + ("slave", "follower"), + ("master", "master"), + ("main", "master"), + ], +) +def test_trigger_role_from_settings_aliases(role, expected): + cam = CameraSettings( + name="C", + backend="gentl", + index=0, + properties={ + "gentl": { + "trigger": { + "role": role, + } + } + }, + ).apply_defaults() + + assert _trigger_role_from_settings(cam) == expected + + +@pytest.mark.unit +def test_camera_start_priority_orders_trigger_roles(): + external = CameraSettings( + name="External", + backend="gentl", + index=0, + properties={"gentl": {"trigger": {"role": "external"}}}, + ).apply_defaults() + + normal = CameraSettings( + name="Normal", + backend="gentl", + index=1, + properties={"gentl": {"trigger": {"role": "off"}}}, + ).apply_defaults() + + master = CameraSettings( + name="Master", + backend="gentl", + index=2, + properties={"gentl": {"trigger": {"role": "master"}}}, + ).apply_defaults() + + assert _camera_start_priority(external) == 0 + assert _camera_start_priority(normal) == 1 + assert _camera_start_priority(master) == 2 + + +@pytest.mark.unit +def test_start_preserves_user_display_order_even_when_trigger_start_order_differs(qtbot, patch_factory): + mc = MultiCameraController() + + # User wants master first in tiled view, follower second. + # Startup order should still be follower first internally. + master = CameraSettings( + name="Master", + backend="opencv", + index=0, + enabled=True, + properties={ + "opencv": { + "device_id": "master-cam", + "trigger": {"role": "master"}, + } + }, + ).apply_defaults() + + follower = CameraSettings( + name="Follower", + backend="opencv", + index=1, + enabled=True, + properties={ + "opencv": { + "device_id": "follower-cam", + "trigger": {"role": "follower"}, + } + }, + ).apply_defaults() + + expected_display_order = [get_camera_id(master), get_camera_id(follower)] + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([master, follower]) + + assert mc._camera_display_order == expected_display_order + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + +@pytest.mark.unit +def test_frame_ready_emits_frames_in_user_configured_order(qtbot, patch_factory): + mc = MultiCameraController() + + cam_a = CameraSettings( + name="A", + backend="opencv", + index=0, + enabled=True, + properties={"opencv": {"device_id": "cam-a"}}, + ).apply_defaults() + + cam_b = CameraSettings( + name="B", + backend="opencv", + index=1, + enabled=True, + properties={"opencv": {"device_id": "cam-b"}}, + ).apply_defaults() + + expected_order = [get_camera_id(cam_a), get_camera_id(cam_b)] + seen_orders: list[list[str]] = [] + + def on_ready(mfd): + if len(mfd.frames) >= 2: + seen_orders.append(list(mfd.frames.keys())) + + mc.frame_ready.connect(on_ready) + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam_a, cam_b]) + + qtbot.waitUntil(lambda: bool(seen_orders), timeout=2500) + + assert seen_orders[-1] == expected_order + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + +@pytest.mark.unit +def test_display_order_is_cleared_on_stop(qtbot, patch_factory): + mc = MultiCameraController() + + cam = CameraSettings( + name="C", + backend="opencv", + index=0, + enabled=True, + properties={"opencv": {"device_id": "cam-0"}}, + ).apply_defaults() + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam]) + + assert mc._camera_display_order == [get_camera_id(cam)] + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + assert mc._camera_display_order == [] + + +@pytest.mark.unit +def test_hardware_trigger_timeouts_are_not_fatal(qtbot, monkeypatch): + class WaitingTriggerBackend: + waits_for_hardware_trigger = True + + def __init__(self, settings): + self.settings = settings + self.opened = False + self.closed = False + + def open(self): + self.opened = True + + def read(self): + raise TimeoutError("waiting for hardware trigger") + + def close(self): + self.closed = True + + def _create(settings): + return WaitingTriggerBackend(settings) + + monkeypatch.setattr(CameraFactory, "create", staticmethod(_create)) + + mc = MultiCameraController() + cam = CameraSettings( + name="Triggered", + backend="gentl", + index=0, + enabled=True, + properties={ + "gentl": { + "device_id": "serial:30220469", + "trigger": {"role": "external", "timeout": 0.1}, + } + }, + ).apply_defaults() + + errors: list[tuple[str, str]] = [] + mc.camera_error.connect(lambda cam_id, msg: errors.append((cam_id, msg))) + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam]) + + # Let several timeout cycles happen. + qtbot.wait(500) + + assert mc.is_running() + assert errors == [] + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + +@pytest.mark.unit +def test_non_trigger_timeouts_are_fatal_after_retries(qtbot, monkeypatch): + class TimeoutBackend: + waits_for_hardware_trigger = False + + def __init__(self, settings): + self.settings = settings + + def open(self): + pass + + def read(self): + raise TimeoutError("camera timeout") + + def close(self): + pass + + def _create(settings): + return TimeoutBackend(settings) + + monkeypatch.setattr(CameraFactory, "create", staticmethod(_create)) + + mc = MultiCameraController() + cam = CameraSettings(name="TimeoutCam", backend="opencv", index=0, enabled=True).apply_defaults() + + with qtbot.waitSignal(mc.camera_error, timeout=3000) as blocker: + mc.start([cam]) + + cam_id, msg = blocker.args + assert cam_id == get_camera_id(cam) + assert "Camera read timeout" in msg + + # Cleanup if still running. + if mc.is_running(): + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..9f82017 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,43 @@ +import pytest + +from dlclivegui.config import ApplicationSettings, CameraSettings, CameraTriggerSettings, MultiCameraSettings + + +@pytest.mark.unit +def test_save_applies_gentl_trigger_defaults_to_top_level_camera(): + cam = CameraSettings( + backend="gentl", + properties={"gentl": {}}, + ) + + settings = ApplicationSettings( + camera=cam, + multi_camera=MultiCameraSettings(cameras=[cam]), + ) + + data = settings.to_dict() + + assert "trigger" in data["camera"]["properties"]["gentl"] + + +@pytest.mark.unit +def test_save_applies_gentl_trigger_defaults_to_multi_camera(): + cam = CameraSettings( + backend="gentl", + properties={"gentl": {}}, + ) + + settings = ApplicationSettings( + multi_camera=MultiCameraSettings(cameras=[cam]), + ) + + data = settings.to_dict() + + assert "trigger" in data["multi_camera"]["cameras"][0]["properties"]["gentl"] + + +@pytest.mark.unit +def test_trigger_source_defaults_to_auto(): + trigger = CameraTriggerSettings() + + assert trigger.source == "auto" diff --git a/tests/utils/test_display.py b/tests/utils/test_display.py index 9ce8d49..559aa15 100644 --- a/tests/utils/test_display.py +++ b/tests/utils/test_display.py @@ -41,7 +41,9 @@ def test_compute_tiling_geometry_single_frame_respects_max_canvas_and_min_tile() def test_compute_tiling_geometry_two_frames_is_1x2(): frames = {"camB": _frame(480, 640, 3), "camA": _frame(480, 640, 3)} cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) - assert cam_ids == ["camA", "camB"] # sorted + + # Preserve insertion/display order, do not sort by camera ID. + assert cam_ids == ["camB", "camA"] assert (rows, cols) == (1, 2) assert tile_w >= 160 and tile_h >= 120 @@ -49,25 +51,106 @@ def test_compute_tiling_geometry_two_frames_is_1x2(): def test_compute_tiling_geometry_three_frames_is_2x2(): frames = {"c3": _frame(480, 640, 3), "c1": _frame(480, 640, 3), "c2": _frame(480, 640, 3)} cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) - assert cam_ids == ["c1", "c2", "c3"] + + # Preserve insertion/display order. + assert cam_ids == ["c3", "c1", "c2"] assert (rows, cols) == (2, 2) assert tile_w >= 160 and tile_h >= 120 -def test_compute_tiling_geometry_reference_aspect_is_first_sorted_cam(): - # camA has aspect 2.0 (w/h), camB has aspect 0.5 +def test_compute_tiling_geometry_reference_aspect_is_first_display_order_cam(): + # camB is first in insertion/display order and has aspect 0.5. + # camA has aspect 2.0. frames = { - "camB": _frame(400, 200, 3), - "camA": _frame(200, 400, 3), + "camB": _frame(400, 200, 3), # aspect = 200 / 400 = 0.5 + "camA": _frame(200, 400, 3), # aspect = 400 / 200 = 2.0 } + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) - assert cam_ids == ["camA", "camB"] + + assert cam_ids == ["camB", "camA"] # For 2 cams, rows=1 cols=2 => initial tile_w=600 tile_h=800 => tile_aspect=0.75 - # frame_aspect for camA = 400/200 = 2.0 > 0.75 => tile_h adjusted to tile_w/frame_aspect = 600/2 = 300 + # frame_aspect for camB = 0.5 <= 0.75 => tile_w adjusted to tile_h * frame_aspect = 800 * 0.5 = 400 + assert (rows, cols) == (1, 2) + assert tile_w == 400 + assert tile_h == 800 + + +def test_compute_tiling_geometry_preserves_frame_insertion_order(): + frames = { + "gentl:serial:30220469": np.zeros((10, 20, 3), dtype=np.uint8), + "gentl:serial:10620051": np.zeros((10, 20, 3), dtype=np.uint8), + } + + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames) + + assert cam_ids == ["gentl:serial:30220469", "gentl:serial:10620051"] + assert rows == 1 + assert cols == 2 + assert tile_w > 0 + assert tile_h > 0 + + +def test_compute_tiling_geometry_preserves_reversed_insertion_order(): + frames = { + "gentl:serial:10620051": np.zeros((10, 20, 3), dtype=np.uint8), + "gentl:serial:30220469": np.zeros((10, 20, 3), dtype=np.uint8), + } + + cam_ids, *_ = compute_tiling_geometry(frames) + + assert cam_ids == ["gentl:serial:10620051", "gentl:serial:30220469"] + + +def test_compute_tile_info_uses_display_order_for_offsets(): + cam0 = "gentl:serial:30220469" + cam1 = "gentl:serial:10620051" + + frames = { + cam0: np.zeros((100, 200, 3), dtype=np.uint8), + cam1: np.zeros((100, 200, 3), dtype=np.uint8), + } + + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames) + + offset0, scale0 = compute_tile_info(cam0, frames[cam0], frames) + offset1, scale1 = compute_tile_info(cam1, frames[cam1], frames) + + assert cam_ids == [cam0, cam1] + assert offset0 == (0, 0) + assert offset1 == (tile_w, 0) + assert scale0[0] > 0 + assert scale0[1] > 0 + assert scale1[0] > 0 + assert scale1[1] > 0 + + +def test_create_tiled_frame_preserves_display_order_by_tile_content(): + # First frame is blue-ish, second is red-ish. + first = np.zeros((100, 100, 3), dtype=np.uint8) + first[:, :] = (255, 0, 0) # BGR blue + + second = np.zeros((100, 100, 3), dtype=np.uint8) + second[:, :] = (0, 0, 255) # BGR red + + frames = { + "gentl:serial:30220469": first, + "gentl:serial:10620051": second, + } + + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(400, 200)) + out = create_tiled_frame(frames, max_canvas=(400, 200)) + + assert cam_ids == ["gentl:serial:30220469", "gentl:serial:10620051"] assert (rows, cols) == (1, 2) - assert tile_w == 600 - assert tile_h == 300 + + # Sample away from text label area. + left_sample = out[tile_h // 2, tile_w // 2] + right_sample = out[tile_h // 2, tile_w + tile_w // 2] + + assert left_sample[0] > left_sample[2] # blue tile first + assert right_sample[2] > right_sample[0] # red tile second def test_create_tiled_frame_empty_returns_default_canvas(): @@ -110,16 +193,18 @@ def test_create_tiled_frame_canvas_shape_matches_geometry(): def test_compute_tile_info_offset_and_scale_matches_tiling(): - # 2 frames => 1x2 tiling, cam ids sorted: ["cam1", "cam2"] + # 2 frames => 1x2 tiling, preserving insertion/display order: ["cam2", "cam1"] frames = {"cam2": _frame(200, 400, 3), "cam1": _frame(200, 400, 3)} cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) original = _frame(200, 400, 3) (ox, oy), (sx, sy) = compute_tile_info("cam2", original, frames, max_canvas=(1200, 800)) - # cam2 is index 1 -> row 0 col 1 + assert cam_ids == ["cam2", "cam1"] assert (rows, cols) == (1, 2) - assert ox == tile_w + + # cam2 is first in display order => row 0 col 0 + assert ox == 0 assert oy == 0 assert sx == pytest.approx(tile_w / 400) assert sy == pytest.approx(tile_h / 200)