import threading
import time
import cv2
import numpy as np
from PyQt6.QtCore import QObject, pyqtSlot, pyqtSignal
try:
from pypylon import pylon
except Exception as exc: # pragma: no cover - depends on local Basler runtime
pylon = None
_PYPYLON_IMPORT_ERROR = exc
else:
_PYPYLON_IMPORT_ERROR = None
[docs]
def check_camera_availability(required_cameras: int = 2) -> tuple[bool, str]:
"""Return whether the Basler camera backend has enough connected devices."""
if pylon is None:
return False, f"Camera backend is unavailable ({_PYPYLON_IMPORT_ERROR})"
try:
devices = pylon.TlFactory.GetInstance().EnumerateDevices()
except Exception as exc: # pragma: no cover - backend specific
return False, f"Unable to enumerate cameras ({exc})"
count = len(devices)
if count < required_cameras:
plural = "s" if count != 1 else ""
return (
False,
f"Detected {count} Basler camera{plural}; at least {required_cameras} are required.",
)
return True, f"Detected {count} Basler cameras."
[docs]
class CameraWorker(QObject):
"""
This class is used to control the BASLER Cameras.
"""
finished = pyqtSignal() # Define the finished signal
def __init__(self, variables, emitter):
"""
Constructor function which initializes and setups all variables
and parameters for the class.
Args:
variables: The class object of the Variables class.
emitter: The class object of the Emitter class.
Return:
None
"""
super().__init__()
self.flag_default_exposure_time = None
self.exposure_auto = None
self.emitter = emitter
self.variables = variables
self.camera_available = False
self.camera_status_message = ""
self.running = False
self.index_save_image = 0
self.exposure_time_cam_1 = 400000
self.exposure_time_cam_1_light = 10000
self.exposure_time_cam_2 = 1000000
self.exposure_time_cam_2_light = 20000
self.exposure_time_cam_3 = 400000
self.exposure_time_cam_3_light = 10000
self.emitter.cam_1_exposure_time = emitter.cam_1_exposure_time
self.emitter.cam_2_exposure_time = emitter.cam_2_exposure_time
self.emitter.cam_3_exposure_time = emitter.cam_3_exposure_time
self.emitter.default_exposure_time = emitter.default_exposure_time
self.emitter.cam_1_exposure_time.connect(self.set_exposure_time_1)
self.emitter.cam_2_exposure_time.connect(self.set_exposure_time_2)
self.emitter.cam_3_exposure_time.connect(self.set_exposure_time_3)
self.emitter.default_exposure_time.connect(self.set_default_exposure_time)
self.emitter.auto_exposure_time.connect(self.set_auto_exposure_time)
self.initialize_cameras()
[docs]
def start_capturing(self):
if not self.camera_available:
self.finished.emit()
return
self.running = True
self.thread = threading.Thread(target=self.update_cameras)
self.thread.start()
[docs]
def stop_capturing(self):
self.running = False
@pyqtSlot(bool)
def set_default_exposure_time(self):
"""
This class method sets
Args:
None
Return:
None
"""
if not self.exposure_auto:
self.exposure_time_cam_1 = 400000
self.exposure_time_cam_1_light = 10000
self.exposure_time_cam_2 = 1000000
self.exposure_time_cam_2_light = 20000
self.exposure_time_cam_3 = 400000
self.exposure_time_cam_3_light = 10000
self.flag_default_exposure_time = True
if self.variables.light:
exposure_times = [self.exposure_time_cam_1_light, self.exposure_time_cam_2_light,
self.exposure_time_cam_3_light]
self.emitter.cams_exposure_time_default.emit(exposure_times)
else:
exposure_times = [self.exposure_time_cam_1, self.exposure_time_cam_2,
self.exposure_time_cam_3]
self.emitter.cams_exposure_time_default.emit(exposure_times)
else:
print('Cannot set the default exposure time when auto exposure is on')
@pyqtSlot(bool)
def set_auto_exposure_time(self):
"""
This class method sets
Args:
None
Return:
None
"""
if not self.exposure_auto:
self.exposure_mode = 'Continuous'
self.exposure_auto = True
elif self.exposure_auto:
self.exposure_mode = 'Off'
self.exposure_auto = False
@pyqtSlot(int)
def set_exposure_time_1(self, exposure_time):
"""
This class method sets
Args:
exposure_time: The exposure time for the camera.
Return:
None
"""
self.exposure_time_cam_1 = exposure_time
@pyqtSlot(int)
def set_exposure_time_2(self, exposure_time):
"""
This class method sets
Args:
exposure_time: The exposure time for the camera.
Return:
None
"""
self.exposure_time_cam_2 = exposure_time
@pyqtSlot(int)
def set_exposure_time_3(self, exposure_time):
"""
This class method sets
Args:
exposure_time: The exposure time for the camera.
Return:
None
"""
self.exposure_time_cam_3 = exposure_time
[docs]
def initialize_cameras(self):
"""
Initializes and sets up the cameras.
Args:
None
Return:
None
"""
available, message = check_camera_availability(required_cameras=2)
self.camera_available = available
self.camera_status_message = message
self.cameras = None
if not available:
print(message)
return
try:
maxCamerasToUse = 2
self.tlFactory = pylon.TlFactory.GetInstance()
self.devices = self.tlFactory.EnumerateDevices()
self.cameras = pylon.InstantCameraArray(min(len(self.devices), maxCamerasToUse))
for i, cam in enumerate(self.cameras):
cam.Attach(self.tlFactory.CreateDevice(self.devices[i]))
self.converter = pylon.ImageFormatConverter()
self.converter.OutputPixelFormat = pylon.PixelType_BGR8packed
self.converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
self.cameras[0].Open()
self.cameras[0].ExposureAuto.SetValue('Off')
self.cameras[0].ExposureTime.SetValue(self.exposure_time_cam_1)
self.cameras[1].Open()
self.cameras[1].ExposureAuto.SetValue('Off')
self.cameras[1].ExposureTime.SetValue(self.exposure_time_cam_2)
self.exposure_auto = False
except Exception as e:
self.camera_available = False
self.camera_status_message = f"Error in initializing the camera class ({e})"
self.cameras = None
print(self.camera_status_message)
[docs]
def update_cameras(self):
"""
This class method sets up the cameras to capture the required images.
Args:
None
Return:
None
"""
if not self.camera_available or self.cameras is None:
self.finished.emit()
return
retry_attempts = 5
tmp_exposure_time_cam_1 = self.exposure_time_cam_1
tmp_exposure_time_cam_2 = self.exposure_time_cam_2
# tmp_exposure_time_cam_3 = self.exposure_time_cam_3
# set the auto exposure mode off
self.exposure_mode = 'Off'
tmp_exposure_mode = self.exposure_mode
for attempt in range(retry_attempts):
try:
self.cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
start_time = time.time()
while self.cameras.IsGrabbing() and self.running:
if tmp_exposure_mode != self.exposure_mode:
tmp_exposure_mode = self.exposure_mode
# self.cameras[0].open()
self.cameras[0].ExposureAuto.SetValue(self.exposure_mode)
# self.cameras[1].open()
self.cameras[1].ExposureAuto.SetValue(self.exposure_mode)
# self.cameras[2].open()
# self.cameras[2].ExposureAuto.SetValue(self.exposure_mode)
if tmp_exposure_time_cam_1 != self.exposure_time_cam_1:
tmp_exposure_time_cam_1 = self.exposure_time_cam_1
# self.cameras[0].open()
self.cameras[0].ExposureTime.SetValue(self.exposure_time_cam_1)
if tmp_exposure_time_cam_2 != self.exposure_time_cam_2:
tmp_exposure_time_cam_2 = self.exposure_time_cam_2
# self.cameras[1].open()
self.cameras[1].ExposureTime.SetValue(self.exposure_time_cam_2)
# if tmp_exposure_time_cam_3 != self.exposure_time_cam_3:
# tmp_exposure_time_cam_3 = self.exposure_time_cam_3
# self.cameras[2].open()
# self.cameras[2].ExposureTime.SetValue(self.exposure_time_cam_3)
current_time = time.time()
try:
grabResult0 = self.cameras[0].RetrieveResult(8000, pylon.TimeoutHandling_ThrowException)
grabResult1 = self.cameras[1].RetrieveResult(8000, pylon.TimeoutHandling_ThrowException)
image0 = self.converter.Convert(grabResult0)
img0 = image0.GetArray()
image1 = self.converter.Convert(grabResult1)
img1 = image1.GetArray()
except Exception as e:
print(f"Error in grabbing the images from the camera: {e}")
break
self.img0_orig = img0
self.img1_orig = img1
self.img2_orig = img0
self.emitter.img0_orig.emit(np.swapaxes(self.img0_orig, 0, 1))
self.emitter.img1_orig.emit(np.swapaxes(self.img1_orig, 0, 1))
self.emitter.img2_orig.emit(np.swapaxes(self.img2_orig, 0, 1))
if self.variables.clear_index_save_image:
self.variables.clear_index_save_image = False
self.index_save_image = 0
if current_time - start_time >= self.variables.save_meta_interval_camera and self.variables.start_flag:
start_time = time.time()
path_meta = self.variables.path_meta
cv2.imwrite(path_meta + "/camera_side_%s.png" % self.index_save_image, self.img0_orig)
cv2.imwrite(path_meta + '/camera_top_%s.png' % self.index_save_image, self.img1_orig)
cv2.imwrite(path_meta + '/camera_45_%s.png' % self.index_save_image, self.img2_orig)
self.index_save_image += 1
time.sleep(0.5)
grabResult0.Release()
grabResult1.Release()
if self.variables.light_switch or self.flag_default_exposure_time:
self.light_switch()
self.variables.light_switch = False
self.flag_default_exposure_time = False
time.sleep(0.5)
if not self.variables.flag_camera_grab:
break
break # Exit the retry loop if successful
except Exception as e:
print(f"Error during update_cameras attempt {attempt + 1}: {e}")
self.initialize_cameras()
time.sleep(1)
self.finished.emit() # Emit the finished signal when done
[docs]
def light_switch(self):
"""
This class method sets the Exposure time based on a flag.
Args:
None
Return:
None
"""
if not self.exposure_auto:
try:
if self.variables.light:
# self.cameras[0].Open()
self.cameras[0].ExposureTime.SetValue(self.exposure_time_cam_1_light)
# self.cameras[1].Open()
self.cameras[1].ExposureTime.SetValue(self.exposure_time_cam_2_light)
else:
# self.cameras[0].Open()
self.cameras[0].ExposureTime.SetValue(self.exposure_time_cam_1)
# self.cameras[1].Open()
self.cameras[1].ExposureTime.SetValue(self.exposure_time_cam_2)
except Exception as e:
print(f"Error in switching the light: {e}")