Skip to content
30 changes: 19 additions & 11 deletions examples/synchronization/multi_function/cont_ai_ci_tdms_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path

import numpy as np
from nitypes.waveform import AnalogWaveform
from nptdms import ChannelObject, GroupObject, RootObject, TdmsFile, TdmsWriter

Expand All @@ -36,16 +38,16 @@
DEVICE_NAME = "Dev1"

TaskData = tuple[
Sequence[AnalogWaveform], # Analog input: sequence of waveforms
AnalogWaveform, # Counter input: waveform
Sequence[AnalogWaveform[np.float64]], # Analog input: sequence of waveforms
AnalogWaveform[np.float64], # Counter input: waveform
]

data_queue: queue.Queue[Sequence[TaskData]] = queue.Queue(maxsize=10)
data_queue: queue.Queue[TaskData | None] = queue.Queue(maxsize=10)


def producer(
tasks: Sequence[nidaqmx.Task],
data_queue: queue.Queue[Sequence[TaskData]],
data_queue: queue.Queue[TaskData | None],
stop_event: threading.Event,
) -> None:
"""Producer function that reads data from DAQmx tasks and puts it in the queue."""
Expand Down Expand Up @@ -89,7 +91,7 @@ def producer(


def consumer(
data_queue: queue.Queue[Sequence[TaskData]],
data_queue: queue.Queue[TaskData | None],
tdms_path: str,
group_names: Sequence[str],
channel_names: Sequence[Sequence[str]],
Expand Down Expand Up @@ -131,7 +133,11 @@ def consumer(
"wf_increment": waveform.timing.sample_interval.total_seconds(),
"wf_samples": len(waveform.raw_data),
"wf_start_offset": 0.0,
"wf_start_time": waveform.timing.start_time.strftime("%Y-%m-%d %H:%M:%S"),
"wf_start_time": (
waveform.timing.start_time
if isinstance(waveform.timing.start_time, datetime)
else datetime.now()
).strftime("%Y-%m-%d %H:%M:%S"),
},
)
objects_to_write.append(channel)
Expand All @@ -148,9 +154,11 @@ def consumer(
"wf_increment": counter_waveform.timing.sample_interval.total_seconds(),
"wf_samples": len(counter_waveform.raw_data),
"wf_start_offset": 0.0,
"wf_start_time": counter_waveform.timing.start_time.strftime(
"%Y-%m-%d %H:%M:%S"
),
"wf_start_time": (
counter_waveform.timing.start_time
if isinstance(counter_waveform.timing.start_time, datetime)
else datetime.now()
).strftime("%Y-%m-%d %H:%M:%S"),
},
)
objects_to_write.append(channel)
Expand All @@ -170,7 +178,7 @@ def main():
Data is acquired continuously until user presses Enter, then saved to a TDMS file
using a producer-consumer pattern with a queue for thread-safe data transfer.
"""
data_queue = queue.Queue(maxsize=10)
data_queue: queue.Queue[TaskData | None] = queue.Queue(maxsize=10)
stop_event = threading.Event()

ai_task = nidaqmx.Task()
Expand Down Expand Up @@ -207,7 +215,7 @@ def main():
consumer_future = executor.submit(
consumer,
data_queue,
tdms_filepath,
str(tdms_filepath),
["AI_Task", "CI_Task"],
[["Channel01", "Channel02"], ["Counter0"]],
stop_event,
Expand Down