Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/nidaqmx - time tased synchronization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import nidaqmx
import time
from nidaqmx.constants import AcquisitionType
from datetime import timedelta, datetime, timezone

# Function to perform the measurement
def perform_measurement():
with nidaqmx.Task() as task:
task.ai_channels.add_ai_voltage_chan("Dev1/ai0")

# Configure finite sampling
num_samples = 1000
task.timing.cfg_samp_clk_timing(1000, sample_mode=AcquisitionType.FINITE, samps_per_chan=num_samples)
Comment on lines +12 to +13
Copy link
Collaborator

@zhindes zhindes Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With my suggested change to the read, this can be simplified

Suggested change
num_samples = 1000
task.timing.cfg_samp_clk_timing(1000, sample_mode=AcquisitionType.FINITE, samps_per_chan=num_samples)
task.timing.cfg_samp_clk_timing(1000, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000)


# Calculate the start time (1 minute from now)
start_time = datetime.now(timezone.utc) + timedelta(minutes=1)
Comment on lines +15 to +16
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 minute is a lot...

Suggested change
# Calculate the start time (1 minute from now)
start_time = datetime.now(timezone.utc) + timedelta(minutes=1)
# Calculate the start time (10 seconds from now)
start_time = datetime.now(timezone.utc) + timedelta(seconds=10)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: I was going to ask whether it's really necessary to specify UTC here, but then I looked up whether datetime.now() returns an "aware" datetime and now I am sad.

https://discuss.python.org/t/datetime-api-no-atomic-way-to-get-aware-current-local-datetime-when-moving-between-time-zones/55041/8
https://stackoverflow.com/questions/24281525/what-is-the-point-of-a-naive-datetime

print(f"Scheduled start time: {start_time}")

# Configure the time start trigger
task.triggers.start_trigger.cfg_time_start_trig(start_time)

print("Starting the task...")
task.start()

print("Waiting for the scheduled start time...")
# Wait until the scheduled start time
Comment on lines +17 to +26
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. Generally printing commentary isn't really necessary, but for something that must wait, I like it. But let's not overdo it.
  2. You don't need comments that repeat the code. I like the comment describing how the time delta stuff works out. These feel a bit less useful.
Suggested change
print(f"Scheduled start time: {start_time}")
# Configure the time start trigger
task.triggers.start_trigger.cfg_time_start_trig(start_time)
print("Starting the task...")
task.start()
print("Waiting for the scheduled start time...")
# Wait until the scheduled start time
print(f"Scheduled start time: {start_time}")
task.triggers.start_trigger.cfg_time_start_trig(start_time)
task.start()
print("Waiting for the scheduled start time...")

while datetime.now(timezone.utc) < start_time:
time.sleep(1)
Comment on lines +27 to +28
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a DAQmx API for this, Task.wait_for_valid_timestamp, so you don't need to poll.
https://github.com/ni/nidaqmx-python/blob/master/generated/nidaqmx/task/_task.py#L1028

I think you can wait for either TimestampEvent.FIRST_SAMPLE or TimestampEvent.START_TRIGGER.

I assume you are doing this to avoid getting a read timeout. FYI, another solution is to increase the read timeout to account for the scheduled start time, but that's probably less clear.


# Read data after the task starts
data = task.read(number_of_samples_per_channel=num_samples)
Copy link
Collaborator

@zhindes zhindes Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a more standard read from a finite task:

Suggested change
data = task.read(number_of_samples_per_channel=num_samples)
data = task.read(READ_ALL_AVAILABLE)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note you'll need to import READ_ALL_AVAILABLE or fully specify it


# Perform the measurement
perform_measurement()
Comment on lines +33 to +34
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our examples usually prefer simplicity. I would remove this function and simply put your with ... task: block at the top-level

Loading