@@ -248,7 +248,7 @@ void radio_zmq_tx_channel::transmit_samples(span<radio_sample_type> data)
248248 sample_count += count;
249249}
250250
251- bool radio_zmq_tx_channel::align (uint64_t timestamp)
251+ bool radio_zmq_tx_channel::align (uint64_t timestamp, std::chrono::milliseconds timeout )
252252{
253253 unsigned count = 0 ;
254254
@@ -261,6 +261,16 @@ bool radio_zmq_tx_channel::align(uint64_t timestamp)
261261 // Protect concurrent alignment and transmit.
262262 std::unique_lock<std::mutex> lock (transmit_alignment_mutex);
263263
264+ // If the channel has never transmitted, skip wait.
265+ if (is_tx_enabled && (timeout.count () != 0 )) {
266+ // Otherwise, wait for the transmitter to transmit.
267+ bool is_not_timeout = transmit_alignment_cvar.wait_for (lock, timeout, [&]() { return sample_count >= timestamp; });
268+ if (is_not_timeout) {
269+ return sample_count > timestamp;
270+ }
271+ is_tx_enabled = false ;
272+ }
273+
264274 std::array<cf_t , 1024 > zero_buffer = {};
265275 // Transmit zeros until the sample count reaches the timestamp.
266276 while (sample_count < timestamp && state_fsm.is_running ()) {
@@ -283,7 +293,14 @@ void radio_zmq_tx_channel::transmit(span<radio_sample_type> data)
283293 // Protect concurrent alignment and transmit.
284294 std::unique_lock<std::mutex> lock (transmit_alignment_mutex);
285295
296+ // Actual baseband transmission.
286297 transmit_samples (data);
298+
299+ // Indicate that transmit is enabled.
300+ is_tx_enabled = true ;
301+
302+ // Notify transmission.
303+ transmit_alignment_cvar.notify_all ();
287304}
288305
289306void radio_zmq_tx_channel::stop ()
0 commit comments