Source code for pyccapt.control.devices.camera

import threading
import time

import cv2
import numpy as np
from PyQt6.QtCore import QObject, pyqtSlot, pyqtSignal

try:
    from pypylon import pylon
except Exception as exc:  # pragma: no cover - depends on local Basler runtime
    pylon = None
    _PYPYLON_IMPORT_ERROR = exc
else:
    _PYPYLON_IMPORT_ERROR = None


[docs] def check_camera_backend() -> tuple[bool, str]: """Return whether the Basler camera backend is usable on this host. A True result means we can enumerate Basler cameras at runtime. The actual *number* of connected cameras is allowed to fluctuate while the GUI is running, so the worker handles that dynamically; this check is only about whether pypylon itself is available. """ if pylon is None: return False, f"Camera backend is unavailable ({_PYPYLON_IMPORT_ERROR})" try: devices = pylon.TlFactory.GetInstance().EnumerateDevices() except Exception as exc: # pragma: no cover - backend specific return False, f"Unable to enumerate cameras ({exc})" count = len(devices) if count == 0: return True, "No Basler cameras detected; GUI will retry while running." plural = "s" if count != 1 else "" return True, f"Detected {count} Basler camera{plural}."
[docs] def check_camera_availability(required_cameras: int = 1) -> tuple[bool, str]: """Compatibility wrapper. Older call sites used this to gate whether the camera GUI could open at all. The GUI now always opens when the backend is loaded and the worker handles missing/disconnected cameras dynamically, so this just forwards to :func:`check_camera_backend`. """ del required_cameras # unused; kept for backward compatibility return check_camera_backend()
[docs] class CameraWorker(QObject): """Drives Basler camera capture with hot-reconnect support. The worker keeps a fixed number of slots (one per logical camera in the GUI) and binds each slot to a physical camera by serial number the first time it sees one. If a camera disappears, its slot is freed and the worker keeps trying to (re)attach it. Other slots keep streaming. """ finished = pyqtSignal() SLOT_COUNT = 2 RECONNECT_INTERVAL = 3.0 def __init__(self, variables, emitter): super().__init__() self.flag_default_exposure_time = None # Cameras start with ExposureAuto = Continuous so the user gets a # usable image even before they touch the exposure inputs. The # GUI's auto-exposure button toggles this back to manual ('Off'). self.exposure_auto = True self.exposure_mode = 'Continuous' self.emitter = emitter self.variables = variables self.running = False self.index_save_image = 0 # Defaults for the "light off" case (microseconds). 2,000,000 µs # (2 s) is the value the user previously dialled in by hand to # see the puck in the dark — matches the new auto-exposure # upper limit and the default reset button. self.exposure_time_cam_1 = 2_000_000 self.exposure_time_cam_1_light = 10000 self.exposure_time_cam_2 = 2_000_000 self.exposure_time_cam_2_light = 20000 self.exposure_time_cam_3 = 2_000_000 self.exposure_time_cam_3_light = 10000 self.emitter.cam_1_exposure_time.connect(self.set_exposure_time_1) self.emitter.cam_2_exposure_time.connect(self.set_exposure_time_2) self.emitter.cam_3_exposure_time.connect(self.set_exposure_time_3) self.emitter.default_exposure_time.connect(self.set_default_exposure_time) self.emitter.auto_exposure_time.connect(self.set_auto_exposure_time) self._slots = [None] * self.SLOT_COUNT self._slot_serials = [None] * self.SLOT_COUNT self._applied_exposure = [None] * self.SLOT_COUNT self._applied_exposure_mode = [None] * self.SLOT_COUNT self._last_reconnect_attempt = 0.0 self._converter = None self._announced_no_backend = False # Per-slot dedup of error messages emitted from the hot loop. # _reconcile_slots / _apply_exposure_changes / grab all run every # iteration; without dedup a single persistent error floods # stdout with thousands of identical lines. Each tracker is # cleared whenever the corresponding operation succeeds. # Attach errors are keyed by device-id (the unique device path # pylon reports) instead of slot, because reconcile_slots tries # every device against every empty slot — keying on slot means # the cached message gets clobbered on each device tried in the # same iteration and dedup never matches. self._last_attach_error_by_device = {} self._last_exposure_mode_error = [None] * self.SLOT_COUNT self._last_exposure_error = [None] * self.SLOT_COUNT self._last_grab_error = [None] * self.SLOT_COUNT # Serials the user explicitly disconnected from the camera GUI; # _reconcile_slots skips these so they don't auto-reattach. self._user_disabled_serials = set() # Most recent human-readable status / error message, displayed in # the camera GUI's bottom banner. Updated by _set_status() so the # GUI can poll it on a timer. self.latest_status = "" self.camera_available, self.camera_status_message = check_camera_backend() self.latest_status = self.camera_status_message if self.camera_available: self._init_backend() def _init_backend(self): try: self._tl_factory = pylon.TlFactory.GetInstance() self._converter = pylon.ImageFormatConverter() self._converter.OutputPixelFormat = pylon.PixelType_BGR8packed self._converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned except Exception as e: self.camera_available = False self.camera_status_message = f"Error initializing the camera backend ({e})" print(self.camera_status_message)
[docs] def initialize_cameras(self): # backwards-compat alias for any caller still using the old name self._reconcile_slots(force=True)
[docs] def start_capturing(self): if not self.camera_available: self.finished.emit() return self.running = True self.thread = threading.Thread(target=self.update_cameras, daemon=True) self.thread.start()
[docs] def stop_capturing(self): self.running = False
@pyqtSlot(bool) def set_default_exposure_time(self): if not self.exposure_auto: self.exposure_time_cam_1 = 2_000_000 self.exposure_time_cam_1_light = 10000 self.exposure_time_cam_2 = 2_000_000 self.exposure_time_cam_2_light = 20000 self.exposure_time_cam_3 = 2_000_000 self.exposure_time_cam_3_light = 10000 self.flag_default_exposure_time = True if self.variables.light: exposure_times = [ self.exposure_time_cam_1_light, self.exposure_time_cam_2_light, self.exposure_time_cam_3_light, ] else: exposure_times = [self.exposure_time_cam_1, self.exposure_time_cam_2, self.exposure_time_cam_3] self.emitter.cams_exposure_time_default.emit(exposure_times) else: print('Cannot set the default exposure time when auto exposure is on') @pyqtSlot(bool) def set_auto_exposure_time(self): if not self.exposure_auto: self.exposure_mode = 'Continuous' self.exposure_auto = True else: self.exposure_mode = 'Off' self.exposure_auto = False @pyqtSlot(int) def set_exposure_time_1(self, exposure_time): self.exposure_time_cam_1 = exposure_time @pyqtSlot(int) def set_exposure_time_2(self, exposure_time): self.exposure_time_cam_2 = exposure_time @pyqtSlot(int) def set_exposure_time_3(self, exposure_time): self.exposure_time_cam_3 = exposure_time def _exposure_for_slot(self, slot): if slot == 0: return self.exposure_time_cam_1 if slot == 1: return self.exposure_time_cam_2 return self.exposure_time_cam_3 def _close_slot(self, slot): cam = self._slots[slot] self._slots[slot] = None self._applied_exposure[slot] = None self._applied_exposure_mode[slot] = None if cam is None: return try: if cam.IsGrabbing(): cam.StopGrabbing() except Exception: pass try: if cam.IsOpen(): cam.Close() except Exception: pass def _reconcile_slots(self, force=False): """Fill empty slots from currently-enumerated devices. Slots are bound by serial number: a slot remembers the first serial it was assigned to and will only re-attach to that same physical camera. Brand-new serials fill the lowest empty slot that has never been claimed. """ if pylon is None: return now = time.time() if not force and now - self._last_reconnect_attempt < self.RECONNECT_INTERVAL: return self._last_reconnect_attempt = now try: devices = self._tl_factory.EnumerateDevices() except Exception as e: if not self._announced_no_backend: print(f"Camera enumeration failed: {e}") self._announced_no_backend = True return self._announced_no_backend = False by_serial = {} for dev in devices: try: serial_number = dev.GetSerialNumber() except Exception: serial_number = None if serial_number: by_serial[serial_number] = dev used_serials = set() for slot in range(self.SLOT_COUNT): if self._slots[slot] is not None: try: if self._slots[slot].IsOpen(): used_serials.add(self._slot_serials[slot]) continue except Exception: pass self._close_slot(slot) sn = self._slot_serials[slot] if sn in self._user_disabled_serials: continue if sn is not None and sn in by_serial: if self._attach_slot(slot, by_serial[sn]): used_serials.add(sn) for sn, dev in by_serial.items(): if sn in used_serials or sn in self._user_disabled_serials: continue for slot in range(self.SLOT_COUNT): if self._slots[slot] is not None: continue if self._slot_serials[slot] is not None: continue if self._attach_slot(slot, dev): self._slot_serials[slot] = sn used_serials.add(sn) break def _device_key(self, device_info): """Return a stable identifier for *device_info*. Prefers the serial number; falls back to the device's full path (which still uniquely identifies the physical USB endpoint). """ for getter in ("GetSerialNumber", "GetFullName", "GetDeviceID"): fn = getattr(device_info, getter, None) if fn is None: continue try: value = fn() except Exception: continue if value: return str(value) return repr(device_info) def _attach_slot(self, slot, device_info): device_key = self._device_key(device_info) try: cam = pylon.InstantCamera(self._tl_factory.CreateDevice(device_info)) cam.Open() try: cam.ExposureAuto.SetValue(self.exposure_mode) except Exception: pass if not self.exposure_auto: try: cam.ExposureTime.SetValue(self._exposure_for_slot(slot)) except Exception: pass self._apply_quality_settings(cam) cam.StartGrabbing(pylon.GrabStrategy_LatestImageOnly) except Exception as e: # Dedup per-device. The reconcile loop tries every visible # device against every empty slot, so a single stuck device # would otherwise emit one message per (slot, iteration). msg = str(e) if self._last_attach_error_by_device.get(device_key) != msg: self._last_attach_error_by_device[device_key] = msg print(f"Could not attach camera (slot {slot}): {msg}") self._set_status(f"Camera {device_key}: {msg}") return False # Successful attach - reset the dedup state for this device so # the next failure (if any) prints again. self._last_attach_error_by_device.pop(device_key, None) self._slots[slot] = cam # If we're attaching in auto mode we didn't write ExposureTime, so # leave the cache empty — the manual-mode branch in # _apply_exposure_changes will push the configured value the # first time the user disables auto. self._applied_exposure[slot] = None if self.exposure_auto else self._exposure_for_slot(slot) self._applied_exposure_mode[slot] = self.exposure_mode try: sn = device_info.GetSerialNumber() except Exception: sn = self._slot_serials[slot] print(f"Camera attached in slot {slot} (serial={sn}).") self._set_status(f"Camera {sn} attached (slot {slot}).") return True def _set_status(self, message): """Publish a human-readable status / error string for the GUI.""" self.latest_status = message or "" @staticmethod def _try_set(node, *values): """Try a list of candidate values on a pylon node, accept the first one that sticks. Returns True on success, False otherwise. Used for image-quality features that exist on some Basler models but not others (e.g. BslSharpnessEnhancement on dart/ace2 cameras only). """ if node is None: return False for value in values: try: node.SetValue(value) return True except Exception: continue return False def _apply_quality_settings(self, cam): """Push image-quality tweaks that help with sample alignment. All operations are best-effort: every Basler model exposes a slightly different feature set, so each tweak is wrapped so that an unsupported camera silently keeps its defaults rather than bailing out of the attach path. The goals are: * Pick the highest mono / colour pixel format the camera supports (12-bit if available) for better tonal range before we push to the BGR8 converter. * Enable Basler's PGI / sharpness enhancement and gamma so dark alignment features (specimen edges, puck contours) lift out of the background. * Disable auto-white-balance jitter on colour cameras — for alignment we want a stable image, not one that re-balances on every frame. """ # Pixel format: prefer 12-bit. The pyqtgraph view will still # render 8-bit (via the converter) but a wider source format # gives the auto-level step more headroom. try: pf = cam.PixelFormat current = pf.GetValue() preferred = ( "BayerRG12", "BayerBG12", "BayerGR12", "BayerGB12", "Mono12", "BayerRG8", "BayerBG8", "Mono8", ) available = set() try: available = {entry.GetSymbolic() for entry in pf.GetEntries() if entry.IsAvailable()} except Exception: pass for cand in preferred: if cand in available and cand != current: try: pf.SetValue(cand) break except Exception: continue except Exception: pass # Sharpness / PGI: Basler's image enhancement pipeline. # BslSharpnessEnhancement is the modern node; older firmware # exposes SharpnessEnhancement (no Bsl prefix) and the legacy # PgiMode toggle. try: self._try_set(getattr(cam, "BslSharpnessEnhancement", None), 1.5) except Exception: pass try: self._try_set(getattr(cam, "SharpnessEnhancement", None), 1.5) except Exception: pass try: self._try_set(getattr(cam, "PgiMode", None), "On") except Exception: pass # Gamma < 1 lifts shadows — useful when the light is off and # the specimen detail lives in the darker half of the histogram. try: gsel = getattr(cam, "GammaSelector", None) if gsel is not None: self._try_set(gsel, "User") gamma = getattr(cam, "Gamma", None) if gamma is not None: gamma.SetValue(0.7) except Exception: pass # Lock auto-white-balance off so the image is stable while the # operator is centring the specimen. try: wb = getattr(cam, "BalanceWhiteAuto", None) if wb is not None: self._try_set(wb, "Off") except Exception: pass # Black-level: keep at default but make sure it isn't pinned to # an inherited high value from a previous run. try: bl = getattr(cam, "BlackLevel", None) if bl is not None: bl.SetValue(0) except Exception: pass # Auto-exposure bounds. # # Sample alignment with the light off is very dark, and the # user was bumping ExposureTime up to ~2,000,000 µs by hand to # see the puck. Out of the box Basler caps AutoExposureTime at # something much shorter (often 100,000–500,000 µs), so the # firmware auto-loop hits the ceiling and gives up before the # image is bright enough. Raising the ceiling lets the auto # loop keep extending exposure for dark scenes, while a higher # target brightness (≈0.6 vs. the default 0.5) shifts the # set-point a notch brighter so dim specimen edges remain # visible. Feature names vary across Basler model families # (ace2/dart use AutoExposureTimeUpperLimit, older ace uses # AutoExposureTimeAbsUpperLimit, …), so each set is wrapped. DARK_EXPOSURE_UPPER_US = 3_000_000 # 3 s — covers "light off" alignment. SHORT_EXPOSURE_LOWER_US = 100 # 100 µs — fast end for "light on". TARGET_BRIGHTNESS = 0.6 # 0..1, default ~0.5; lift dark scenes. for name in ("AutoExposureTimeUpperLimit", "AutoExposureTimeAbsUpperLimit"): node = getattr(cam, name, None) if node is not None: try: node.SetValue(DARK_EXPOSURE_UPPER_US) break except Exception: continue for name in ("AutoExposureTimeLowerLimit", "AutoExposureTimeAbsLowerLimit"): node = getattr(cam, name, None) if node is not None: try: node.SetValue(SHORT_EXPOSURE_LOWER_US) break except Exception: continue for name in ("AutoTargetBrightness", "AutoTargetValue", "BslAutoTargetBrightness"): node = getattr(cam, name, None) if node is not None: try: # Some firmwares expect 0..1, some 0..255. Try both. self._try_set(node, TARGET_BRIGHTNESS, int(TARGET_BRIGHTNESS * 255)) break except Exception: continue # ------------------------------------------------------------ public API
[docs] def list_cameras(self): """Return one dict per currently-detected Basler camera. Each dict has ``serial``, ``model``, ``slot`` (or None when the device is detected but not bound), ``attached`` (bool), and ``user_disabled`` (bool — user explicitly disconnected it). """ if pylon is None or not self.camera_available: return [] try: devices = self._tl_factory.EnumerateDevices() except Exception as e: self._set_status(f"Camera enumeration failed: {e}") return [] # serial -> slot, derived from currently-open slots slot_by_serial = {} for slot in range(self.SLOT_COUNT): cam = self._slots[slot] sn = self._slot_serials[slot] if cam is not None and sn is not None: slot_by_serial[sn] = slot out = [] seen = set() for dev in devices: try: sn = dev.GetSerialNumber() except Exception: sn = None if not sn or sn in seen: continue seen.add(sn) try: model = dev.GetModelName() except Exception: model = "" out.append( { "serial": sn, "model": model, "slot": slot_by_serial.get(sn), "attached": sn in slot_by_serial, "user_disabled": sn in self._user_disabled_serials, } ) return out
[docs] def disconnect_serial(self, serial): """Close any open slot bound to *serial* and prevent auto-reattach. Use :meth:`connect_serial` to re-enable it. """ if not serial: return self._user_disabled_serials.add(serial) for slot in range(self.SLOT_COUNT): if self._slot_serials[slot] == serial and self._slots[slot] is not None: self._close_slot(slot) self._set_status(f"Disconnected camera {serial} from slot {slot}.") return self._set_status(f"Camera {serial} marked disconnected.")
[docs] def connect_serial(self, serial): """Permit *serial* to attach again and force a reconcile pass.""" if not serial: return self._user_disabled_serials.discard(serial) # Drop the cached attach error so the next failure (if any) # prints fresh. self._last_attach_error_by_device.pop(serial, None) try: self._reconcile_slots(force=True) except Exception as e: self._set_status(f"Could not connect {serial}: {e}") return for slot in range(self.SLOT_COUNT): if self._slot_serials[slot] == serial and self._slots[slot] is not None: self._set_status(f"Connected camera {serial} (slot {slot}).") return self._set_status(f"Camera {serial} could not be attached — see terminal log.")
def _apply_exposure_changes(self): for slot in range(self.SLOT_COUNT): cam = self._slots[slot] if cam is None: continue target_mode = self.exposure_mode if self._applied_exposure_mode[slot] != target_mode: try: cam.ExposureAuto.SetValue(target_mode) # Gain follows exposure: in auto mode, let the camera # also auto-tune the gain so low-light / high-light # scenes (light on/off) converge faster and cleaner. # In manual mode, GainAuto must be Off or the camera # will keep overriding the user's exposure value. try: cam.GainAuto.SetValue(target_mode) except Exception: pass # Auto-mode wrote its own value into ExposureTime; the # cache no longer reflects the camera. Invalidate so # the manual block below always re-pushes the user's # configured exposure on the next iteration. self._applied_exposure[slot] = None self._applied_exposure_mode[slot] = target_mode self._last_exposure_mode_error[slot] = None except Exception as e: msg = str(e) if self._last_exposure_mode_error[slot] != msg: self._last_exposure_mode_error[slot] = msg print(f"Slot {slot} exposure-auto change failed: {msg}") # Skip pushing manual exposure values while the camera is in # auto mode — the camera owns the value and writing here just # races with the firmware loop. if self.exposure_mode != 'Off': continue target_exposure = self._exposure_for_slot(slot) if self._applied_exposure[slot] != target_exposure: try: cam.ExposureTime.SetValue(target_exposure) self._applied_exposure[slot] = target_exposure self._last_exposure_error[slot] = None except Exception as e: msg = str(e) if self._last_exposure_error[slot] != msg: self._last_exposure_error[slot] = msg print(f"Slot {slot} exposure change failed: {msg}")
[docs] def update_cameras(self): if not self.camera_available: self.finished.emit() return last_save_time = time.time() self._reconcile_slots(force=True) while self.running: self._reconcile_slots() self._apply_exposure_changes() any_open = False grabbed_images = [None] * self.SLOT_COUNT for slot in range(self.SLOT_COUNT): cam = self._slots[slot] if cam is None: continue any_open = True try: if not cam.IsGrabbing(): cam.StartGrabbing(pylon.GrabStrategy_LatestImageOnly) grab = cam.RetrieveResult(2000, pylon.TimeoutHandling_ThrowException) try: if grab.GrabSucceeded(): image = self._converter.Convert(grab) grabbed_images[slot] = image.GetArray() finally: grab.Release() except Exception as e: msg = str(e) if self._last_grab_error[slot] != msg: self._last_grab_error[slot] = msg print(f"Slot {slot} grab failed: {msg}; will try to reconnect.") self._set_status(f"Slot {slot} grab failed: {msg}") self._close_slot(slot) else: # Successful grab - clear the dedup state. self._last_grab_error[slot] = None self._emit_images(grabbed_images) if self.variables.clear_index_save_image: self.variables.clear_index_save_image = False self.index_save_image = 0 now = time.time() if now - last_save_time >= self.variables.save_meta_interval_camera and self.variables.start_flag: last_save_time = now self._save_screenshots(grabbed_images) if self.variables.light_switch or self.flag_default_exposure_time: self.light_switch() self.variables.light_switch = False self.flag_default_exposure_time = False time.sleep(0.5) if not self.variables.flag_camera_grab: break for slot in range(self.SLOT_COUNT): self._close_slot(slot) self.finished.emit()
def _emit_images(self, images): # Slot 0 -> img0 (side overview), Slot 1 -> img1 (top overview). # img2 mirrors slot 0 so the existing 'angle' view doesn't go blank # when only one camera is connected. img0 = images[0] if len(images) > 0 else None img1 = images[1] if len(images) > 1 else None if img0 is not None: self.emitter.img0_orig.emit(np.swapaxes(img0, 0, 1)) if img1 is not None: self.emitter.img1_orig.emit(np.swapaxes(img1, 0, 1)) angle_src = img0 if img0 is not None else img1 if angle_src is not None: self.emitter.img2_orig.emit(np.swapaxes(angle_src, 0, 1)) def _save_screenshots(self, images): path_meta = self.variables.path_meta labels = ("camera_side", "camera_top", "camera_45") save_sources = list(images) if len(save_sources) >= 1 and (len(save_sources) < 3 or save_sources[2] is None): while len(save_sources) < 3: save_sources.append(None) save_sources[2] = save_sources[0] for label, img in zip(labels, save_sources): if img is None: continue try: cv2.imwrite(f"{path_meta}/{label}_{self.index_save_image}.png", img) except Exception as e: print(f"Could not save {label} screenshot: {e}") self.index_save_image += 1 time.sleep(0.5)
[docs] def light_switch(self): if self.exposure_auto: return try: light_on = bool(self.variables.light) slot_targets = ( self.exposure_time_cam_1_light if light_on else self.exposure_time_cam_1, self.exposure_time_cam_2_light if light_on else self.exposure_time_cam_2, self.exposure_time_cam_3_light if light_on else self.exposure_time_cam_3, ) for slot in range(self.SLOT_COUNT): cam = self._slots[slot] if cam is None: continue target = slot_targets[slot] cam.ExposureTime.SetValue(target) self._applied_exposure[slot] = target except Exception as e: print(f"Error in switching the light: {e}")