Source code for pyccapt.calibration.data_tools.raw_data_surface_concept

from tqdm import tqdm


def _sort_sequence_by_channel(start_counter_values, channel_values, time_values):
    indexed_channels = sorted(enumerate(channel_values), key=lambda item: item[1])
    sorted_indices = [index for index, _ in indexed_channels]
    sorted_channels = [channel_values[index] for index in sorted_indices]
    sorted_time = [time_values[index] for index in sorted_indices]
    sorted_start_counter = [start_counter_values[index] for index in sorted_indices]
    return sorted_channels, sorted_time, sorted_start_counter


def _build_valid_event_flags(sorted_channels):
    length = len(sorted_channels)
    if length < 4:
        return [False]
    if length == 4:
        return [sorted_channels == [0, 1, 2, 3]]

    valid_event = []
    for index_valid_event in range((length + 3) // 4):
        chunk = sorted_channels[index_valid_event * 4:(index_valid_event + 1) * 4]
        valid_event.append(chunk == [0, 1, 2, 3])
    return valid_event


def _normalize_sequence(current_sequence, ch, time):
    """Reorder one group of hits into channel order and flag its valid events.

    Groups of at most four hits are simply sorted by channel. Longer groups
    are dealt out round-robin over the channels 0, 1, 2, 3: each step takes
    the earliest not-yet-used hit on the wanted channel and moves on to the
    next channel, skipping a channel that has no hits left. Complete
    ``0, 1, 2, 3`` quadruples therefore end up at the front of the result.

    Hits of a long group whose channel is not one of 0-3 can never be
    selected by the round-robin. They are appended at the end in their
    original order, so the result always has as many entries as the input
    (a scan that waits for such a hit to match would never finish).

    Args:
        current_sequence: start counter value of every hit in the group
        ch: channel value of every hit in the group
        time: time value of every hit in the group

    Returns:
        Tuple ``(channels, times, start_counters, valid_event)`` where the
        first three are reordered lists and ``valid_event`` is the list of
        flags produced by ``_build_valid_event_flags``.
    """
    sc = list(current_sequence)
    if len(sc) <= 4:
        sorted_channels, sorted_time, sorted_start_counter = _sort_sequence_by_channel(sc, ch, time)
        valid_event = _build_valid_event_flags(sorted_channels)
        return sorted_channels, sorted_time, sorted_start_counter, valid_event

    # Positions of the hits on each channel, in arrival order. Built with
    # ``==`` so numeric channel values of any type compare as they always did.
    per_channel = [
        [position for position, value in enumerate(ch) if value == wanted]
        for wanted in range(4)
    ]
    cursors = [0, 0, 0, 0]
    pending = sum(len(positions) for positions in per_channel)

    order = []
    wanted = 0
    while pending:
        cursor = cursors[wanted]
        positions = per_channel[wanted]
        if cursor < len(positions):
            order.append(positions[cursor])
            cursors[wanted] = cursor + 1
            pending -= 1
        # Advance whether or not a hit was found: an exhausted channel is skipped.
        wanted = (wanted + 1) % 4

    if len(order) < len(ch):
        # Channels outside 0-3: keep the hits, but after every placeable one.
        placed = set(order)
        order.extend(position for position in range(len(ch)) if position not in placed)

    ch_sorted = [ch[position] for position in order]
    sorted_time = [time[position] for position in order]
    sorted_start_counter = [sc[position] for position in order]
    valid_event = _build_valid_event_flags(ch_sorted)
    return ch_sorted, sorted_time, sorted_start_counter, valid_event


def find_consecutive_sequences_seperatly(start_counter, channel, time_data, high_voltage, pulse):
    """
    Group consecutive hits sharing a start counter and bin the groups by size.

    A group is a maximal run of neighbouring entries of ``start_counter``
    that compare equal. Every group is reordered with ``_normalize_sequence``
    and stored as a dictionary with the keys ``channels``, ``time_data``,
    ``start_counter``, ``valid_event``, ``high_voltage``, ``pulse``,
    ``indices`` (first and last input index, inclusive) and ``length``.
    ``high_voltage`` and ``pulse`` are read at the last hit of the group.

    Summary statistics are printed before returning.

    Args:
        start_counter: list of start counter values
        channel: list of channel values
        time_data: list of time data values
        high_voltage: list of high voltage values
        pulse: list of pulse values

    Returns:
        Tuple of seven lists of group dictionaries, in this order:
        result_4: groups of 4 hits whose first valid-event flag is true
        result_4_invalid: groups of 4 hits whose first valid-event flag is false
        result_3_invalid: groups of 3 hits
        result_2_invalid: groups of 2 hits
        result_1_invalid: groups of 1 hit
        result_other_odd: longer groups whose length is not a multiple of 4
        result_other_even: longer groups whose length is a multiple of 4
        All seven lists are empty when ``start_counter`` is empty.

    Raises:
        AssertionError: if the stored group lengths do not add up to
            ``len(start_counter)``.
    """
    result_4 = []
    result_4_invalid = []
    result_3_invalid = []
    result_2_invalid = []
    result_1_invalid = []
    result_other_odd = []
    result_other_even = []
    results = (result_4, result_4_invalid, result_3_invalid, result_2_invalid, result_1_invalid,
               result_other_odd, result_other_even)

    total_hits = len(start_counter)
    if total_hits == 0:
        # No groups to build, and the percentages below would divide by zero.
        return results

    short_groups = {3: result_3_invalid, 2: result_2_invalid, 1: result_1_invalid}

    def _store_group(counters, channels, times, first, last):
        # Normalize one finished group and append its record to the right bin.
        length = len(counters)
        sorted_ch, sorted_time, sorted_sc, valid_event = _normalize_sequence(counters, channels, times)
        record = {
            'channels': sorted_ch,
            'time_data': sorted_time,
            'start_counter': sorted_sc,
            'valid_event': valid_event,
            # Read at the group's own last hit; index ``last + 1`` already
            # belongs to the following group.
            'high_voltage': high_voltage[last],
            'pulse': pulse[last],
            'indices': (first, last),
            'length': length,
        }
        if length == 4:
            bucket = result_4 if valid_event[0] else result_4_invalid
        elif length in short_groups:
            bucket = short_groups[length]
        elif length % 4 == 0:
            bucket = result_other_even
        else:
            bucket = result_other_odd
        bucket.append(record)

    group_counters = []
    group_channels = []
    group_times = []
    group_start = 0
    for i, value in tqdm(enumerate(start_counter), desc="Processing", total=total_hits):
        if group_counters and group_counters[-1] != value:
            # The start counter changed: the running group ended at i - 1.
            _store_group(group_counters, group_channels, group_times, group_start, i - 1)
            group_counters = []
            group_channels = []
            group_times = []
            group_start = i
        group_counters.append(value)
        group_channels.append(channel[i])
        group_times.append(time_data[i])

    # The loop only stores a group when the next one starts; store the last one.
    _store_group(group_counters, group_channels, group_times, group_start, total_hits - 1)

    # Hit totals (not group counts) per bin.
    hits_4 = len(result_4) * 4
    hits_4_invalid = len(result_4_invalid) * 4
    hits_3 = len(result_3_invalid) * 3
    hits_2 = len(result_2_invalid) * 2
    hits_1 = len(result_1_invalid) * 1
    hits_other_odd = sum(item['length'] for item in result_other_odd)
    hits_other_even = sum(item['length'] for item in result_other_even)

    print(f"Length of 4 channel: {hits_4 / 4}, {hits_4 / total_hits * 100} %")
    print(f"Length of 4 channel (invalid): {hits_4_invalid / 4}, {hits_4_invalid / total_hits * 100} %")
    print(f"Length of 3 channel: {hits_3 / 3}, {hits_3 / total_hits * 100} %")
    print(f"Length of 2 channel: {hits_2 / 2}, {hits_2 / total_hits * 100} %")
    print(f"Length of 1 channel: {hits_1}, {hits_1 / total_hits * 100} %")
    print(f"Length of groups of four channel (multihit): {hits_other_even}, "
          f"{hits_other_even / total_hits * 100} %")
    print(f"Length of not group of four channel (multihit): {hits_other_odd}, "
          f"{hits_other_odd / total_hits * 100} %")

    # Internal consistency check: every input hit landed in exactly one bin.
    total_length = (hits_4 + hits_4_invalid + hits_3 + hits_2 + hits_1
                    + hits_other_odd + hits_other_even)
    assert total_length == total_hits, "The total length of the sequences is not equal to the length of the array"
    print(f"Total length: {total_length}")

    return results
def find_consecutive_sequences(start_counter, channel, time_data, high_voltage, pulse, print_stats=False):
    """
    Group consecutive hits that share a start counter.

    A group is a maximal run of neighbouring entries of ``start_counter``
    that compare equal. Every group is reordered with ``_normalize_sequence``
    and stored as a dictionary with the keys ``channels``, ``time_data``,
    ``start_counter``, ``valid_event``, ``high_voltage``, ``pulse``,
    ``indices`` (first and last input index, inclusive) and ``length``.
    ``high_voltage`` and ``pulse`` are read at the last hit of the group.

    Args:
        start_counter: list of start counter values
        channel: list of channel values
        time_data: list of time data values
        high_voltage: list of high voltage values
        pulse: list of pulse values
        print_stats: bool, print the statistics of the sequences

    Returns:
        result: list of group dictionaries in input order; empty when
        ``start_counter`` is empty.

    Raises:
        AssertionError: only with ``print_stats=True``, if the group lengths
            do not add up to ``len(start_counter)``.
    """
    result = []
    total_hits = len(start_counter)
    if total_hits == 0:
        # No groups to build, and the percentages below would divide by zero.
        return result

    def _store_group(counters, channels, times, first, last):
        # Normalize one finished group and append its record.
        length = len(counters)
        sorted_ch, sorted_time, sorted_sc, valid_event = _normalize_sequence(counters, channels, times)
        result.append({
            'channels': sorted_ch,
            'time_data': sorted_time,
            'start_counter': sorted_sc,
            'valid_event': valid_event,
            # Read at the group's own last hit; index ``last + 1`` already
            # belongs to the following group.
            'high_voltage': high_voltage[last],
            'pulse': pulse[last],
            'indices': (first, last),
            'length': length,
        })

    group_counters = []
    group_channels = []
    group_times = []
    group_start = 0
    for i, value in tqdm(enumerate(start_counter), desc="Processing", total=total_hits):
        if group_counters and group_counters[-1] != value:
            # The start counter changed: the running group ended at i - 1.
            _store_group(group_counters, group_channels, group_times, group_start, i - 1)
            group_counters = []
            group_channels = []
            group_times = []
            group_start = i
        group_counters.append(value)
        group_channels.append(channel[i])
        group_times.append(time_data[i])

    # The loop only stores a group when the next one starts; store the last one.
    _store_group(group_counters, group_channels, group_times, group_start, total_hits - 1)

    if print_stats:
        # Group counts for the short groups. ``count_4`` covers every group
        # of four hits, valid or not; ``count_4_invalid`` is the subset whose
        # flag list is exactly [False].
        count_4 = sum(1 for group in result if group['length'] == 4)
        count_4_invalid = sum(
            1 for group in result if group['length'] == 4 and group['valid_event'] == [False])
        count_3 = sum(1 for group in result if group['length'] == 3)
        count_2 = sum(1 for group in result if group['length'] == 2)
        count_1 = sum(1 for group in result if group['length'] == 1)
        # Hit totals (not group counts) for the multi-hit groups.
        hits_other_even = sum(
            group['length'] for group in result if group['length'] > 4 and group['length'] % 4 == 0)
        hits_other_odd = sum(
            group['length'] for group in result if group['length'] > 4 and group['length'] % 4 != 0)

        print(f"Length of 4 channel: {count_4}, {count_4 * 4 / total_hits * 100} %")
        print(f"Length of 4 channel (invalid): {count_4_invalid}, {count_4_invalid * 4 / total_hits * 100} %")
        print(f"Length of 3 channel: {count_3}, {count_3 * 3 / total_hits * 100} %")
        print(f"Length of 2 channel: {count_2}, {count_2 * 2 / total_hits * 100} %")
        print(f"Length of 1 channel: {count_1}, {count_1 * 1 / total_hits * 100} %")
        print(f"Length of groups of four channel (multi hit): {hits_other_even}, "
              f"{hits_other_even / total_hits * 100} %")
        print(f"Length of not group of four and one less than 4 channel (multi hit): {hits_other_odd}, "
              f"{hits_other_odd / total_hits * 100} %")

        # Internal consistency check: every input hit belongs to one group.
        total_length = (count_4 * 4 + count_3 * 3 + count_2 * 2 + count_1 * 1
                        + hits_other_odd + hits_other_even)
        assert total_length == total_hits, "The total length of the sequences is not equal to the length of the array"
        print(f"Total length: {total_length}")

    return result
def find_nth_max_repeated_indices(nums, n):
    """
    Find the n-th longest run of equal consecutive values in ``nums``.

    Runs are ranked by repeatedly taking the longest remaining run (the
    earliest one on ties) and masking it out. ``n = 0`` (or any negative
    ``n``) selects the longest run, ``n = 1`` the next one, and so on.
    Entries equal to ``-1`` are treated as masked: they never belong to a
    run, and they separate the runs on either side of them.

    The search works on a private copy, so ``nums`` is left unmodified.

    Args:
        nums: sequence of values to search
        n: zero-based rank of the wanted run

    Returns:
        Tuple ``(start_index, end_index, max_count, max_number)`` with the
        inclusive index range, the length and the repeated value of the run,
        or ``(None, None, 0, None)`` when ``nums`` holds fewer than
        ``n + 1`` runs.
    """
    remaining = list(nums)
    skips_left = n
    while True:
        best_start = None
        best_end = None
        best_value = None
        best_count = 0

        run_value = None
        run_count = 0
        for i, num in enumerate(remaining):
            if num != -1:
                if num != run_value:
                    run_value = num
                    run_count = 1
                else:
                    run_count += 1
                # Strictly greater: on equal length the earliest run wins.
                if run_count > best_count:
                    best_count = run_count
                    best_value = run_value
                    best_start = i - run_count + 1
                    best_end = i
            else:
                # A masked entry ends the current run; without this reset,
                # equal values on both sides of a masked span would be
                # counted as one run reaching into the masked indices.
                run_value = None
                run_count = 0

        if best_start is None:
            # Every run has been consumed (or there were none to begin with).
            return None, None, 0, None

        skips_left -= 1
        if skips_left < 0:
            return best_start, best_end, best_count, best_value

        # Mask the run just found so the next pass finds the next-longest.
        remaining[best_start:best_end + 1] = [-1] * best_count