Source code for pyccapt.control.tdc_surface_concept.scTDC

# -*- coding: utf-8 -*-
"""
Copyright 2018, 2019, 2021 Surface Concept GmbH

This file is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This file is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this file.  If not, see <https://www.gnu.org/licenses/>.

----------------------------------------------------------------------------

Python wrapper for a subset of the scTDC library built on pythons ctypes
module. The class scTDClib holds the dynamically loaded library object and
provides all low-level functions supported by this wrapper. Additionally,
this module defines structures from the scTDC_types.h header file. The
functions defined in the scTDC class aim to match the original scTDC C API
as close as possible, except for a few cases where they have been changed
for more convenient use (sc_get_err_msg, for example).

A higher-level API is defined in the Device and Pipe classes, which
facilitates usage of 1D,2D,3D histogram pipes, statistics data, and time
histograms of stand-alone TDCs.

A higher-level API for processing of event data is provided via the classes
buffered_data_callbacks_pipe and usercallbacks_pipe. usercallbacks_pipe has
been in existence for a longer time, but it suffers from a high number of
callbacks into python code which makes it rather slow. It is still included
for backwards compatibility.
The buffered_data_callbacks_pipe improves the performance by buffering a
user-defined number of events and invoking callbacks into python code after
filling this buffer and optionally at the end of each measurement.
The buffer size can be tuned large enough to reduce the number of callbacks
into python code. The data offered in the callbacks are in the form of 1D
numpy arrays --- one array for one selected event data field --- ready to be
processed in vectorized mathematical operations.
The buffered_data_callbacks_pipe requires scTDC1 library versions not older
than 1.3010.0.

Following constants are defined for selecting which event data fields shall
be buffered by the buffered_data_callbacks_pipe interface:
SC_DATA_FIELD_SUBDEVICE, SC_DATA_FIELD_CHANNEL, SC_DATA_FIELD_START_COUNTER,
SC_DATA_FIELD_TIME_TAG, SC_DATA_FIELD_DIF1, SC_DATA_FIELD_DIF2,
SC_DATA_FIELD_TIME, SC_DATA_FIELD_MASTER_RST_COUNTER, SC_DATA_FIELD_ADC,
SC_DATA_FIELD_SIGNAL1BIT.

Some remarks concerning low-level interfaces:
* Following constants are defined for pipe types: TDC_HISTO, DLD_IMAGE_XY,
  DLD_IMAGE_XT, DLD_IMAGE_YT, DLD_IMAGE_3D, DLD_SUM_HISTO, STATISTICS,
  TMSTAMP_TDC_HISTO, TDC_STATISTICS, DLD_STATISTICS, USER_CALLBACKS,
  DLD_IMAGE_XY_EXT, BUFFERED_DATA_CALLBACKS.
  These are to be used for the pipe_type parameter in sc_pipe_open2.
* Following constants are defined for bit sizes: BS8, BS16, BS32, BS64.
  These are to be used for the depth field in the structures
  sc_pipe_dld_image_xyt_params_t,
  sc_pipe_tdc_histo_params_t.

----------------------------------------------------------------------------

Additions 2019-09-19 : structures for USER_CALLBACKS pipe
Additions 2019-10-15 : higher-level Device and Pipe classes
Additions 2020-05-14 : added wrapper for a separate library to save DLD
                       events to HDF5
Additions 2021-06-15 : added support for BUFFERED_DATA_CALLBACKS pipe
                       (requires scTDC1 library version >= 1.3010.0)
"""

__version__ = "1.2.0"

import ctypes
import os
from pathlib import Path
import time
import traceback
from contextlib import contextmanager

try:  # most stuff works without numpy
    import numpy as np
except:
    pass


# pipe types
TDC_HISTO = 0
DLD_IMAGE_XY = 1
DLD_IMAGE_XT = 2
DLD_IMAGE_YT = 3
DLD_IMAGE_3D = 4
DLD_SUM_HISTO = 5  # Used to get dld time histogram data
STATISTICS = 6  # Used to get statistics for last exposure
TMSTAMP_TDC_HISTO = 7
TDC_STATISTICS = 8
DLD_STATISTICS = 9
USER_CALLBACKS = 10  # slow in python
DLD_IMAGE_XY_EXT = 11
BUFFERED_DATA_CALLBACKS = 12  # more efficient variant of USER_CALLBACKS

# bitsizes for depth parameter in
#  sc_pipe_dld_image_xyt_params_t  and
#  sc_pipe_tdc_histo_params_t
BS8 = 0
BS16 = 1
BS32 = 2
BS64 = 3

# callback reasons for end-of-measurement callback in conjunction with
# sc_tdc_set_complete_callback2
CBR_COMPLETE = 1
CBR_USER_ABORT = 2
CBR_BUFFER_FULL = 3
CBR_EARLY_NOTIF = 4
CBR_DICT = {
    CBR_COMPLETE: "Measurement and data processing completed.",
    CBR_USER_ABORT: "Measurement was interrupted by user.",
    CBR_BUFFER_FULL: "Measurement was aborted because buffers were full.",
    CBR_EARLY_NOTIF: "Acquisition finished, not all data processed yet.",
}

# enum sc_data_field_t
SC_DATA_FIELD_SUBDEVICE = 0x0001
SC_DATA_FIELD_CHANNEL = 0x0002
SC_DATA_FIELD_START_COUNTER = 0x0004
SC_DATA_FIELD_TIME_TAG = 0x0008
SC_DATA_FIELD_DIF1 = 0x0010
SC_DATA_FIELD_DIF2 = 0x0020
SC_DATA_FIELD_TIME = 0x0040
SC_DATA_FIELD_MASTER_RST_COUNTER = 0x0080
SC_DATA_FIELD_ADC = 0x0100
SC_DATA_FIELD_SIGNAL1BIT = 0x0200

_FUNCTYPE = None
if os.name == 'nt':
    _FUNCTYPE = ctypes.WINFUNCTYPE
else:
    _FUNCTYPE = ctypes.CFUNCTYPE


