diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index b55abdf..0992db0 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -7,11 +7,12 @@ import threading import time from pathlib import Path -from typing import ClassVar +from typing import Any, ClassVar import cv2 import numpy as np +from ...config import CameraTriggerSettings from ..base import CameraBackend, SupportLevel, register_backend from ..factory import DetectedCamera from .utils import gentl_discovery as cti_finder @@ -75,6 +76,11 @@ class GenTLCameraBackend(CameraBackend): _CTI_FILES_SOURCE_AUTO: ClassVar[str] = "auto" _CTI_FILES_SOURCE_USER: ClassVar[str] = "user" + # Keep individual Harvester.fetch() calls short enough that controller + # shutdown can stop worker threads promptly. Hardware-trigger waits are + # handled by repeated polling in SingleCameraWorker. + _MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT: ClassVar[float] = 1.0 + def __init__(self, settings): super().__init__(settings) @@ -105,6 +111,35 @@ def __init__(self, settings): self._gain = self._positive_float(ns.get("gain", props.get("gain"))) self._timeout: float = float(ns.get("timeout", props.get("timeout", 2.0))) + raw_trigger = ns.get("trigger", props.get("trigger")) + raw_trigger_strict = isinstance(raw_trigger, dict) and bool(raw_trigger.get("strict", False)) + + try: + self._trigger = CameraTriggerSettings.from_any(raw_trigger) + except Exception as exc: + if raw_trigger_strict: + raise ValueError(f"Strict mode failure - Invalid GenTL trigger configuration: {exc}") from exc + + LOG.warning( + "Invalid GenTL trigger config; falling back to trigger role=off: %s. " + "Enable strict mode to force this to raise.", + exc, + ) + self._trigger = CameraTriggerSettings() + + trigger_timeout = self._positive_float(self._trigger_attr(self._trigger, "timeout", None)) + if trigger_timeout is not None: + role = str(self._trigger_attr(self._trigger, "role", "off") or "off").strip().lower() + + if role in {"external", "follower"}: + # Do not let a long hardware-trigger wait block shutdown. + # SingleCameraWorker treats these fetch timeouts as expected + # polling misses while waits_for_hardware_trigger is true. + self._timeout = min(float(trigger_timeout), self._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + else: + # For non-trigger-waiting modes, preserve legacy behavior. + self._timeout = float(trigger_timeout) + self._requested_resolution: tuple[int, int] | None = self._get_requested_resolution_or_none() self._actual_width: int | None = None @@ -154,6 +189,7 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: "set_gain": SupportLevel.SUPPORTED, "device_discovery": SupportLevel.SUPPORTED, "stable_identity": SupportLevel.SUPPORTED, + "hardware_trigger": SupportLevel.BEST_EFFORT, } # ------------------------------------------------------------------ @@ -420,11 +456,12 @@ def open(self) -> None: self._device_label = self._resolve_device_label(node_map) self._configure_pixel_format(node_map) - self._configure_trigger(node_map) self._configure_resolution(node_map) self._configure_exposure(node_map) self._configure_gain(node_map) self._configure_frame_rate(node_map) + self._configure_trigger(node_map) # keep low in the list + self._ensure_settings_ns()["trigger_actual"] = self._trigger_to_dict(self._trigger) self._read_telemetry(node_map) self._persist_device_metadata(selected_info, selected_serial) @@ -449,6 +486,11 @@ def open(self) -> None: f"Failed to open GenTL camera.\n\nLoaded CTIs: {loaded}\nFailed CTIs: {failed}\nReason: {exc}" ) from exc + @property + def waits_for_hardware_trigger(self) -> bool: + role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() + return role in {"external", "follower"} + def read(self) -> tuple[np.ndarray, float]: if self._acquirer is None: raise RuntimeError("GenTL image acquirer not initialised") @@ -470,6 +512,8 @@ def read(self) -> tuple[np.ndarray, float]: except ValueError: frame = array.copy() except HarvesterTimeoutError as exc: + if self.waits_for_hardware_trigger: + raise TimeoutError(str(exc) + " (GenTL timeout; waiting for hardware trigger?)") from exc raise TimeoutError(str(exc) + " (GenTL timeout)") from exc frame = self._convert_frame(frame) @@ -502,6 +546,12 @@ def close(self) -> None: except Exception: pass + try: + node_map = self._acquirer.remote_device.node_map + self._call_with_optional_lock(self._restore_trigger_idle, node_map) + except Exception: + pass + try: destroy = getattr(self._acquirer, "destroy", None) if destroy is not None: @@ -949,6 +999,107 @@ def _create_acquirer(self, serial: str | None, index: int): # ------------------------------------------------------------------ # Camera configuration helpers # ------------------------------------------------------------------ + @staticmethod + def _node(node_map, name: str): + try: + return getattr(node_map, name) + except Exception: + return None + + @staticmethod + def _node_symbolics(node) -> list[str]: + try: + return list(getattr(node, "symbolics", []) or []) + except Exception: + return [] + + def _set_enum_node(self, node_map, name: str, value: str, *, strict: bool = False) -> bool: + node = self._node(node_map, name) + if node is None: + if strict: + raise RuntimeError(f"GenICam node '{name}' is not available") + LOG.debug("GenICam node '%s' is not available; skipping", name) + return False + + symbolics = self._node_symbolics(node) + if symbolics and value not in symbolics: + if strict: + raise RuntimeError(f"GenICam node '{name}' does not support '{value}'. Available: {symbolics}") + LOG.warning("GenICam node '%s' does not support '%s'. Available: %s", name, value, symbolics) + return False + + try: + node.value = value + return True + except Exception as exc: + if strict: + raise RuntimeError(f"Failed to set GenICam node '{name}' to '{value}': {exc}") from exc + LOG.warning("Failed to set GenICam node '%s' to '%s': %s", name, value, exc) + return False + + @staticmethod + def _trigger_attr(trigger, name: str, default=None): + if isinstance(trigger, dict): + return trigger.get(name, default) + return getattr(trigger, name, default) + + @staticmethod + def _trigger_to_dict(trigger) -> dict[str, Any]: + if trigger is None: + return {} + if isinstance(trigger, dict): + return dict(trigger) + if hasattr(trigger, "model_dump"): + try: + return trigger.model_dump(exclude_none=True) + except Exception: + pass + return {} + + def _resolve_trigger_source(self, node_map, requested: str, *, strict: bool) -> tuple[str, bool]: + """Resolve TriggerSource against the camera-supported GenICam enum values. + + Model-level default is "auto"; this backend maps it to the first preferred + source supported by the actual camera. + """ + requested = str(requested or "auto").strip() + node = self._node(node_map, "TriggerSource") + available = self._node_symbolics(node) + + if not available: + if strict: + raise RuntimeError("GenICam node 'TriggerSource' is not available or has no symbolics") + LOG.warning("GenICam node 'TriggerSource' is not available; disabling trigger input.") + return requested, False + + if requested in available: + return requested, True + + if requested.lower() == "auto": + for candidate in ("Line0", "Line1", "Line2", "Any"): + if candidate in available: + LOG.info( + "GenTL TriggerSource auto-selected '%s'. Available: %s", + candidate, + available, + ) + return candidate, True + + LOG.warning( + "Could not auto-select a GenTL TriggerSource. Available: %s", + available, + ) + return requested, False + + if strict: + raise RuntimeError(f"GenICam node 'TriggerSource' does not support '{requested}'. Available: {available}") + + LOG.warning( + "GenTL TriggerSource '%s' is not available. Available: %s", + requested, + available, + ) + return requested, False def _configure_pixel_format(self, node_map) -> None: try: @@ -1004,12 +1155,149 @@ def _configure_pixel_format(self, node_map) -> None: LOG.warning("Failed to configure pixel format '%s': %s", self._pixel_format, e) def _configure_trigger(self, node_map) -> None: + cfg = self._trigger + role = str(self._trigger_attr(cfg, "role", "off") or "off").strip().lower() + strict = bool(self._trigger_attr(cfg, "strict", False)) + + if role in {"off", "disabled"}: + self._configure_trigger_off(node_map, strict=strict) + return + + if role in {"external", "follower"}: + self._configure_trigger_input(node_map, cfg, strict=strict) + return + + if role == "master": + self._configure_trigger_master(node_map, cfg, strict=strict) + return + + if strict: + raise RuntimeError(f"Unsupported GenTL trigger role: {role!r}") + + LOG.warning("Unsupported GenTL trigger role '%s'; disabling trigger.", role) + self._configure_trigger_off(node_map, strict=False) + + def _configure_trigger_off(self, node_map, *, strict: bool = False) -> None: + self._set_enum_node(node_map, "TriggerMode", "Off", strict=strict) + + def _configure_trigger_input(self, node_map, cfg, *, strict: bool = False) -> None: + role = str(self._trigger_attr(cfg, "role", "external") or "external").strip().lower() + selector = str(self._trigger_attr(cfg, "selector", "FrameStart") or "FrameStart") + source = str(self._trigger_attr(cfg, "source", "Line0") or "Line0") + activation = str(self._trigger_attr(cfg, "activation", "RisingEdge") or "RisingEdge") + + # Disable trigger while changing trigger-related nodes. + self._set_enum_node(node_map, "TriggerMode", "Off", strict=False) + + selector_ok = self._set_enum_node(node_map, "TriggerSelector", selector, strict=strict) + source, source_resolved = self._resolve_trigger_source(node_map, source, strict=strict) + source_ok = source_resolved and self._set_enum_node(node_map, "TriggerSource", source, strict=strict) + activation_ok = self._set_enum_node(node_map, "TriggerActivation", activation, strict=strict) + + # TriggerSelector and TriggerSource are required routing nodes. + # If either failed in non-strict mode, do not arm TriggerMode=On. + # Otherwise the camera may wait on a previous/default input line. + if not (selector_ok and source_ok): + LOG.warning( + "Could not apply GenTL trigger input routing " + "(selector_ok=%s, source_ok=%s); disabling trigger. " + "requested role=%s selector=%s source=%s activation=%s", + selector_ok, + source_ok, + role, + selector, + source, + activation, + ) + self._configure_trigger_off(node_map, strict=False) + self._trigger = CameraTriggerSettings() + return + + if not activation_ok: + LOG.warning( + "Could not apply GenTL TriggerActivation=%s; using camera default/current activation.", + activation, + ) + + self._set_enum_node(node_map, "AcquisitionMode", "Continuous", strict=False) + + if not self._set_enum_node(node_map, "TriggerMode", "On", strict=strict): + LOG.warning("Could not enable GenTL TriggerMode=On; disabling trigger.") + self._configure_trigger_off(node_map, strict=False) + self._trigger = CameraTriggerSettings() + return + + LOG.info( + "GenTL trigger input configured: role=%s selector=%s source=%s activation=%s activation_ok=%s", + role, + selector, + source, + activation, + activation_ok, + ) + + def _configure_trigger_master(self, node_map, cfg, *, strict: bool = False) -> None: + output_line = str(self._trigger_attr(cfg, "output_line", "Line2") or "Line2") + output_source = str(self._trigger_attr(cfg, "output_source", "ExposureActive") or "ExposureActive") + + # Master camera runs freerun and exposes an output signal. + self._configure_trigger_off(node_map, strict=False) + + line_selected = self._set_enum_node( + node_map, + "LineSelector", + output_line, + strict=strict, + ) + + # In non-strict mode, do not continue configuring output behavior if the + # requested line could not be selected. Otherwise we may accidentally drive + # whichever GPIO line the camera had selected previously/defaulted to. + if not line_selected: + LOG.warning( + "Could not select GenTL output line '%s'; skipping trigger output configuration.", + output_line, + ) + return + + mode_ok = self._set_enum_node(node_map, "LineMode", "Output", strict=strict) + source_ok = self._set_enum_node(node_map, "LineSource", output_source, strict=strict) + + if not (mode_ok and source_ok): + LOG.warning( + "GenTL trigger master output configuration incomplete (LineMode ok=%s, LineSource ok=%s).", + mode_ok, + source_ok, + ) + return + + LOG.info( + "GenTL trigger master configured: output_line=%s output_source=%s", + output_line, + output_source, + ) + + def _restore_trigger_idle(self, node_map) -> None: + """Best-effort restore to a safe non-triggering state after acquisition stops. + + Important: + - This should be called after acquirer.stop(), not while acquisition is active. + - It is intentionally non-strict because shutdown should not fail if a node + is missing or read-only. + """ + role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() + try: - trigger_mode = getattr(node_map, "TriggerMode", None) - if trigger_mode is not None and "Off" in getattr(trigger_mode, "symbolics", []): - trigger_mode.value = "Off" - except Exception as e: - LOG.warning("Failed to disable trigger mode: %s", e) + if role in {"external", "follower"}: + self._set_enum_node(node_map, "TriggerMode", "Off", strict=False) + + elif role == "master": + # Stop driving output if the camera exposes these nodes. + self._set_enum_node(node_map, "LineSource", "Off", strict=False) + self._set_enum_node(node_map, "LineMode", "Input", strict=False) + + except Exception: + LOG.debug("Best-effort GenTL trigger restore failed", exc_info=True) def _configure_resolution(self, node_map) -> None: if self._requested_resolution is None: diff --git a/dlclivegui/cameras/base.py b/dlclivegui/cameras/base.py index bc91ce9..fefedd1 100644 --- a/dlclivegui/cameras/base.py +++ b/dlclivegui/cameras/base.py @@ -70,6 +70,7 @@ class SupportLevel(str, Enum): "set_gain": SupportLevel.UNSUPPORTED, "device_discovery": SupportLevel.UNSUPPORTED, "stable_identity": SupportLevel.UNSUPPORTED, + "hardware_trigger": SupportLevel.UNSUPPORTED, } diff --git a/dlclivegui/config.py b/dlclivegui/config.py index 6d9e1de..1153016 100644 --- a/dlclivegui/config.py +++ b/dlclivegui/config.py @@ -12,6 +12,8 @@ TileLayout = Literal["auto", "2x2", "1x4", "4x1"] Precision = Literal["FP32", "FP16"] ModelType = Literal["pytorch", "tensorflow"] +TriggerRole = Literal["off", "external", "master", "follower"] +TriggerActivation = Literal["RisingEdge", "FallingEdge", "AnyEdge", "LevelHigh", "LevelLow"] class CameraSettings(BaseModel): @@ -168,6 +170,137 @@ def check_diff(old: CameraSettings, new: CameraSettings) -> dict: pass return out + def backend_options(self, backend: str | None = None) -> dict[str, Any]: + key = backend or self.backend + props = self.properties if isinstance(self.properties, dict) else {} + ns = props.get(str(key).lower(), {}) + return ns if isinstance(ns, dict) else {} + + def get_trigger_settings(self, backend: str | None = None) -> CameraTriggerSettings: + ns = self.backend_options(backend) + return CameraTriggerSettings.from_any(ns.get("trigger")) + + def set_trigger_settings(self, trigger: CameraTriggerSettings, backend: str | None = None) -> None: + key = backend or self.backend + if not isinstance(self.properties, dict): + self.properties = {} + ns = self.properties.setdefault(str(key).lower(), {}) + if not isinstance(ns, dict): + ns = {} + self.properties[str(key).lower()] = ns + ns["trigger"] = trigger.to_properties() + + def with_save_defaults(self) -> CameraSettings: + out = self.model_copy(deep=True) + + backend = (out.backend or "").lower() + if backend != "gentl": + return out + + if not isinstance(out.properties, dict): + out.properties = {} + + ns = out.properties.setdefault("gentl", {}) + if not isinstance(ns, dict): + ns = {} + out.properties["gentl"] = ns + + ns.setdefault("trigger", CameraTriggerSettings().to_properties()) + + return out + + +class CameraTriggerSettings(BaseModel): + """ + Generic hardware-trigger settings. + + Backend-specific code may ignore fields that are unsupported by a given + camera/SDK. For GenTL, these map to common GenICam nodes such as: + TriggerMode, TriggerSelector, TriggerSource, TriggerActivation, + LineSelector, LineMode, and LineSource. + """ + + role: TriggerRole = "off" + + # Input trigger config: external/follower + selector: str = "FrameStart" + source: str = "auto" + activation: TriggerActivation | str = "RisingEdge" + + # Output config: master + output_line: str = "Line2" + output_source: str = "ExposureActive" + + # Runtime behavior + timeout: float | None = None + strict: bool = False + + @field_validator("role", mode="before") + @classmethod + def _coerce_role(cls, v): + if v is None: + return "off" + + s = str(v).strip().lower() + aliases = { + "": "off", + "none": "off", + "false": "off", + "disabled": "off", + "disable": "off", + "off": "off", + "true": "external", + "on": "external", + "trigger": "external", + "triggered": "external", + "external": "external", + "follower": "follower", + "slave": "follower", + "master": "master", + "main": "master", + } + return aliases.get(s, s) + + @field_validator("timeout", mode="before") + @classmethod + def _coerce_timeout(cls, v): + if v in (None, ""): + return None + try: + fv = float(v) + except Exception: + return None + return fv if fv > 0 else None + + @field_validator("source", mode="before") + @classmethod + def _coerce_source(cls, v): + if v is None: + return "auto" + + s = str(v).strip() + if not s: + return "auto" + + aliases = { + "default": "auto", + "automatic": "auto", + "device": "auto", + "camera": "auto", + } + return aliases.get(s.lower(), s) + + @classmethod + def from_any(cls, value) -> CameraTriggerSettings: + if isinstance(value, cls): + return value + if isinstance(value, dict): + return cls(**value) + return cls() + + def to_properties(self) -> dict[str, Any]: + return self.model_dump(exclude_none=True) + class MultiCameraSettings(BaseModel): cameras: list[CameraSettings] = Field(default_factory=list) @@ -206,12 +339,19 @@ def from_dict(cls, data: dict[str, Any]) -> MultiCameraSettings: return cls(cameras=cameras, max_cameras=max_cameras, tile_layout=tile_layout) def to_dict(self) -> dict[str, Any]: + out = self.with_save_defaults() return { - "cameras": [cam.model_dump() for cam in self.cameras], - "max_cameras": self.max_cameras, - "tile_layout": self.tile_layout, + "cameras": [cam.model_dump() for cam in out.cameras], + "max_cameras": out.max_cameras, + "tile_layout": out.tile_layout, } + def with_save_defaults(self) -> MultiCameraSettings: + """Return a copy with save defaults applied to all cameras.""" + out = self.model_copy(deep=True) + out.cameras = [cam.with_save_defaults() for cam in out.cameras] + return out + class DynamicCropModel(BaseModel): enabled: bool = False @@ -377,10 +517,13 @@ def from_dict(cls, data: dict[str, Any]) -> ApplicationSettings: ) def to_dict(self) -> dict[str, Any]: + camera = self.camera.with_save_defaults() + multi_camera = self.multi_camera.with_save_defaults() + return { "version": self.version, - "camera": self.camera.model_dump(), - "multi_camera": self.multi_camera.to_dict(), + "camera": camera.model_dump(), + "multi_camera": multi_camera.to_dict(), "dlc": self.dlc.model_dump(), "recording": self.recording.model_dump(), "bbox": self.bbox.model_dump(), diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 0e3beb0..c16964b 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -854,15 +854,23 @@ def _apply_config(self, config: ApplicationSettings) -> None: # Update recording path preview self._update_recording_path_preview() - def _current_config(self) -> ApplicationSettings: - # Get the first camera from multi-camera config for backward compatibility - active_cameras = self._config.multi_camera.get_active_cameras() - camera = active_cameras[0] if active_cameras else CameraSettings() + def _current_config(self, *, allow_empty_model_path=False) -> ApplicationSettings: + multi_camera = self._config.multi_camera + active_cameras = multi_camera.get_active_cameras() + camera = ( + active_cameras[0].model_copy(deep=True) + if active_cameras + else ( + multi_camera.cameras[0].model_copy(deep=True) + if multi_camera.cameras + else self._config.camera.model_copy(deep=True) + ) + ) return ApplicationSettings( camera=camera, - multi_camera=self._config.multi_camera, - dlc=self._dlc_settings_from_ui(), + multi_camera=multi_camera, + dlc=self._dlc_settings_from_ui(allow_empty_model_path=allow_empty_model_path), recording=self._recording_settings_from_ui(), bbox=self._bbox_settings_from_ui(), visualization=self._visualization_settings_from_ui(), @@ -874,14 +882,29 @@ def _parse_json(self, value: str) -> dict: return {} return json.loads(text) - def _dlc_settings_from_ui(self) -> DLCProcessorSettings: + def _dlc_settings_from_ui(self, *, allow_empty_model_path=False) -> DLCProcessorSettings: model_path = self.model_path_edit.text().strip() if Path(model_path).exists() and Path(model_path).suffix == ".pb": # IMPORTANT NOTE: DLClive expects a directory for TensorFlow models, # so if user selects a .pb file, we should pass the parent directory to DLCLive model_path = str(Path(model_path).parent) - if model_path == "": + + existing_dlc = ( # explicitly init from default if unset + self._config.dlc.model_copy(deep=True) + if getattr(self._config, "dlc", None) is not None + else DEFAULT_CONFIG.dlc.model_copy(deep=True) + ) + if not model_path: + if allow_empty_model_path: + # Preserve all existing DLC settings and only clear the model path. + return existing_dlc.model_copy( + update={ + "model_path": "", + } + ) + raise ValueError("Model path cannot be empty. Please enter a valid path to a DLCLive model file.") + try: model_bknd = DLCLiveProcessor.get_model_backend(model_path) except Exception as e: @@ -890,15 +913,13 @@ def _dlc_settings_from_ui(self) -> DLCProcessorSettings: "Please ensure the model file is valid and has an appropriate extension " "(.pt, .pth for PyTorch or model directory for TensorFlow)." ) from e - return DLCProcessorSettings( - model_path=model_path, - model_directory=self._config.dlc.model_directory, # Preserve from config - device=self._config.dlc.device, # Preserve from config - dynamic=self._config.dlc.dynamic, # Preserve from config - resize=self._config.dlc.resize, # Preserve from config - precision=self._config.dlc.precision, # Preserve from config - model_type=model_bknd, - # additional_options=self._parse_json(self.additional_options_edit.toPlainText()), + + # Preserve all unchanged DLC settings and only update values derived from the UI. + return existing_dlc.model_copy( + update={ + "model_path": model_path, + "model_type": model_bknd, + } ) def _recording_settings_from_ui(self) -> RecordingSettings: @@ -965,7 +986,7 @@ def _action_save_config_as(self) -> None: def _save_config_to_path(self, path: Path) -> None: try: - config = self._current_config() + config = self._current_config(allow_empty_model_path=True) config.save(path) self._settings_store.set_last_config_path(str(path)) self._settings_store.save_full_config_snapshot(config) @@ -1265,8 +1286,10 @@ def _refresh_dlc_camera_list_running(self) -> None: """Populate the inference camera dropdown from currently running cameras.""" self.dlc_camera_combo.blockSignals(True) self.dlc_camera_combo.clear() - for cam_id in sorted(self._running_cams_ids): - self.dlc_camera_combo.addItem(self._label_for_cam_id(cam_id), cam_id) + for cam in self._config.multi_camera.get_active_cameras(): + cam_id = get_camera_id(cam) + if cam_id in self._running_cams_ids: + self.dlc_camera_combo.addItem(self._label_for_cam_id(cam_id), cam_id) # Keep current selection if still present, else select first running if self._inference_camera_id in self._running_cams_ids: @@ -1368,7 +1391,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None: # Determine DLC camera (first active camera) selected_id = self._inference_camera_id - available_ids = sorted(frame_data.frames.keys()) + available_ids = list(frame_data.frames.keys()) if selected_id in frame_data.frames: dlc_cam_id = selected_id else: @@ -1611,7 +1634,7 @@ def _stop_preview(self) -> None: def _configure_dlc(self) -> bool: try: settings = self._dlc_settings_from_ui() - except (ValueError, json.JSONDecodeError) as exc: + except (ValueError, RuntimeError, json.JSONDecodeError) as exc: self._show_error(f"Invalid DLCLive settings: {exc}") return False if not settings.model_path: diff --git a/dlclivegui/main.py b/dlclivegui/main.py index eace8bd..9e38a08 100644 --- a/dlclivegui/main.py +++ b/dlclivegui/main.py @@ -27,32 +27,70 @@ def _maybe_allow_keyboard_interrupt(app: QApplication) -> None: """ - Gracefully handle Ctrl+C (SIGINT) by closing the main window and quitting Qt. + Gracefully handle Ctrl+C/SIGTERM by closing the main window and quitting Qt. + + Notes: + - The small timer keeps Python signal handling responsive while Qt owns the event loop. + - First Ctrl+C tries graceful cleanup via closeEvent(). + - Second Ctrl+C exits immediately with code 130. """ + quitting = {"requested": False} def _request_quit() -> None: + if quitting["requested"]: + return + + quitting["requested"] = True logging.info("Keyboard interrupt received, closing application...") + win = getattr(app, "_main_window", None) + if win is not None: - # Trigger your existing closeEvent cleanup (camera stop, threads, timers, etc.) - win.close() - else: - app.quit() + try: + # Trigger existing closeEvent cleanup: + # camera stop, controller shutdown, timers, DLC shutdown, etc. + win.close() + except Exception: + logging.exception("Error while closing main window after Ctrl+C") + + # Explicitly ask Qt to leave app.exec(). + # Do this even after win.close(), because closeEvent cleanup can be async + # and relying only on quitOnLastWindowClosed can be fragile. + QTimer.singleShot(0, app.quit) + + def _force_exit() -> None: + logging.warning("Second interrupt received, forcing process exit.") + os._exit(130) def _sigint_handler(_signum, _frame) -> None: + if quitting["requested"]: + _force_exit() QTimer.singleShot(0, _request_quit) signal.signal(signal.SIGINT, _sigint_handler) - # Keepalive timer to allow Python to handle signals while Qt is running. - sig_timer = QTimer() - sig_timer.setInterval(100) # 50–200ms typical; keep low overhead + # Ctrl+Break on Windows. + if hasattr(signal, "SIGBREAK"): + signal.signal(signal.SIGBREAK, _sigint_handler) + + # Useful when process is terminated from shells/process managers. + if hasattr(signal, "SIGTERM"): + signal.signal(signal.SIGTERM, _sigint_handler) + + # Parent the timer to app so Qt owns its lifetime. + sig_timer = QTimer(app) + sig_timer.setInterval(100) sig_timer.timeout.connect(lambda: None) sig_timer.start() - if hasattr(app, "_sig_timer"): - app._sig_timer.stop() # Stop any existing timer to avoid duplicates - app._sig_timer = sig_timer # Store on app to keep it alive and allow cleanup on exit + old_timer = getattr(app, "_sig_timer", None) + if old_timer is not None: + try: + old_timer.stop() + except Exception: + pass + + app._sig_timer = sig_timer def configure_logging(debug: bool = False) -> None: diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index ad1a863..cd13ae0 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -4,7 +4,6 @@ import copy import logging -import time from dataclasses import dataclass from threading import Event, Lock @@ -49,6 +48,7 @@ def __init__(self, camera_id: str, settings: CameraSettings): self._backend: CameraBackend | None = None self._max_consecutive_errors = 5 self._retry_delay = 0.1 + self._trigger_timeout_delay = 0.05 @Slot() def run(self) -> None: @@ -93,12 +93,40 @@ def run(self) -> None: self._camera_id, "Too many empty frames.\nWas the device disconnected ?" ) break - time.sleep(self._retry_delay) + if self._stop_event.wait(self._retry_delay): + break continue consecutive_errors = 0 self.frame_captured.emit(self._camera_id, frame, timestamp) + except TimeoutError as exc: + if self._stop_event.is_set(): + break + + # In hardware-trigger mode, a timeout usually means: + # "no trigger pulse arrived during this poll interval". + # This is expected and should not count as a camera failure. + if bool(getattr(self._backend, "waits_for_hardware_trigger", False)): + LOGGER.debug( + "[Worker %s] waiting for hardware trigger: %s", + self._camera_id, + exc, + ) + consecutive_errors = 0 + + if self._stop_event.wait(self._trigger_timeout_delay): + break # Stop event set during wait + continue + + consecutive_errors += 1 + if consecutive_errors >= self._max_consecutive_errors: + self.error_occurred.emit(self._camera_id, f"Camera read timeout: {exc}") + break + if self._stop_event.wait(self._retry_delay): + break + continue + except Exception as exc: consecutive_errors += 1 if self._stop_event.is_set(): @@ -106,7 +134,8 @@ def run(self) -> None: if consecutive_errors >= self._max_consecutive_errors: self.error_occurred.emit(self._camera_id, f"Camera read error: {exc}") break - time.sleep(self._retry_delay) + if self._stop_event.wait(self._retry_delay): + break continue # Cleanup @@ -142,6 +171,46 @@ def get_camera_id(settings: CameraSettings) -> str: return f"{backend}:index:{int(settings.index)}" +def _trigger_role_from_settings(settings: CameraSettings) -> str: + backend = (settings.backend or "").lower() + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(backend, {}) if isinstance(props.get(backend), dict) else {} + + trigger = ns.get("trigger", {}) + if not isinstance(trigger, dict): + return "off" + + role = str(trigger.get("role", "off") or "off").strip().lower() + + # Match CameraTriggerSettings aliases enough for controller ordering. + if role in {"true", "on", "trigger", "triggered"}: + return "external" + if role in {"slave"}: + return "follower" + if role in {"main"}: + return "master" + if role in {"false", "none", "disabled", "disable", ""}: + return "off" + + return role + + +def _camera_start_priority(settings: CameraSettings) -> int: + """Start trigger-waiting cameras before trigger-generating cameras. + + Priority: + 0: external/follower cameras, which should be armed first + 1: normal/free-run cameras + 2: master cameras, which may generate trigger pulses + """ + role = _trigger_role_from_settings(settings) + if role in {"external", "follower"}: + return 0 + if role == "master": + return 2 + return 1 + + class MultiCameraController(QObject): """Controller for managing multiple cameras simultaneously.""" @@ -166,6 +235,7 @@ def __init__(self): self._frame_lock = Lock() self._running = False self._started_cameras: set = set() + self._camera_display_order: list[str] = [] self._failed_cameras: dict[str, str] = {} # camera_id -> error message self._expected_cameras: int = 0 # Number of cameras we're trying to start @@ -183,7 +253,17 @@ def start(self, camera_settings: list[CameraSettings]) -> None: LOGGER.warning("Multi-camera controller already running") return - active_settings = [s for s in camera_settings if s.enabled][: self.MAX_CAMERAS] + active_settings_user_order = [s for s in camera_settings if s.enabled][: self.MAX_CAMERAS] + if not active_settings_user_order: + LOGGER.warning("No active cameras to start") + return + + # Display/tile order follows the user-configured camera order. + self._camera_display_order = [get_camera_id(s) for s in active_settings_user_order] + + # Startup order may differ for trigger safety: + # followers/external first, master last. + active_settings = sorted(active_settings_user_order, key=_camera_start_priority) if not active_settings: LOGGER.warning("No active cameras to start") return @@ -275,6 +355,7 @@ def stop(self, wait: bool = True) -> None: self._settings.clear() self._started_cameras.clear() self._failed_cameras.clear() + self._camera_display_order.clear() self._expected_cameras = 0 self.all_stopped.emit() @@ -297,10 +378,27 @@ def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float # Emit frame data without tiling (tiling done in GUI for performance) if self._frames: + ordered_frames: dict[str, np.ndarray] = {} + ordered_timestamps: dict[str, float] = {} + + for cam_id in self._camera_display_order: + if cam_id in self._frames: + ordered_frames[cam_id] = self._frames[cam_id] + if cam_id in self._timestamps: + ordered_timestamps[cam_id] = self._timestamps[cam_id] + + # Any unexpected/legacy IDs, appended deterministically. + for cam_id in self._frames: + if cam_id not in ordered_frames: + ordered_frames[cam_id] = self._frames[cam_id] + for cam_id in self._timestamps: + if cam_id not in ordered_timestamps: + ordered_timestamps[cam_id] = self._timestamps[cam_id] + frame_data = MultiFrameData( - frames=dict(self._frames), - timestamps=dict(self._timestamps), - source_camera_id=camera_id, # Track which camera triggered this + frames=ordered_frames, + timestamps=ordered_timestamps, + source_camera_id=camera_id, tiled_frame=None, ) self.frame_ready.emit(frame_data) diff --git a/dlclivegui/utils/display.py b/dlclivegui/utils/display.py index 0eac657..01b0be0 100644 --- a/dlclivegui/utils/display.py +++ b/dlclivegui/utils/display.py @@ -38,10 +38,10 @@ def compute_tiling_geometry( """Compute consistent tiling geometry for both tiling and overlay transforms. Returns: - (sorted_cam_ids, rows, cols, tile_w, tile_h) + (cam_ids, rows, cols, tile_w, tile_h) Notes: - - We intentionally base tile aspect on the first frame in sorted_cam_ids, + - We intentionally base tile aspect on the first frame in cam_ids, because create_tiled_frame uses the same ordering. This guarantees that compute_tile_info() and create_tiled_frame() agree on tile_w/tile_h. - If frames have different aspect ratios, they will be resized (possibly distorted) @@ -50,7 +50,7 @@ def compute_tiling_geometry( if not frames: return ([], 1, 1, 640, 480) - cam_ids = sorted(frames.keys()) + cam_ids = list(frames.keys()) frames_list = [frames[cid] for cid in cam_ids] num_frames = len(frames_list) @@ -63,7 +63,7 @@ def compute_tiling_geometry( max_w, max_h = max_canvas - # Reference aspect is based on the first frame in sorted order (matches tiler). + # Reference aspect is based on the first frame in display order (matches tiler). h0, w0 = frames_list[0].shape[:2] frame_aspect = (w0 / h0) if h0 > 0 else 1.0 @@ -135,7 +135,7 @@ def compute_tile_info( Critical robustness fix: - Tile dimensions are computed from the same reference used by create_tiled_frame() - (first frame in sorted order), so offsets/scales match the actual tiling. + (first frame in display order), so offsets/scales match the actual tiling. """ if not frames: return (0, 0), (1.0, 1.0) diff --git a/tests/cameras/backends/conftest.py b/tests/cameras/backends/conftest.py index 9459c35..f0a0b6b 100644 --- a/tests/cameras/backends/conftest.py +++ b/tests/cameras/backends/conftest.py @@ -599,6 +599,18 @@ def __init__( self.GainAuto = _FakeNode("Off") self.Gain = _FakeNode(float(gain)) + # Trigger input nodes + self.AcquisitionMode = _FakeNode("Continuous", symbolics=["Continuous", "SingleFrame"]) + self.TriggerSelector = _FakeNode("FrameStart", symbolics=["FrameStart"]) + self.TriggerMode = _FakeNode("Off", symbolics=["Off", "On"]) + self.TriggerSource = _FakeNode("Line1", symbolics=["Line0", "Line1", "Software"]) + self.TriggerActivation = _FakeNode("RisingEdge", symbolics=["RisingEdge", "FallingEdge"]) + + # GPIO output nodes for master/follower setups + self.LineSelector = _FakeNode("Line0", symbolics=["Line0", "Line1", "Line2"]) + self.LineMode = _FakeNode("Input", symbolics=["Input", "Output"]) + self.LineSource = _FakeNode("Off", symbolics=["Off", "ExposureActive", "AcquisitionActive"]) + class _FakeRemoteDevice: def __init__(self, node_map: _FakeNodeMap): diff --git a/tests/cameras/backends/test_gentl_trigger.py b/tests/cameras/backends/test_gentl_trigger.py new file mode 100644 index 0000000..57339a1 --- /dev/null +++ b/tests/cameras/backends/test_gentl_trigger.py @@ -0,0 +1,460 @@ +# tests/cameras/backends/test_gentl_trigger.py +from __future__ import annotations + +import pytest + +# --------------------------------------------------------------------- +# GenTL hardware trigger configuration +# --------------------------------------------------------------------- + + +def _gentl_trigger_settings(gentl_settings_factory, trigger: dict, **kwargs): + """Build CameraSettings with a GenTL trigger block.""" + return gentl_settings_factory(properties={"gentl": {"trigger": trigger}}, **kwargs) + + +def test_gentl_capabilities_advertise_hardware_trigger_best_effort(patch_gentl_sdk): + gb = patch_gentl_sdk + + caps = gb.GenTLCameraBackend.static_capabilities() + + assert caps.get("hardware_trigger") == gb.SupportLevel.BEST_EFFORT + + +def test_trigger_default_off_configures_trigger_mode_off(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = gentl_settings_factory() + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + + ns = settings.properties.get("gentl", {}) + assert ns.get("trigger_actual", {}).get("role") == "off" + + be.close() + + +def test_trigger_explicit_off_configures_trigger_mode_off(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings(gentl_settings_factory, {"role": "off"}) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + assert settings.properties["gentl"]["trigger_actual"]["role"] == "off" + + be.close() + + +def test_trigger_external_configures_input_line_and_timeout(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "selector": "FrameStart", + "source": "Line0", + "activation": "RisingEdge", + "timeout": gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerSelector.value == "FrameStart" + assert nm.TriggerSource.value == "Line0" + assert nm.TriggerActivation.value == "RisingEdge" + assert nm.TriggerMode.value == "On" + assert be.waits_for_hardware_trigger is True + assert be._timeout == pytest.approx(gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + + ns = settings.properties["gentl"] + assert ns["trigger_actual"]["role"] == "external" + assert ns["trigger_actual"]["source"] == "Line0" + + be.close() + + +def test_trigger_follower_configures_input_line(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "follower", + "selector": "FrameStart", + "source": "Line1", + "activation": "FallingEdge", + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerSelector.value == "FrameStart" + assert nm.TriggerSource.value == "Line1" + assert nm.TriggerActivation.value == "FallingEdge" + assert nm.TriggerMode.value == "On" + + ns = settings.properties["gentl"] + assert ns["trigger_actual"]["role"] == "follower" + + be.close() + + +def test_trigger_master_configures_output_line_and_keeps_trigger_off(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "output_line": "Line2", + "output_source": "ExposureActive", + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + assert nm.LineSelector.value == "Line2" + assert nm.LineMode.value == "Output" + assert nm.LineSource.value == "ExposureActive" + + ns = settings.properties["gentl"] + assert ns["trigger_actual"]["role"] == "master" + assert ns["trigger_actual"]["output_line"] == "Line2" + + be.close() + + +def test_trigger_invalid_source_non_strict_disables_trigger(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "source": "LineDoesNotExist", + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + # Source was unsupported, so the fake node should retain its default. + assert nm.TriggerSource.value == "Line1" + + # Safety behavior: do not arm TriggerMode on the previous/default source. + assert nm.TriggerMode.value == "Off" + + # Controller should not treat timeouts as expected trigger waits. + assert be.waits_for_hardware_trigger is False + + # trigger_actual is persisted after _configure_trigger(); since we reset + # self._trigger to off, the effective trigger state is off. + actual = settings.properties["gentl"]["trigger_actual"] + assert actual["role"] == "off" + + be.close() + + +def test_trigger_invalid_source_strict_raises_and_cleans_up(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "source": "LineDoesNotExist", + "strict": True, + }, + ) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + assert be._harvester is None + assert be._shared_entry is None + assert be._acquirer is None + + +def test_trigger_invalid_master_output_source_non_strict_does_not_crash(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "output_line": "Line2", + "output_source": "NotARealLineSource", + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "Off" + assert nm.LineSelector.value == "Line2" + assert nm.LineMode.value == "Output" + # Unsupported source should not be applied in non-strict mode. + assert nm.LineSource.value == "Off" + + be.close() + + +def test_trigger_invalid_master_output_source_strict_raises_and_cleans_up(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "output_line": "Line2", + "output_source": "NotARealLineSource", + "strict": True, + }, + ) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + assert be._harvester is None + assert be._shared_entry is None + assert be._acquirer is None + + +def test_trigger_alias_on_maps_to_external(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "on", + "source": "Line1", + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + assert nm.TriggerMode.value == "On" + assert nm.TriggerSource.value == "Line1" + assert settings.properties["gentl"]["trigger_actual"]["role"] == "external" + + be.close() + + +def test_trigger_timeout_is_capped_for_hardware_trigger_fetch_polling( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + expected_fetch_timeout = gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "timeout": 7.5, + }, + ) + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + # Hardware-trigger fetch calls are intentionally capped so stop(wait=True) + # is not blocked by a long user trigger timeout. + assert be._timeout == pytest.approx(expected_fetch_timeout) + + # Fake acquisition is started, so read should pass and record the capped timeout. + frame, _ = be.read() + assert frame is not None + assert be._acquirer.fetch_calls[-1] == pytest.approx(expected_fetch_timeout) + + # The requested trigger timeout is still preserved in persisted trigger_actual. + actual = settings.properties["gentl"]["trigger_actual"] + assert actual["timeout"] == pytest.approx(7.5) + + finally: + be.close() + + +def test_trigger_timeout_error_mentions_hardware_trigger_when_waiting( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "timeout": 3.0, + }, + ) + # fast_start keeps acquisition stopped; fake fetch then raises timeout. + # This lets us assert the backend timeout message without hardware. + settings.properties["gentl"]["fast_start"] = True + + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + assert be._timeout == pytest.approx(gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + + with pytest.raises(TimeoutError) as ei: + be.read() + + msg = str(ei.value).lower() + assert "gentl timeout" in msg + assert "hardware trigger" in msg or "trigger" in msg + + finally: + be.close() + + +def test_trigger_actual_is_persisted_for_debugging(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "follower", + "source": "Line1", + "activation": "FallingEdge", + "timeout": 9.0, + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + # Requested timeout remains in trigger_actual for debugging/config visibility. + actual = settings.properties["gentl"].get("trigger_actual") + assert isinstance(actual, dict) + assert actual["role"] == "follower" + assert actual["source"] == "Line1" + assert actual["activation"] == "FallingEdge" + assert actual["timeout"] == pytest.approx(9.0) + + # But each blocking Harvester.fetch() call is capped for responsive shutdown. + assert be._timeout == pytest.approx(gb.GenTLCameraBackend._MAX_HARDWARE_TRIGGER_FETCH_TIMEOUT) + + finally: + be.close() + + +def test_trigger_invalid_selector_non_strict_disables_trigger(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "external", + "selector": "NotARealSelector", + "source": "Line1", + "strict": False, + }, + ) + be = gb.GenTLCameraBackend(settings) + + be.open() + nm = be._acquirer.remote_device.node_map + + # Selector was unsupported, so the fake node should retain its default. + assert nm.TriggerSelector.value == "FrameStart" + + # Source may have been applied, but trigger must not be armed because + # the required selector routing failed. + assert nm.TriggerSource.value == "Line1" + assert nm.TriggerMode.value == "Off" + assert be.waits_for_hardware_trigger is False + + actual = settings.properties["gentl"]["trigger_actual"] + assert actual["role"] == "off" + + be.close() + + +def test_trigger_timeout_not_capped_for_master_mode(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + settings = _gentl_trigger_settings( + gentl_settings_factory, + { + "role": "master", + "timeout": 7.5, + }, + ) + be = gb.GenTLCameraBackend(settings) + + try: + be.open() + + # Master is free-running / trigger-generating, not waiting for hardware input. + assert be.waits_for_hardware_trigger is False + assert be._timeout == pytest.approx(7.5) + + finally: + be.close() + + +def test_resolve_trigger_source_auto_selects_supported_line( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class Node: + symbolics = ["Line1", "Software", "Any"] + + class NodeMap: + TriggerSource = Node() + + source, ok = be._resolve_trigger_source(NodeMap(), "auto", strict=False) + + assert ok is True + assert source == "Line1" + + +def test_resolve_trigger_source_strict_raises_for_unsupported_explicit_line( + patch_gentl_sdk, + gentl_settings_factory, +): + gb = patch_gentl_sdk + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class Node: + symbolics = ["Line1", "Software", "Any"] + + class NodeMap: + TriggerSource = Node() + + with pytest.raises(RuntimeError, match="TriggerSource.*Line0"): + be._resolve_trigger_source(NodeMap(), "Line0", strict=True) diff --git a/tests/gui/test_app_entrypoint.py b/tests/gui/test_app_entrypoint.py index 0a68bb2..b6b1c60 100644 --- a/tests/gui/test_app_entrypoint.py +++ b/tests/gui/test_app_entrypoint.py @@ -31,6 +31,7 @@ def set_use_splash_false(monkeypatch): @pytest.mark.gui def test_main_with_splash(monkeypatch, set_use_splash_true): appmod = _import_fresh() + monkeypatch.setattr(appmod, "_maybe_allow_keyboard_interrupt", MagicMock(name="_maybe_allow_keyboard_interrupt")) # --- Patch Qt app & icon in the entry module's namespace --- QApplication_cls = MagicMock(name="QApplication") @@ -101,6 +102,7 @@ def immediate_single_shot(ms, fn): @pytest.mark.gui def test_main_without_splash(monkeypatch, set_use_splash_false): appmod = _import_fresh() + monkeypatch.setattr(appmod, "_maybe_allow_keyboard_interrupt", MagicMock(name="_maybe_allow_keyboard_interrupt")) # Patch Qt app creation & window icon QApplication_cls = MagicMock(name="QApplication") diff --git a/tests/services/test_multicam_controller.py b/tests/services/test_multicam_controller.py index 0893bd9..d41befe 100644 --- a/tests/services/test_multicam_controller.py +++ b/tests/services/test_multicam_controller.py @@ -5,7 +5,13 @@ # from dlclivegui.config import CameraSettings from dlclivegui.config import CameraSettings -from dlclivegui.services.multi_camera_controller import MultiCameraController, get_display_id +from dlclivegui.services.multi_camera_controller import ( + MultiCameraController, + _camera_start_priority, + _trigger_role_from_settings, + get_camera_id, + get_display_id, +) @pytest.mark.unit @@ -95,3 +101,297 @@ def _create(_settings): # Expect initialization_failed with the camera id with qtbot.waitSignals([mc.initialization_failed, mc.all_stopped], timeout=2000) as _: mc.start([cam]) + + +@pytest.mark.unit +def test_get_camera_id_prefers_stable_device_id(): + cam = CameraSettings( + name="GenTL Cam", + backend="gentl", + index=0, + properties={ + "gentl": { + "device_id": "serial:30220469", + "serial_number": "30220469", + } + }, + ).apply_defaults() + + assert get_camera_id(cam) == "gentl:serial:30220469" + + +@pytest.mark.unit +def test_get_camera_id_falls_back_to_index_without_stable_identity(): + cam = CameraSettings( + name="Cam", + backend="opencv", + index=2, + ).apply_defaults() + + assert get_camera_id(cam) == "opencv:index:2" + + +@pytest.mark.unit +@pytest.mark.parametrize( + ("role", "expected"), + [ + ("off", "off"), + ("disabled", "off"), + ("on", "external"), + ("triggered", "external"), + ("external", "external"), + ("follower", "follower"), + ("slave", "follower"), + ("master", "master"), + ("main", "master"), + ], +) +def test_trigger_role_from_settings_aliases(role, expected): + cam = CameraSettings( + name="C", + backend="gentl", + index=0, + properties={ + "gentl": { + "trigger": { + "role": role, + } + } + }, + ).apply_defaults() + + assert _trigger_role_from_settings(cam) == expected + + +@pytest.mark.unit +def test_camera_start_priority_orders_trigger_roles(): + external = CameraSettings( + name="External", + backend="gentl", + index=0, + properties={"gentl": {"trigger": {"role": "external"}}}, + ).apply_defaults() + + normal = CameraSettings( + name="Normal", + backend="gentl", + index=1, + properties={"gentl": {"trigger": {"role": "off"}}}, + ).apply_defaults() + + master = CameraSettings( + name="Master", + backend="gentl", + index=2, + properties={"gentl": {"trigger": {"role": "master"}}}, + ).apply_defaults() + + assert _camera_start_priority(external) == 0 + assert _camera_start_priority(normal) == 1 + assert _camera_start_priority(master) == 2 + + +@pytest.mark.unit +def test_start_preserves_user_display_order_even_when_trigger_start_order_differs(qtbot, patch_factory): + mc = MultiCameraController() + + # User wants master first in tiled view, follower second. + # Startup order should still be follower first internally. + master = CameraSettings( + name="Master", + backend="opencv", + index=0, + enabled=True, + properties={ + "opencv": { + "device_id": "master-cam", + "trigger": {"role": "master"}, + } + }, + ).apply_defaults() + + follower = CameraSettings( + name="Follower", + backend="opencv", + index=1, + enabled=True, + properties={ + "opencv": { + "device_id": "follower-cam", + "trigger": {"role": "follower"}, + } + }, + ).apply_defaults() + + expected_display_order = [get_camera_id(master), get_camera_id(follower)] + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([master, follower]) + + assert mc._camera_display_order == expected_display_order + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + +@pytest.mark.unit +def test_frame_ready_emits_frames_in_user_configured_order(qtbot, patch_factory): + mc = MultiCameraController() + + cam_a = CameraSettings( + name="A", + backend="opencv", + index=0, + enabled=True, + properties={"opencv": {"device_id": "cam-a"}}, + ).apply_defaults() + + cam_b = CameraSettings( + name="B", + backend="opencv", + index=1, + enabled=True, + properties={"opencv": {"device_id": "cam-b"}}, + ).apply_defaults() + + expected_order = [get_display_id(cam_a), get_display_id(cam_b)] + seen_orders: list[list[str]] = [] + + def on_ready(mfd): + if len(mfd.frames) >= 2: + seen_orders.append(list(mfd.frames.keys())) + + mc.frame_ready.connect(on_ready) + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam_a, cam_b]) + + qtbot.waitUntil(lambda: bool(seen_orders), timeout=2500) + + assert seen_orders[-1] == expected_order + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + +@pytest.mark.unit +def test_display_order_is_cleared_on_stop(qtbot, patch_factory): + mc = MultiCameraController() + + cam = CameraSettings( + name="C", + backend="opencv", + index=0, + enabled=True, + properties={"opencv": {"device_id": "cam-0"}}, + ).apply_defaults() + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam]) + + assert mc._camera_display_order == [get_camera_id(cam)] + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + assert mc._camera_display_order == [] + + +@pytest.mark.unit +def test_hardware_trigger_timeouts_are_not_fatal(qtbot, monkeypatch): + class WaitingTriggerBackend: + waits_for_hardware_trigger = True + + def __init__(self, settings): + self.settings = settings + self.opened = False + self.closed = False + + def open(self): + self.opened = True + + def read(self): + raise TimeoutError("waiting for hardware trigger") + + def close(self): + self.closed = True + + def _create(settings): + return WaitingTriggerBackend(settings) + + monkeypatch.setattr(CameraFactory, "create", staticmethod(_create)) + + mc = MultiCameraController() + cam = CameraSettings( + name="Triggered", + backend="gentl", + index=0, + enabled=True, + properties={ + "gentl": { + "device_id": "serial:30220469", + "trigger": {"role": "external", "timeout": 0.1}, + } + }, + ).apply_defaults() + + errors: list[tuple[str, str]] = [] + mc.camera_error.connect(lambda cam_id, msg: errors.append((cam_id, msg))) + + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam]) + + # Let several timeout cycles happen. + qtbot.wait(500) + + assert mc.is_running() + assert errors == [] + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) + + +@pytest.mark.unit +def test_non_trigger_timeouts_are_fatal_after_retries(qtbot, monkeypatch): + class TimeoutBackend: + waits_for_hardware_trigger = False + + def __init__(self, settings): + self.settings = settings + + def open(self): + pass + + def read(self): + raise TimeoutError("camera timeout") + + def close(self): + pass + + def _create(settings): + return TimeoutBackend(settings) + + monkeypatch.setattr(CameraFactory, "create", staticmethod(_create)) + + mc = MultiCameraController() + cam = CameraSettings(name="TimeoutCam", backend="opencv", index=0, enabled=True).apply_defaults() + + with qtbot.waitSignal(mc.camera_error, timeout=3000) as blocker: + mc.start([cam]) + + cam_id, msg = blocker.args + assert cam_id == get_display_id(cam) + assert "Camera read timeout" in msg + + # Cleanup if still running. + if mc.is_running(): + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..9f82017 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,43 @@ +import pytest + +from dlclivegui.config import ApplicationSettings, CameraSettings, CameraTriggerSettings, MultiCameraSettings + + +@pytest.mark.unit +def test_save_applies_gentl_trigger_defaults_to_top_level_camera(): + cam = CameraSettings( + backend="gentl", + properties={"gentl": {}}, + ) + + settings = ApplicationSettings( + camera=cam, + multi_camera=MultiCameraSettings(cameras=[cam]), + ) + + data = settings.to_dict() + + assert "trigger" in data["camera"]["properties"]["gentl"] + + +@pytest.mark.unit +def test_save_applies_gentl_trigger_defaults_to_multi_camera(): + cam = CameraSettings( + backend="gentl", + properties={"gentl": {}}, + ) + + settings = ApplicationSettings( + multi_camera=MultiCameraSettings(cameras=[cam]), + ) + + data = settings.to_dict() + + assert "trigger" in data["multi_camera"]["cameras"][0]["properties"]["gentl"] + + +@pytest.mark.unit +def test_trigger_source_defaults_to_auto(): + trigger = CameraTriggerSettings() + + assert trigger.source == "auto" diff --git a/tests/utils/test_display.py b/tests/utils/test_display.py index 9ce8d49..559aa15 100644 --- a/tests/utils/test_display.py +++ b/tests/utils/test_display.py @@ -41,7 +41,9 @@ def test_compute_tiling_geometry_single_frame_respects_max_canvas_and_min_tile() def test_compute_tiling_geometry_two_frames_is_1x2(): frames = {"camB": _frame(480, 640, 3), "camA": _frame(480, 640, 3)} cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) - assert cam_ids == ["camA", "camB"] # sorted + + # Preserve insertion/display order, do not sort by camera ID. + assert cam_ids == ["camB", "camA"] assert (rows, cols) == (1, 2) assert tile_w >= 160 and tile_h >= 120 @@ -49,25 +51,106 @@ def test_compute_tiling_geometry_two_frames_is_1x2(): def test_compute_tiling_geometry_three_frames_is_2x2(): frames = {"c3": _frame(480, 640, 3), "c1": _frame(480, 640, 3), "c2": _frame(480, 640, 3)} cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) - assert cam_ids == ["c1", "c2", "c3"] + + # Preserve insertion/display order. + assert cam_ids == ["c3", "c1", "c2"] assert (rows, cols) == (2, 2) assert tile_w >= 160 and tile_h >= 120 -def test_compute_tiling_geometry_reference_aspect_is_first_sorted_cam(): - # camA has aspect 2.0 (w/h), camB has aspect 0.5 +def test_compute_tiling_geometry_reference_aspect_is_first_display_order_cam(): + # camB is first in insertion/display order and has aspect 0.5. + # camA has aspect 2.0. frames = { - "camB": _frame(400, 200, 3), - "camA": _frame(200, 400, 3), + "camB": _frame(400, 200, 3), # aspect = 200 / 400 = 0.5 + "camA": _frame(200, 400, 3), # aspect = 400 / 200 = 2.0 } + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) - assert cam_ids == ["camA", "camB"] + + assert cam_ids == ["camB", "camA"] # For 2 cams, rows=1 cols=2 => initial tile_w=600 tile_h=800 => tile_aspect=0.75 - # frame_aspect for camA = 400/200 = 2.0 > 0.75 => tile_h adjusted to tile_w/frame_aspect = 600/2 = 300 + # frame_aspect for camB = 0.5 <= 0.75 => tile_w adjusted to tile_h * frame_aspect = 800 * 0.5 = 400 + assert (rows, cols) == (1, 2) + assert tile_w == 400 + assert tile_h == 800 + + +def test_compute_tiling_geometry_preserves_frame_insertion_order(): + frames = { + "gentl:serial:30220469": np.zeros((10, 20, 3), dtype=np.uint8), + "gentl:serial:10620051": np.zeros((10, 20, 3), dtype=np.uint8), + } + + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames) + + assert cam_ids == ["gentl:serial:30220469", "gentl:serial:10620051"] + assert rows == 1 + assert cols == 2 + assert tile_w > 0 + assert tile_h > 0 + + +def test_compute_tiling_geometry_preserves_reversed_insertion_order(): + frames = { + "gentl:serial:10620051": np.zeros((10, 20, 3), dtype=np.uint8), + "gentl:serial:30220469": np.zeros((10, 20, 3), dtype=np.uint8), + } + + cam_ids, *_ = compute_tiling_geometry(frames) + + assert cam_ids == ["gentl:serial:10620051", "gentl:serial:30220469"] + + +def test_compute_tile_info_uses_display_order_for_offsets(): + cam0 = "gentl:serial:30220469" + cam1 = "gentl:serial:10620051" + + frames = { + cam0: np.zeros((100, 200, 3), dtype=np.uint8), + cam1: np.zeros((100, 200, 3), dtype=np.uint8), + } + + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames) + + offset0, scale0 = compute_tile_info(cam0, frames[cam0], frames) + offset1, scale1 = compute_tile_info(cam1, frames[cam1], frames) + + assert cam_ids == [cam0, cam1] + assert offset0 == (0, 0) + assert offset1 == (tile_w, 0) + assert scale0[0] > 0 + assert scale0[1] > 0 + assert scale1[0] > 0 + assert scale1[1] > 0 + + +def test_create_tiled_frame_preserves_display_order_by_tile_content(): + # First frame is blue-ish, second is red-ish. + first = np.zeros((100, 100, 3), dtype=np.uint8) + first[:, :] = (255, 0, 0) # BGR blue + + second = np.zeros((100, 100, 3), dtype=np.uint8) + second[:, :] = (0, 0, 255) # BGR red + + frames = { + "gentl:serial:30220469": first, + "gentl:serial:10620051": second, + } + + cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(400, 200)) + out = create_tiled_frame(frames, max_canvas=(400, 200)) + + assert cam_ids == ["gentl:serial:30220469", "gentl:serial:10620051"] assert (rows, cols) == (1, 2) - assert tile_w == 600 - assert tile_h == 300 + + # Sample away from text label area. + left_sample = out[tile_h // 2, tile_w // 2] + right_sample = out[tile_h // 2, tile_w + tile_w // 2] + + assert left_sample[0] > left_sample[2] # blue tile first + assert right_sample[2] > right_sample[0] # red tile second def test_create_tiled_frame_empty_returns_default_canvas(): @@ -110,16 +193,18 @@ def test_create_tiled_frame_canvas_shape_matches_geometry(): def test_compute_tile_info_offset_and_scale_matches_tiling(): - # 2 frames => 1x2 tiling, cam ids sorted: ["cam1", "cam2"] + # 2 frames => 1x2 tiling, preserving insertion/display order: ["cam2", "cam1"] frames = {"cam2": _frame(200, 400, 3), "cam1": _frame(200, 400, 3)} cam_ids, rows, cols, tile_w, tile_h = compute_tiling_geometry(frames, max_canvas=(1200, 800)) original = _frame(200, 400, 3) (ox, oy), (sx, sy) = compute_tile_info("cam2", original, frames, max_canvas=(1200, 800)) - # cam2 is index 1 -> row 0 col 1 + assert cam_ids == ["cam2", "cam1"] assert (rows, cols) == (1, 2) - assert ox == tile_w + + # cam2 is first in display order => row 0 col 0 + assert ox == 0 assert oy == 0 assert sx == pytest.approx(tile_w / 400) assert sy == pytest.approx(tile_h / 200)