[docs] class sc3du_t(ctypes.Structure): _fields_ = [("x", ctypes.c_uint), ("y", ctypes.c_uint), ("time", ctypes.c_uint64)]
[docs] class sc3d_t(ctypes.Structure): _fields_ = [("x", ctypes.c_int), ("y", ctypes.c_int), ("time", ctypes.c_int64)]
[docs] class roi_t(ctypes.Structure): _fields_ = [("offset", sc3d_t), ("size", sc3du_t)]
ALLOCATORFUNC = _FUNCTYPE(ctypes.c_int, ctypes.POINTER(None), ctypes.POINTER(ctypes.POINTER(None)))
[docs] class sc_pipe_dld_image_xyt_params_t(ctypes.Structure): _fields_ = [ ("depth", ctypes.c_int), ("channel", ctypes.c_int), ("modulo", ctypes.c_uint64), ("binning", sc3du_t), ("roi", roi_t), ("accumulation_ms", ctypes.c_uint), ("allocator_owner", ctypes.c_char_p), ("allocator_cb", ALLOCATORFUNC), ]
[docs] class sc_pipe_tdc_histo_params_t(ctypes.Structure): _fields_ = [ ("depth", ctypes.c_int), ("channel", ctypes.c_uint), ("modulo", ctypes.c_uint64), ("binning", ctypes.c_uint), ("offset", ctypes.c_uint64), ("size", ctypes.c_uint), ("accumulation_ms", ctypes.c_uint), ("allocator_owner", ctypes.c_char_p), ("allocator_cb", ALLOCATORFUNC), ]
[docs] class sc_pipe_statistics_params_t(ctypes.Structure): _fields_ = [("allocator_owner", ctypes.c_char_p), ("allocator_cb", ALLOCATORFUNC)]
[docs] class statistics_t(ctypes.Structure): _fields_ = [ ("counts_read", ctypes.c_uint * 64), ("counts_received", ctypes.c_uint * 64), ("events_found", ctypes.c_uint * 4), ("events_in_roi", ctypes.c_uint * 4), ("events_received", ctypes.c_uint * 4), ("counters", ctypes.c_uint * 64), ("reserved", ctypes.c_uint * 52), ]
[docs] class tdc_event_t(ctypes.Structure): _fields_ = [ ("subdevice", ctypes.c_uint), ("channel", ctypes.c_uint), ("start_counter", ctypes.c_ulonglong), ("time_tag", ctypes.c_ulonglong), ("time_data", ctypes.c_ulonglong), ("sign_counter", ctypes.c_ulonglong), ]
[docs] class dld_event_t(ctypes.Structure): _fields_ = [ ("start_counter", ctypes.c_ulonglong), ("time_tag", ctypes.c_ulonglong), ("subdevice", ctypes.c_uint), ("channel", ctypes.c_uint), ("sum", ctypes.c_ulonglong), ("dif1", ctypes.c_ushort), ("dif2", ctypes.c_ushort), ("master_rst_counter", ctypes.c_uint), ("adc", ctypes.c_ushort), ("signal1bit", ctypes.c_ushort), ]
[docs] class sc_pipe_buf_callback_args(ctypes.Structure): _fields_ = [ ("event_index", ctypes.c_ulonglong), ("som_indices", ctypes.POINTER(ctypes.c_ulonglong)), ("ms_indices", ctypes.POINTER(ctypes.c_ulonglong)), ("subdevice", ctypes.POINTER(ctypes.c_uint)), ("channel", ctypes.POINTER(ctypes.c_uint)), ("start_counter", ctypes.POINTER(ctypes.c_ulonglong)), ("time_tag", ctypes.POINTER(ctypes.c_uint)), ("dif1", ctypes.POINTER(ctypes.c_uint)), ("dif2", ctypes.POINTER(ctypes.c_uint)), ("time", ctypes.POINTER(ctypes.c_ulonglong)), ("master_rst_counter", ctypes.POINTER(ctypes.c_uint)), ("adc", ctypes.POINTER(ctypes.c_int)), ("signal1bit", ctypes.POINTER(ctypes.c_ushort)), ("som_indices_len", ctypes.c_uint), ("ms_indices_len", ctypes.c_uint), ("data_len", ctypes.c_uint), ("reserved", ctypes.c_char * 12), ]
### ---- callbacks ------------------------------------------------------- # void (*start_of_measure) (void *priv); CB_STARTMEAS = _FUNCTYPE(None, ctypes.POINTER(None)) # void (*end_of_measure) (void *priv); CB_ENDMEAS = CB_STARTMEAS # void (*millisecond_countup) (void *priv); CB_MILLISEC = CB_STARTMEAS # void (*statistics) (void *priv, const struct statistics_t *stat); CB_STATISTICS = _FUNCTYPE(None, ctypes.POINTER(None), ctypes.POINTER(statistics_t)) # void (*tdc_event) # (void *priv, const struct sc_TdcEvent *const event_array, # size_t event_array_len); CB_TDCEVENT = _FUNCTYPE(None, ctypes.POINTER(None), ctypes.POINTER(tdc_event_t), ctypes.c_size_t) # void (*dld_event) # (void *priv, const struct sc_DldEvent *const event_array, # size_t event_array_len); CB_DLDEVENT = _FUNCTYPE(None, ctypes.POINTER(None), ctypes.POINTER(dld_event_t), ctypes.c_size_t) # the following callback type does not belong to the user callbacks, but is # used in the sc_tdc_set_complete_callback2 function # void (*cb)(void *, int)); CB_COMPLETE = _FUNCTYPE(None, ctypes.c_void_p, ctypes.c_int) # the following callback belongs to the BUFFERED_DATA_CALLBACKS pipe CB_BUFDATA_DATA = _FUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(sc_pipe_buf_callback_args)) CB_BUFDATA_END_OF_MEAS = _FUNCTYPE(ctypes.c_bool, ctypes.c_void_p) ### ---------------------------------------------------------------------------
[docs] class sc_pipe_callbacks(ctypes.Structure): _fields_ = [ ("priv", ctypes.POINTER(None)), ("start_of_measure", CB_STARTMEAS), ("end_of_measure", CB_ENDMEAS), ("millisecond_countup", CB_MILLISEC), ("statistics", CB_STATISTICS), ("tdc_event", CB_TDCEVENT), ("dld_event", CB_DLDEVENT), ]
[docs] class sc_pipe_callback_params_t(ctypes.Structure): _fields_ = [("callbacks", ctypes.POINTER(sc_pipe_callbacks))]
[docs] class sc_pipe_buf_callbacks_params_t(ctypes.Structure): _fields_ = [ ("priv", ctypes.POINTER(None)), ("data", CB_BUFDATA_DATA), ("end_of_measurement", CB_BUFDATA_END_OF_MEAS), ("data_field_selection", ctypes.c_uint), ("max_buffered_data_len", ctypes.c_uint), ("dld_events", ctypes.c_int), ("version", ctypes.c_int), ("reserved", ctypes.c_ubyte * 24), ]
[docs] def copy_statistics(s): assert type(s) == statistics_t r = statistics_t() ctypes.memmove(ctypes.byref(r), ctypes.byref(s), ctypes.sizeof(s)) return r
@contextmanager def _vendor_runtime_environment(module_dir: Path): old_cwd = Path.cwd() dll_directory = None set_dll_directory = None if os.name == 'nt': if hasattr(os, "add_dll_directory"): dll_directory = os.add_dll_directory(str(module_dir)) try: kernel32 = ctypes.windll.kernel32 kernel32.SetDllDirectoryW.argtypes = [ctypes.c_wchar_p] kernel32.SetDllDirectoryW.restype = ctypes.c_bool set_dll_directory = kernel32.SetDllDirectoryW set_dll_directory(str(module_dir)) except Exception: set_dll_directory = None try: os.chdir(module_dir) yield finally: os.chdir(old_cwd) if set_dll_directory is not None: try: set_dll_directory(None) except Exception: pass if dll_directory is not None: dll_directory.close()
[docs] class scTDClib: def __init__(self): """ loads the shared library """ self._module_dir = Path(__file__).resolve().parent if os.name == 'nt': with _vendor_runtime_environment(self._module_dir): self.lib = ctypes.WinDLL(str(self._module_dir / "scTDC1.dll")) self.lib.sc_tdc_init_inifile.argtypes = [ctypes.c_char_p] self.lib.sc_get_err_msg.argtypes = [ctypes.c_int, ctypes.c_char_p] else: self.lib = ctypes.CDLL("libscTDC.so.1") self.lib.sc_tdc_init_inifile.argtypes = [ctypes.c_char_p] self.lib.sc_tdc_init_inifile.restype = ctypes.c_int self.lib.sc_get_err_msg.argtypes = [ctypes.c_int, ctypes.c_char_p] self.lib.sc_get_err_msg.restype = None self.lib.sc_tdc_deinit2.argtypes = [ctypes.c_int] self.lib.sc_tdc_deinit2.restype = ctypes.c_int self.lib.sc_tdc_start_measure2.argtypes = [ctypes.c_int, ctypes.c_int] self.lib.sc_tdc_start_measure2.restype = ctypes.c_int self.lib.sc_tdc_interrupt2.argtypes = [ctypes.c_int] self.lib.sc_tdc_interrupt2.restype = ctypes.c_int self.lib.sc_pipe_open2.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.POINTER(None)] self.lib.sc_pipe_open2.restype = ctypes.c_int self.lib.sc_pipe_close2.argtypes = [ctypes.c_int, ctypes.c_int] self.lib.sc_pipe_close2.restype = ctypes.c_int self.lib.sc_tdc_get_status2.argtypes = [ctypes.c_int, ctypes.POINTER(ctypes.c_int)] self.lib.sc_tdc_get_status2.restype = ctypes.c_int self.lib.sc_pipe_read2.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.POINTER(None)), ctypes.c_uint] self.lib.sc_pipe_read2.restype = ctypes.c_int self.lib.sc_tdc_get_statistics2.argtypes = [ctypes.c_int, ctypes.POINTER(statistics_t)] self.lib.sc_tdc_set_complete_callback2.argtypes = [ctypes.c_int, ctypes.c_void_p, CB_COMPLETE] self.lib.sc_tdc_set_complete_callback2.restype = ctypes.c_int
[docs] def sc_tdc_init_inifile(self, inifile_path="tdc_gpx3.ini"): """ Initializes the hardware and loads the initial settings reading it from the specified ini file. Returns an integer, containing a non-negative device iterator on success, or, a negative error code in case of failure. The device descriptor is needed for almost all other functions. """ ini_path = Path(inifile_path) if not ini_path.is_absolute(): candidate = self._module_dir / ini_path if candidate.exists(): ini_path = candidate if os.name == 'nt': with _vendor_runtime_environment(self._module_dir): return self.lib.sc_tdc_init_inifile(str(ini_path).encode('utf-8')) return self.lib.sc_tdc_init_inifile(str(ini_path).encode('utf-8'))
[docs] def sc_get_err_msg(self, errcode): """ Returns an error message to the given error code (signed integer) """ if errcode >= 0: return "" sbuf = ctypes.create_string_buffer(1024) self.lib.sc_get_err_msg(errcode, sbuf) if type(sbuf.value) == type(b''): return sbuf.value.decode('utf-8') else: return sbuf.value
[docs] def sc_tdc_deinit2(self, dev_desc): """ Deinitialize the hardware for the given device descriptor which was retrieved from sc_tdc_init_inifile. Returns 0 on success or negative error code. """ return self.lib.sc_tdc_deinit2(dev_desc)
[docs] def sc_tdc_start_measure2(self, dev_desc, exposure_ms): """ Start a measurement (asynchronously/non-blocking) for the hardware indicated by the device descriptor with the given exposure time in milliseconds. Returns 0 on success, or negative error code. """ return self.lib.sc_tdc_start_measure2(dev_desc, exposure_ms)
[docs] def sc_tdc_interrupt2(self, dev_desc): """Interrupts a measurement asynchronously (non-blocking)""" return self.lib.sc_tdc_interrupt2(dev_desc)
[docs] def sc_pipe_open2(self, dev_desc, pipe_type, pipe_params): """ Open a pipe. dev_desc is the device descriptor. Here is a table of which pipe_type requires which type of pipe_params: DLD_IMAGE_XY : sc_pipe_dld_image_xyt_params_t DLD_IMAGE_XT : sc_pipe_dld_image_xyt_params_t DLD_IMAGE_YT : sc_pipe_dld_image_xyt_params_t DLD_IMAGE_3D : sc_pipe_dld_image_xyt_params_t DLD_SUM_HISTO : sc_pipe_dld_image_xyt_params_t TDC_HISTO : sc_pipe_tdc_histo_params_t STATISTICS : sc_pipe_statistics_params_t This python function expects a structure object for pipe_params. Do not pass a pointer-to-structure here. ctypes.addressof(pipe_params) is already internally performed for the call of the C function. /// Support for other pipe types is a TODO item /// Returns an integer containing either the pipe handle (non-negative), or, in case of failure, an error code (negative number) """ assert ( (pipe_type == DLD_IMAGE_XY and isinstance(pipe_params, sc_pipe_dld_image_xyt_params_t)) or (pipe_type == DLD_IMAGE_XT and isinstance(pipe_params, sc_pipe_dld_image_xyt_params_t)) or (pipe_type == DLD_IMAGE_YT and isinstance(pipe_params, sc_pipe_dld_image_xyt_params_t)) or (pipe_type == DLD_IMAGE_3D and isinstance(pipe_params, sc_pipe_dld_image_xyt_params_t)) or (pipe_type == DLD_SUM_HISTO and isinstance(pipe_params, sc_pipe_dld_image_xyt_params_t)) or (pipe_type == TDC_HISTO and isinstance(pipe_params, sc_pipe_tdc_histo_params_t)) or (pipe_type == STATISTICS and isinstance(pipe_params, sc_pipe_statistics_params_t)) or (pipe_type == USER_CALLBACKS and isinstance(pipe_params, sc_pipe_callback_params_t)) or (pipe_type == BUFFERED_DATA_CALLBACKS and isinstance(pipe_params, sc_pipe_buf_callbacks_params_t)) ) return self.lib.sc_pipe_open2(dev_desc, pipe_type, ctypes.addressof(pipe_params))
[docs] def sc_pipe_close2(self, dev_desc, pipe_handle): """ Close a pipe. dev_desc is the device descriptor. pipe_handle is the pipe handle as returned by sc_pipe_open2. Returns 0 on success, or negative error code. """ return self.lib.sc_pipe_close2(dev_desc, pipe_handle)
[docs] def sc_pipe_read2(self, dev_desc, pipe_handle, timeout): """ Read a pipe. dev_desc is the device descriptor. pipe_handle is the pipe handle as returned by sc_pipe_open2. timeout is the timeout after which the function returns when no data is available. Returns a tuple containing the return code and a ctypes.POINTER to the databuffer. """ bufptr = ctypes.POINTER(None) retcode = self.lib.sc_pipe_read2(dev_desc, pipe_handle, ctypes.byref(bufptr), timeout) return (retcode, bufptr)
[docs] def sc_tdc_get_status2(self, dev_desc): """ Get status. Returns 0 (idle), 1 (exposure), or negative error codes """ statuscode = ctypes.c_int() # statuscodeptr = ctypes.POINTER(ctypes.c_int)(statuscode) retcode = self.lib.sc_tdc_get_status2(dev_desc, ctypes.byref(statuscode)) if retcode < 0: return retcode else: return 1 if statuscode.value == 0 else 0
[docs] def sc_tdc_get_statistics2(self, dev_desc): """ This function is deprecated! Use the statistics pipe, instead. This function is kept for older scTDC library versions. """ stat1 = statistics_t() retcode = self.lib.sc_tdc_get_statistics2(dev_desc, ctypes.byref(stat1)) if retcode < 0: return retcode else: return stat1
[docs] def sc_tdc_set_complete_callback2(self, dev_desc, privptr, callback): """ Set a measurement complete callback """ return self.lib.sc_tdc_set_complete_callback2(dev_desc, privptr, callback)
[docs] class scTDC_hdf5lib: def __init__(self): """ loads the shared library scTDC_hdf5 """ self._module_dir = Path(__file__).resolve().parent if os.name == 'nt': with _vendor_runtime_environment(self._module_dir): self.lib = ctypes.WinDLL(str(self._module_dir / "scTDC_hdf50.dll")) else: self.lib = ctypes.CDLL("libscTDC_hdf5.so.0") l = self.lib l.sc_tdc_hdf5_create.argtypes = [] l.sc_tdc_hdf5_create.restype = ctypes.c_int l.sc_tdc_hdf5_destroy.argtypes = [ctypes.c_int] l.sc_tdc_hdf5_destroy.restype = ctypes.c_int l.sc_tdc_hdf5_connect.argtypes = [ctypes.c_int, ctypes.c_int] l.sc_tdc_hdf5_connect.restype = ctypes.c_int l.sc_tdc_hdf5_disconnect.argtypes = [ctypes.c_int] l.sc_tdc_hdf5_disconnect.restype = ctypes.c_int l.sc_tdc_hdf5_setactive.argtypes = [ctypes.c_int, ctypes.c_int] l.sc_tdc_hdf5_setactive.restype = ctypes.c_int l.sc_tdc_hdf5_isactive.argtypes = [ctypes.c_int] l.sc_tdc_hdf5_isactive.restype = ctypes.c_int l.sc_tdc_hdf5_cfg_outfile.argtypes = [ctypes.c_int, ctypes.c_char_p] l.sc_tdc_hdf5_cfg_outfile.restype = ctypes.c_int l.sc_tdc_hdf5_cfg_comment.argtypes = [ctypes.c_int, ctypes.c_char_p] l.sc_tdc_hdf5_cfg_comment.restype = ctypes.c_int l.sc_tdc_hdf5_cfg_datasel.argtypes = [ctypes.c_int, ctypes.c_uint] l.sc_tdc_hdf5_cfg_datasel.restype = ctypes.c_int l.sc_tdc_hdf5_version.argtypes = [ctypes.c_char_p, ctypes.c_size_t] l.sc_tdc_hdf5_version.restype = None # abbreviations self.sc_tdc_hdf5_create = l.sc_tdc_hdf5_create self.sc_tdc_hdf5_destroy = l.sc_tdc_hdf5_destroy self.sc_tdc_hdf5_connect = l.sc_tdc_hdf5_connect self.sc_tdc_hdf5_disconnect = l.sc_tdc_hdf5_disconnect self.sc_tdc_hdf5_setactive = l.sc_tdc_hdf5_setactive self.sc_tdc_hdf5_isactive = l.sc_tdc_hdf5_isactive self.sc_tdc_hdf5_cfg_outfile = l.sc_tdc_hdf5_cfg_outfile self.sc_tdc_hdf5_cfg_comment = l.sc_tdc_hdf5_cfg_comment self.sc_tdc_hdf5_cfg_datasel = l.sc_tdc_hdf5_cfg_datasel
[docs] def version(self): BUFLEN = 32 sbuf = ctypes.create_string_buffer(BUFLEN) self.lib.sc_tdc_hdf5_version(sbuf, BUFLEN) if type(sbuf.value) == bytes: return sbuf.value.decode('utf-8') else: return sbuf.value
[docs] def cfg_outfile(self, objID, filepath): self.lib.sc_tdc_hdf5_cfg_outfile(objID, filepath.encode('utf-8'))
[docs] def cfg_comment(self, objID, comment): self.lib.sc_tdc_hdf5_cfg_comment(objID, comment.encode('utf-8'))
[docs] class buffered_data_callbacks_pipe(object): """ Base class for using the "BUFFERED_DATA_CALLBACKS" interface. Requires scTDC1 library version >= 1.3010.0. In comparison to the USER_CALLBACKS pipe, this pipe reduces the number of callbacks into python, buffering a higher number of events within the library before invoking the callbacks. The on_data callback receives a dictionary containing 1D numpy arrays where the size of these arrays can be as large as specified by the max_buffered_data_len parameter. To use this interface, write a class that derives from this class and override the methods on_data, on_end_of_meas """ def __init__( self, lib, dev_desc, data_field_selection=SC_DATA_FIELD_TIME, max_buffered_data_len=(1 << 16), dld_events=True ): """ Parameters ---------- lib : scTDClib a scTDClib object. dev_desc : int device descriptor as returned by sc_tdc_init_inifile(...). data_field_selection : int, optional a 'bitwise or' combination of SC_DATA_FIELD_xyz constants. The default is SC_DATA_FIELD_TIME. max_buffered_data_len : int, optional The number of events that are buffered before invoking the on_data callback. Less events can also be received in the on_data callback, when the user chooses to return True from the on_end_of_meas callback. The default is (1<<16). dld_events : bool, optional if True, receive DLD events. If False, receive TDC events. Depending on the configuration in the tdc_gpx3.ini file, only one type of events may be available. The default is True. Returns ------- None. """ self.dev_desc = dev_desc self.lib = lib self._pipe_desc = None self._open_pipe(data_field_selection, max_buffered_data_len, dld_events) def _open_pipe(self, data_field_selection, max_buffered_data_len, dld_events): p = sc_pipe_buf_callbacks_params_t() p.priv = None self._cb_data = CB_BUFDATA_DATA(lambda x, y: self._data_cb(y)) self._cb_eom = CB_BUFDATA_END_OF_MEAS(lambda x: self.on_end_of_meas()) p.data = self._cb_data p.end_of_measurement = self._cb_eom p.data_field_selection = data_field_selection p.max_buffered_data_len = max_buffered_data_len p.dld_events = 1 if dld_events else 0 p.version = 0 reservedlist = [0] * 24 p.reserved = (ctypes.c_ubyte * 24)(*reservedlist) self._pipe_args = p # prevent garbage collection! self._pipe_desc = self.lib.sc_pipe_open2(self.dev_desc, BUFFERED_DATA_CALLBACKS, p) def _data_cb(self, dptr): d = dptr.contents x = {"event_index": d.event_index, "data_len": d.data_len} f = np.ctypeslib.as_array if d.subdevice: x["subdevice"] = f(d.subdevice, shape=(d.data_len,)) if d.channel: x["channel"] = f(d.channel, shape=(d.data_len,)) if d.start_counter: x["start_counter"] = f(d.start_counter, shape=(d.data_len,)) if d.time_tag: x["time_tag"] = f(d.time_tag, shape=(d.data_len,)) if d.dif1: x["dif1"] = f(d.dif1, shape=(d.data_len,)) if d.dif2: x["dif2"] = f(d.dif2, shape=(d.data_len,)) if d.time: x["time"] = f(d.time, shape=(d.data_len,)) if d.master_rst_counter: x["master_rst_counter"] = f(d.master_rst_counter, shape=(d.data_len,)) if d.adc: x["adc"] = f(d.adc, shape=(d.data_len,)) if d.signal1bit: x["signal1bit"] = f(d.signal1bit, shape=(d.data_len,)) if d.som_indices: x["som_indices"] = f(d.som_indices, shape=(d.som_indices_len,)) if d.ms_indices: x["ms_indices"] = f(d.ms_indices, shape=(d.ms_indices_len,)) self.on_data(x)
[docs] def on_data(self, data): """ Override this method to process the data. Parameters ---------- data : dict A dictionary containing several numpy arrays. The selection of arrays depends on the data_field_selection value used during initialization of the class. The key names in this dictionary, that are always present, are: event_index, data_len Keywords related to regular event data are: subdevice, channel, start_counter, time_tag, dif1, dif2, time, master_rst_counter, adc, signal1bit Keywords related to indexing arrays are: som_indices, ms_indices. These contain event indices where the start of a measurement happened, or a millisecond count up happened. Returns ------- None. """ pass
[docs] def on_end_of_meas(self): """ Override this method to trigger actions at the end of the measurement. Do not call methods that start the next measurement from this callback. This cannot succeed. Use a signalling mechanism into your main thread, instead. Returns ------- bool True indicates that the pipe should transfer the remaining buffered events immediately after returning from this callback. False indicates that the pipe may continue buffering the next measurements until the max_buffered_data_len threshold is reached. """ return True # True signalizes that all buffered data shall be emitted
[docs] def close(self): """ Close the pipe. Returns ------- None. """ self.lib.sc_pipe_close2(self.dev_desc, self._pipe_desc)
[docs] def start_measurement_sync(self, time_ms): """ Start a measurement and wait until it is finished. Parameters ---------- time_ms : int the duration of the measurement in milliseconds. Returns ------- int 0 on success or a negative error code. """ retcode = self.lib.sc_tdc_start_measure2(self.dev_desc, time_ms) if retcode < 0: return retcode time.sleep(time_ms / 1000.0) # sleep expects floating point seconds while self.lib.sc_tdc_get_status2(self.dev_desc) == 1: time.sleep(0.01) return 0
[docs] def start_measurement(self, time_ms, retries=3): """ Start a measurement 'in the background', i.e. don't wait for it to finish. Parameters ---------- time_ms : int the duration of the measurement in milliseconds. retries : int in an asynchronous scheme of measurement sequences, trying to start the next measurement can occasionally result in a "NOT READY" error. Often some thread of the scTDC1 library just needs a few more cycles to reach the "idle" state again, where the start of the next measurement will be accepted. The retries parameter specifies how many retries with 0.001 s sleeps in between will be made before giving up. Returns ------- int 0 on success or a negative error code. """ while True: retcode = self.lib.sc_tdc_start_measure2(self.dev_desc, time_ms) if retcode != -11: # "not ready" error return retcode retries -= 1 if retries <= 0: return -11 time.sleep(0.001)
[docs] class usercallbacks_pipe(object): """ Base class for user implementations of the "USER_CALLBACKS" interface. Derive from this class and override some or all of the methods on_start_of_meas, on_end_of_meas, on_millisecond, on_statistics, on_tdc_event, on_dld_event The lib argument in the constructor expects a scTDClib object. The dev_desc argument in the constructor expects the device descriptor as returned by sc_tdc_init_inifile(...). """ def __init__(self, lib, dev_desc): self.dev_desc = dev_desc self.lib = lib self._pipe_desc = None self._open_pipe() def _open_pipe(self): p = sc_pipe_callbacks() p.priv = None p.start_of_measure = CB_STARTMEAS(lambda x: self.on_start_of_meas()) p.end_of_measure = CB_ENDMEAS(lambda x: self.on_end_of_meas()) p.millisecond_countup = CB_MILLISEC(lambda x: self.on_millisecond()) p.tdc_event = CB_TDCEVENT(lambda x, y, z: self.on_tdc_event(y, z)) p.dld_event = CB_DLDEVENT(lambda x, y, z: self.on_dld_event(y, z)) p.statistics = CB_STATISTICS(lambda x, y: self.on_statistics(y)) self.struct_callbacks = p p2 = sc_pipe_callback_params_t() p2.callbacks = ctypes.pointer(self.struct_callbacks) self._pipe_args = p self._pipe_args2 = p2 self._pipe_desc = self.lib.sc_pipe_open2(self.dev_desc, USER_CALLBACKS, p2)
[docs] def do_measurement(self, time_ms): self.lib.sc_tdc_start_measure2(self.dev_desc, time_ms) time.sleep(time_ms / 1000.0) # sleep expects floating point seconds while self.lib.sc_tdc_get_status2(self.dev_desc) == 1: time.sleep(0.01)
[docs] def on_start_of_meas(self): pass
[docs] def on_end_of_meas(self): pass
[docs] def on_millisecond(self): pass
[docs] def on_statistics(self, stats): pass
[docs] def on_tdc_event(self, tdc_events, nr_tdc_events): pass
[docs] def on_dld_event(self, dld_events, nr_dld_events): pass
[docs] def close(self): self.lib.sc_pipe_close2(self.dev_desc, self._pipe_desc)
def _get_voxel_type(depth): if depth == BS8: return ctypes.c_uint8 elif depth == BS16: return ctypes.c_uint16 elif depth == BS32: return ctypes.c_uint32 elif depth == BS64: return ctypes.c_uint64 else: return -1 # * 0x1 start counter, 0x2 time tag, 0x4 subdevice, 0x8 channel, # * 0x10 time since start pulse ("sum"), 0x20 "x" detector coordinate ("dif1"), # * 0x40 "y" detector coordinate ("dif2"), 0x80 master reset counter, # * 0x100 ADC value, 0x200 signal bit. If this function is not called, the
[docs] class HDF5DataSelection: STARTCTR = 0x001 TIMETAG = 0x002 SUBDEVICE = 0x004 CHANNEL = 0x008 TIME = 0x010 X = 0x020 Y = 0x040 MASTER_RESET_CTR = 0x080 ADC = 0x100 SIGNALBIT = 0x200 def __init__(self, value=0): self.value = value
[docs] def add(self, value): self.value = self.value | value
[docs] def remove(self, value): self.value = self.value & (~value)
[docs] class Device(object): def __init__(self, inifilepath="tdc_gpx3.ini", autoinit=True, lib=None): """ Creates a Device object that will use the specified inifilepath during initialization. If autoinit==True, initialize the hardware immediately. lib can be specified to reuse an existing scTDClib object. """ self.inifilepath = inifilepath self.dev_desc = None self.pipes = {} self.eomcb = {} # end of measurement callbacks if lib is None: self.lib = scTDClib() else: self.lib = lib if autoinit: self.initialize()
[docs] def initialize(self): """ Initialize the hardware. Returns a tuple, containing an error code and a human-readable error message (zero and empty string in case of success). """ retcode = self.lib.sc_tdc_init_inifile(self.inifilepath) if retcode < 0: return (retcode, self.lib.sc_get_err_msg(retcode)) else: self.dev_desc = retcode # register end of measurement callback if not hasattr(self, "_eomcbfobj"): def _eomcb(privptr, reason): for i in self.eomcb.keys(): self.eomcb[i](reason) self._eomcbfobj = CB_COMPLETE(_eomcb) # extend lifetime! ret2 = self.lib.sc_tdc_set_complete_callback2(self.dev_desc, None, self._eomcbfobj) if ret2 < 0: print("Registering measurement-complete callback failed") print(" message:", self.lib.sc_get_err_msg(ret2)) return (0, "")
[docs] def deinitialize(self): """ Deinitialize the hardware. Returns a tuple, containing an error code and a human-readable error message (zero and empty string in case of success). """ if self.dev_desc is None or self.dev_desc < 0: return (0, "") # don't argue if there is nothing to do retcode = self.lib.sc_tdc_deinit2(self.dev_desc) if retcode < 0: return (retcode, self.lib.sc_get_err_msg(retcode)) else: try: pipekeys = [x for x in self.pipes.keys()] for p in pipekeys: del self.pipes[p] except: traceback.print_exc() traceback.print_stack() self.dev_desc = None return (0, "")
[docs] def is_initialized(self): """Returns True, if the device is initialized""" return self.dev_desc is not None
[docs] def do_measurement(self, time_ms=100, synchronous=False): """ Perform a measurement. If synchronous is True, block until the measurement has finished. Returns a tuple (0, "") in case of success, or a negative error code and a string with the error message. """ retcode = self.lib.sc_tdc_start_measure2(self.dev_desc, time_ms) if retcode < 0: return (retcode, self.lib.sc_get_err_msg(retcode)) else: if synchronous: time.sleep(time_ms / 1000.0) while self.lib.sc_tdc_get_status2(self.dev_desc) == 1: time.sleep(0.01) return (0, "")
[docs] def interrupt_measurement(self): """ Interrupt a measurement that was started with synchronous=False. Returns a tuple (0, "") in case of success, or a negative error code and a string with the error message. """ retcode = self.lib.sc_tdc_interrupt2(self.dev_desc) return (retcode, self.lib.sc_get_err_msg(retcode))
[docs] def add_end_of_measurement_callback(self, cb): """ Adds a callback function for the end of measurement, the callback function needs to accept one argument which indicates the reason for callback. Notification via callback is useful if you want to use do_measurement(...) with synchronous=False, for example in GUIs that need to be responsive during measurement. This functions returns a non-negative ID for the callback that can be used for later removal, or -1 on error (that would be a bug, though). """ for i in range(len(self.eomcb), -1, -1): if not i in self.eomcb: self.eomcb[i] = cb return i return -1
[docs] def remove_end_of_measurement_callback(self, id_of_cb): """ Removes a previously added callback function for the end of measurement by its ID. This function returns 0 on success, or -1 if the id_of_cb is unknown. """ if id_of_cb in self.eomcb: del self.eomcb[id_of_cb] return 0 else: return -1
def _make_new_pipe(self, typestr, par, parent): for i in range(1000): if i not in self.pipes: self.pipes[i] = Pipe(typestr, par, parent) return (i, self.pipes[i]) return None def _add_img_pipe_impl(self, depth, modulo, binning, roi, typestr): par = sc_pipe_dld_image_xyt_params_t() par.depth = depth par.channel = -1 par.modulo = modulo par.binning.x = binning[0] par.binning.y = binning[1] par.binning.time = binning[2] par.roi.offset.x = roi[0][0] par.roi.offset.y = roi[1][0] par.roi.offset.time = roi[2][0] par.roi.size.x = roi[0][1] par.roi.size.y = roi[1][1] par.roi.size.time = roi[2][1] par.accumulation_ms = 1 << 31 pipe = self._make_new_pipe(typestr, par, self) if pipe is None: return (-1, "Too many pipes open") else: return pipe # return id and object
[docs] def add_3d_pipe(self, depth, modulo, binning, roi): """ Adds a 3D pipe (x,y,time) with static buffer. Returns a tuple containing a non-negative pipe ID on success and the Pipe object --- or negative error code and a string containing the error message. depth is either BS8, BS16, BS32, BS64 and determines the voxel bit width. modulo unequal zero applies a modulo operation to the time before sorting it into the 3D buffer. Note that the modulo value is in digital time units _times_ 32. binning is a triple specifying the binning in x, y, and time and each entry has to be a power of 2. roi is a triple of (offset,size) pairs specifying ranges in the x, y, and time axis. The 3D buffer is organized such that a point (x,y,time_slice) is addressed by x + y * size_x + time_slice * size_x * size_y. When getting a numpy array view/copy of the buffer, the 'F' (Fortran) indexing order can be chosen, such that the indices are intuitively ordered x, y, time. """ return self._add_img_pipe_impl(depth, modulo, binning, roi, typestr="3d")
[docs] def add_xy_pipe(self, depth, modulo, binning, roi): """ Adds a 2D pipe (x,y) with static buffer. Returns a tuple containing a non-negative pipe ID on success and the Pipe object --- or negative error code and a string containing the error message. depth is either BS8, BS16, BS32, BS64 and determines the voxel bit width. modulo unequal zero has the effect that the time value is transformed by the modulo operation and the transformed value is checked whether it is within the specified integration range, given in the third element of the roi parameter. Note that the modulo value is in digital time units _times_ 32. binning is a triple specifying the binning in x, y, and time and each entry has to be a power of 2. roi is a triple of (offset,size) pairs specifying ranges in the x, y, and time axis. The 2D buffer is organized such that a point (x,y) is addressed by x + y * size_x. When getting a numpy array view/copy of the buffer, the 'F' (Fortran) indexing order can be chosen, such that the indices are intuitively ordered x, y. The binning in time has an influence only on the time units in the roi. The time part in the roi specifies the integration range, such that only events inside this time range are inserted into the data buffer. """ return self._add_img_pipe_impl(depth, modulo, binning, roi, typestr="xy")
[docs] def add_xt_pipe(self, depth, modulo, binning, roi): """ Adds a 2D pipe (x,t) with static buffer. Returns a tuple containing a non-negative pipe ID on success and the Pipe object --- or negative error code and a string containing the error message. depth is either BS8, BS16, BS32, BS64 and determines the voxel bit width. modulo unequal zero applies a modulo operation to the time before sorting it into the 2D buffer. Note that the modulo value is in digital time units _times_ 32. binning is a triple specifying the binning in x, y, and time and each entry has to be a power of 2. roi is a triple of (offset,size) pairs specifying ranges in the x, y, and time axis. The 2D buffer is organized such that a point (x,time) is addressed by x + time * size_x. When getting a numpy array view/copy of the buffer, the 'F' (Fortran) indexing order can be chosen, such that the indices are intuitively ordered x, time. The binning in y has an influence only on the y units in the roi. The y part in the roi specifies the integration range, such that only events inside this y range are inserted into the data buffer. """ return self._add_img_pipe_impl(depth, modulo, binning, roi, typestr="xt")
[docs] def add_yt_pipe(self, depth, modulo, binning, roi): """ Adds a 2D pipe (y,t) with static buffer. Returns a tuple containing a non-negative pipe ID on success and the Pipe object --- or negative error code and a string containing the error message. depth is either BS8, BS16, BS32, BS64 and determines the voxel bit width. modulo unequal zero applies a modulo operation to the time before sorting it into the 2D buffer. Note that the modulo value is in digital time units _times_ 32. binning is a triple specifying the binning in x, y, and time and each entry has to be a power of 2. roi is a triple of (offset,size) pairs specifying ranges in the x, y, and time axis. The 2D buffer is organized such that a point (y,time) is addressed by y + time * size_y. When getting a numpy array view/copy of the buffer, the 'F' (Fortran) indexing order can be chosen, such that the indices are intuitively ordered y, time. The binning in x has an influence only on the x units in the roi. The x part in the roi specifies the integration range, such that only events inside this x range are inserted into the data buffer. """ return self._add_img_pipe_impl(depth, modulo, binning, roi, typestr="yt")
[docs] def add_t_pipe(self, depth, modulo, binning, roi): """ Adds a 1D time histogram pipe, integrated over a rectangular region in the (x,y) plane (for delay-line detectors) with static buffer. Returns a tuple containing a non-negative pipe ID on success and the Pipe object --- or negative error code and a string containing the error message. depth is either BS8, BS16, BS32, BS64 and determines the bit width of the intensity entries. modulo unequal zero applies a modulo operation to the time before sorting it into the 1D array. Note that the modulo value is in digital time units _times_ 32. binning is a triple specifying the binning in x, y, and time and each entry has to be a power of 2. roi is a triple of (offset,size) pairs specifying ranges in the x, y, and time axis. The buffer is a 1D array of the intensity values for all resolved time bins. The binning in x and y has an influence only on the x and y units in the roi. The x and y parts in the roi specify the integration ranges, such that only events inside the x and y ranges are inserted into the data buffer. """ return self._add_img_pipe_impl(depth, modulo, binning, roi, typestr="t")
[docs] def add_statistics_pipe(self): """ Adds a pipe for statistics data (what is shown in rate meters). The statistics data is only updated at the end of each measurement. Returns a tuple with a non-negative pipe ID and the Pipe object in case of success --- or a negative error code and a string containing the error message in case of error.""" par = sc_pipe_statistics_params_t() pipe = self._make_new_pipe("stat", par, self) if pipe is None: return (-1, "Too many pipes open") else: return pipe # return id and object
[docs] def add_tdc_histo_pipe(self, depth, channel, modulo, binning, offset, size): """ Adds a pipe for time histograms from a stand-alone TDC. Returns a tuple containing a non-negative pipe ID on success and the Pipe object --- or negative error code and a string containing the error message. depth is either BS8, BS16, BS32, BS64 and determines the bit width of the histogram values. The channel parameter selects a channel of the TDC, or accepts all channels if channel==-1. modulo unequal zero applies a modulo operation to the time before sorting it into the 1D array. Note that the modulo value is in digital time units _times_ 32. Binning is an integer number and has to be a power of 2. offset and size specify the range of the time axis. The size is equal to the number of entries in the histogram. The original time value is first transformed by modulo, then by binning, then the offset is subtracted and in the last step it is clipped to the size value followed by insertion into the histogram.""" par = sc_pipe_tdc_histo_params_t() par.depth = depth par.channel = channel par.modulo = modulo par.binning = binning par.offset = offset par.size = size par.accumulation_ms = 1 << 31 pipe = self._make_new_pipe("tdch", par, self) if pipe is None: return (-1, "Too many pipes open") else: return pipe # return id and object
[docs] def remove_pipe(self, pipeid): """ Remove a pipe specified by the pipe ID as returned in the first entry of a tuple by all add_XXX_pipe functions. Note that deinitialize will remove all pipes, as well.""" try: self.pipes[pipeid].close() except KeyError: return -1 try: del self.pipes[pipeid] except KeyError: return -1 return 0
[docs] def hdf5_enable(self): """ Attempts to load the scTDC_hdf5 library which implements event streaming to HDF5 files. This should be called only once with an initialized Device. Returns (True, "") if HDF5 is enabled or (False, error_message) """ if not hasattr(self, 'libh5'): try: self.libh5 = scTDC_hdf5lib() except OSError: traceback.print_exc() return (False, "Loading of scTDC_hdf5 library failed") if self.dev_desc >= 0: result = self.libh5.sc_tdc_hdf5_create() if result >= 0: self.h5obj = result # store the HDF5 streamer instance handle r2 = self.libh5.sc_tdc_hdf5_connect(self.h5obj, self.dev_desc) if r2 == 0: return (True, "") else: return (False, "Instantiation of HDF5 streamer failed") else: return (False, "Need initialized device to enable hdf5 streaming")
[docs] def hdf5_disable(self): """Disconnects the HDF5 streamer instance from this Device. Does not return anything""" if not hasattr(self, 'libh5') or not hasattr(self, 'h5obj'): return self.libh5.sc_tdc_hdf5_disconnect(self.h5obj)
[docs] def hdf5_open(self, filepath, comment, data_selection): """ Opens an HDF5 file and activates streaming. Comment is a string that will be included as the '/user_comment' attribute in the HDF5 file. data_selection is an instance of the HDF5DataSelection class and specifies the selection of data fields from DLD events to be included in the HDF5 file (as separate 1D datasets such that in each dataset, the first event is represented by the first element of the dataset and so on). (Technical remark: Internally, this involves creation of a thread and registration of a USER_CALLBACKS pipe, done by the scTDC_hdf5 library.) Returns (True, "") if successful or (False, error_message) """ if not hasattr(self, 'libh5') or not hasattr(self, 'h5obj'): return (False, "HDF5 streaming is not enabled.") if self.libh5.sc_tdc_hdf5_isactive(self.h5obj) > 0: return (False, "an HDF5 file is already open") self.libh5.cfg_outfile(self.h5obj, filepath) self.libh5.cfg_comment(self.h5obj, comment) self.libh5.sc_tdc_hdf5_cfg_datasel(self.h5obj, data_selection.value) r1 = self.libh5.sc_tdc_hdf5_setactive(self.h5obj, 1) if r1 == 1: return (True, "") elif r1 == 0: return (False, "Opening HDF5 file failed") else: return (False, "Streaming instance became invalid")
[docs] def hdf5_close(self): """ Writes the remaining, internally buffered events and closes the HDF5 file. (Technical remark: Internally, this also involves termination of a thread and deregistration of a USER_CALLBACKS pipe). Returns (True, "") if successful or (False, error_message) """ if not hasattr(self, 'libh5') or not hasattr(self, 'h5obj'): return (False, "HDF5 streaming is not enabled.") r1 = self.libh5.sc_tdc_hdf5_setactive(self.h5obj, 0) if r1 == 0: return (True, "") else: return (False, "Error")
[docs] def hdf5_lib_version(self): if not hasattr(self, 'libh5') or not hasattr(self, 'h5obj'): return "HDF5 streaming is not enabled." else: return self.libh5.version()
[docs] class Pipe(object): """ Pipe objects are used to let the scTDC library construct 1D, 2D, 3D histograms from DLD events occuring during measurements, or to collect statistics data after measurements. They are preferably created via the Device object and its add_XXX_pipe functions. """ def __init__(self, typestr, par, parent): """Constructs a pipe object. typestr must be one of '3d', 'xy', 'xt', 'yt', 't', 'stat'. par is of the sc_pipe_dld_image_xyt_params_t type, or in case of typestr=='stat', of the sc_pipe_statistics_params_t type. parent must be a Device object. Creates the data buffer and opens the pipe in the scTDC library for the parent device. """ self.par = par self.parent = parent self.typestr = typestr self.handle = None self.buf = None self.bufptr = None self.bufsize = None self.par.allocator_owner = None self.pipetypeconst = None # --------------------------------------------------------------------- # --- statistics case ----------------------------------------- # --------------------------------------------------------------------- if self.typestr == 'stat': self.pipetypeconst = STATISTICS self.par.allocator_cb = self._get_stat_allocator() retcode, errmsg = self.reopen() if retcode < 0: print("scTDC.Pipe.__init__ : error during creation:\n" + " ({}) {}".format(errmsg, retcode)) return # --------------------------------------------------------------------- self.nrvoxels = None self.voxeltype = _get_voxel_type(self.par.depth) if self.typestr == '3d': self.nrvoxels = par.roi.size.x * par.roi.size.y * par.roi.size.time self.pipetypeconst = DLD_IMAGE_3D elif self.typestr == 'xy': self.nrvoxels = par.roi.size.x * par.roi.size.y self.pipetypeconst = DLD_IMAGE_XY elif self.typestr == 'xt': self.nrvoxels = par.roi.size.x * par.roi.size.time self.pipetypeconst = DLD_IMAGE_XT elif self.typestr == 'yt': self.nrvoxels = par.roi.size.y * par.roi.size.time self.pipetypeconst = DLD_IMAGE_YT elif self.typestr == 't': self.nrvoxels = par.roi.size.time self.pipetypeconst = DLD_SUM_HISTO elif self.typestr == 'tdch': self.nrvoxels = par.size self.pipetypeconst = TDC_HISTO if self.nrvoxels is not None: self.par.allocator_cb = self._get_allocator(self.nrvoxels, self.voxeltype) retcode, errmsg = self.reopen() if retcode < 0: print("scTDC.Pipe.__init__ : error during creation:\n" + " ({}) {}".format(errmsg, retcode)) def _get_allocator(self, nrvoxels, voxeltype): if self.buf is not None and self.nrvoxels != nrvoxels: return None # already have a buffer, cannot change elif self.buf is None: self.buf = (voxeltype * nrvoxels)() # fixed-size array of voxeltype self.bufptr = ctypes.POINTER(type(self.buf))(self.buf) self.nrvoxels = nrvoxels self.bufsize = nrvoxels * ctypes.sizeof(voxeltype) if not hasattr(self, '_allocatorfunc'): def _allocator(privptr, bufptrptr): bufptrptr[0] = ctypes.cast(self.bufptr, ctypes.c_void_p) return 0 self._allocatorfunc = ALLOCATORFUNC(_allocator) return self._allocatorfunc def _get_stat_allocator(self): if not hasattr(self, '_stat_allocfunc'): self.buf = statistics_t() self.bufptr = ctypes.POINTER(type(self.buf))(self.buf) self.bufsize = ctypes.sizeof(statistics_t) def _stat_alloc(privptr, bufptrptr): bufptrptr[0] = ctypes.cast(self.bufptr, ctypes.c_void_p) return 0 self._stat_allocfunc = ALLOCATORFUNC(_stat_alloc) return self._stat_allocfunc
[docs] def is_open(self): """ Returns True if the pipe is active in the scTDC library (i.e. the library will access the data buffer and increment voxels on incoming events. """ return self.handle is not None
[docs] def reopen(self, force=False): """ Open a pipe with previous parameters, if currently not open. Use force=True, if the pipe had not been explicitly closed, but the device had been deinitialized, causing an implicit destruction of the pipe (implicit desctruction only happens through low-level API calls, whereas Device.deinitialize will close all pipe objects and delete its references to them).""" if self.handle is not None and not force: return retcode = self.parent.lib.sc_pipe_open2(self.parent.dev_desc, self.pipetypeconst, self.par) if retcode < 0: return (retcode, self.parent.lib.sc_get_err_msg(retcode)) else: self.handle = retcode return (0, "")
[docs] def close(self): """ Closes the pipe such that no events are sorted into the data buffer anymore. The data buffer remains unchanged. In that sense, closing acts more like setting the pipe inactive and you can reopen it, later. The data buffer can only be garbage-collected after deleting the pipe object via the parent device and discarding all other references to the Pipe object, as well. """ retcode = self.parent.lib.sc_pipe_close2(self.parent.dev_desc, self.handle) if retcode < 0: return (retcode, self.parent.lib.sc_get_err_msg(retcode)) else: self.handle = None return (0, "")
def _reshape(self, a): if self.typestr == '3d': return np.reshape(a, (self.par.roi.size.x, self.par.roi.size.y, self.par.roi.size.time), order='F') elif self.typestr == 'xy': return np.reshape(a, (self.par.roi.size.x, self.par.roi.size.y), order='F') elif self.typestr == 'xt': return np.reshape(a, (self.par.roi.size.x, self.par.roi.size.time), order='F') elif self.typestr == 'yt': return np.reshape(a, (self.par.roi.size.y, self.par.roi.size.time), order='F') elif self.typestr == 't' or self.typestr == 'tdch': # return np.reshape(a, (self.par.roi.size.time,)) return a # buffer is already 1D, needs no reshaping
[docs] def get_buffer_view(self): """ For 1D, 2D, 3D pipes, returns a numpy array of the data buffer, constructed without copying. As a consequence, changes to the data buffer, made by the scTDC library after getting the buffer view, will be visible to the numpy array returned from this function. The indexing is in Fortran order, i.e. x, y, time. If the pipe is a statistics pipe, return the statistics_t object which may be modified subsequently by the scTDC. """ if self.typestr == 'stat': return self.buf else: return self._reshape(np.ctypeslib.as_array(self.buf))
[docs] def get_buffer_copy(self): """ For 1D, 2D, 3D pipes, returns a numpy array of a copy of the data buffer. The indexing is in Fortran order, i.e. x, y, time. If the pipe is a statistics pipe, return a copy of the statistics_t object. """ if self.typestr == 'stat': return copy_statistics(self.buf) else: return self._reshape(np.array(self.buf, copy=True))
[docs] def clear(self): """ Set all voxels of the data buffer to zero """ ctypes.memset(self.buf, 0, self.bufsize)