Skip to content

Commit e05ec2d

Browse files
authored
Add: ST40_TX_FLAG_EXACT_USER_PACING (#1343)
The flag works the same as ST20_TX_FLAG_EXACT_USER_PACING: It works together with ST40P_TX_FLAG_USER_PACING and makes the first packet of the frame leave exactly at the user provided timestamp instead of aligning to epochs. Other changes: - Align ancillary pacing with how it is done in video session. - Add exact user pacing noctx tests Addresses: #1318 Signed-off-by: Kasiewicz, Marek <[email protected]>
1 parent ae8921a commit e05ec2d

17 files changed

+415
-166
lines changed

include/st40_api.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ typedef struct st_rx_ancillary_session_handle_impl* st40_rx_handle;
5959
* If use dedicated queue for TX.
6060
*/
6161
#define ST40_TX_FLAG_DEDICATE_QUEUE (MTL_BIT32(6))
62+
/**
63+
* Flag bit in flags of struct st40_tx_ops.
64+
* Works together with ST40_TX_FLAG_USER_PACING and makes the sender transmit at the
65+
* exact timestamp provided by the user instead of aligning to the internal epoch.
66+
*/
67+
#define ST40_TX_FLAG_EXACT_USER_PACING (MTL_BIT32(7))
6268

6369
/**
6470
* Flag bit in flags of struct st30_rx_ops, for non MTL_PMD_DPDK_USER.

include/st40_pipeline_api.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,6 @@ enum st40p_tx_flag {
7070
* lib will wait until timestamp is reached for each frame.
7171
*/
7272
ST40P_TX_FLAG_USER_PACING = (MTL_BIT32(3)),
73-
/**
74-
* Drop frames when the mtl reports late frames (transport can't keep up).
75-
* When late frame is detected, next frame from pipeline is ommited.
76-
* Untill we resume normal frame sending.
77-
*/
78-
ST40P_TX_FLAG_DROP_WHEN_LATE = (MTL_BIT32(7)),
7973
/**
8074
* Flag bit in flags of struct st40_tx_ops.
8175
* If enabled, lib will assign the rtp timestamp to the value in
@@ -92,11 +86,22 @@ enum st40p_tx_flag {
9286
* If use dedicated queue for TX.
9387
*/
9488
ST40P_TX_FLAG_DEDICATE_QUEUE = (MTL_BIT32(6)),
89+
/**
90+
* Drop frames when the mtl reports late frames (transport can't keep up).
91+
* When late frame is detected, next frame from pipeline is ommited.
92+
* Untill we resume normal frame sending.
93+
*/
94+
ST40P_TX_FLAG_DROP_WHEN_LATE = (MTL_BIT32(7)),
9595
/**
9696
* NOT SUPPORTED YET
9797
* Force the numa of the created session, both CPU and memory
9898
*/
9999
ST40P_TX_FLAG_FORCE_NUMA = (MTL_BIT32(8)),
100+
/**
101+
* Works together with ST40P_TX_FLAG_USER_PACING and makes the first packet of the
102+
* frame leave exactly at the user provided timestamp instead of aligning to epochs.
103+
*/
104+
ST40P_TX_FLAG_EXACT_USER_PACING = (MTL_BIT32(9)),
100105
/** Enable the st40p_tx_get_frame block behavior to wait until a frame becomes
101106
available or timeout(default: 1s, use st40p_tx_set_block_timeout to customize)*/
102107
ST40P_TX_FLAG_BLOCK_GET = (MTL_BIT32(15)),

lib/src/st2110/pipeline/st40_pipeline_tx.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ static int tx_st40p_create_transport(struct mtl_main_impl* impl, struct st40p_tx
244244
ops_tx.flags |= ST40_TX_FLAG_USER_TIMESTAMP;
245245

246246
if (ops->flags & ST40P_TX_FLAG_USER_PACING) ops_tx.flags |= ST40_TX_FLAG_USER_PACING;
247+
if (ops->flags & ST40P_TX_FLAG_EXACT_USER_PACING)
248+
ops_tx.flags |= ST40_TX_FLAG_EXACT_USER_PACING;
247249
if (ops->flags & ST40P_TX_FLAG_DROP_WHEN_LATE) {
248250
ops_tx.notify_frame_late = st40p_tx_late_frame_drop;
249251
} else if (ops->notify_frame_late) {

lib/src/st2110/st_header.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ struct st_tx_audio_session_pacing {
763763
uint64_t cur_epochs; /* epoch of current pkt */
764764
/* timestamp for rtp header */
765765
uint32_t rtp_time_stamp;
766-
uint64_t cur_epoch_time;
766+
uint64_t ptp_time_cursor;
767767
/* in ns, tsc time cursor for packet pacing */
768768
uint64_t tsc_time_cursor;
769769
/* ptp time may onward */
@@ -1079,9 +1079,7 @@ struct st_tx_ancillary_session_pacing {
10791079
uint64_t cur_epochs; /* epoch of current frame */
10801080
/* timestamp for rtp header */
10811081
uint32_t rtp_time_stamp;
1082-
/* timestamp for pacing */
1083-
uint32_t pacing_time_stamp;
1084-
uint64_t cur_epoch_time;
1082+
uint64_t ptp_time_cursor;
10851083
double tsc_time_cursor; /* in ns, tsc time cursor for packet pacing */
10861084
/* ptp time may onward */
10871085
uint32_t max_onward_epochs;
@@ -1260,7 +1258,7 @@ struct st_tx_fastmetadata_session_pacing {
12601258
uint32_t rtp_time_stamp;
12611259
/* timestamp for pacing */
12621260
uint32_t pacing_time_stamp;
1263-
uint64_t cur_epoch_time;
1261+
uint64_t ptp_time_cursor;
12641262
double tsc_time_cursor; /* in ns, tsc time cursor for packet pacing */
12651263
/* ptp time may onward */
12661264
uint32_t max_onward_epochs;

lib/src/st2110/st_tx_ancillary_session.c

Lines changed: 126 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include "st_tx_ancillary_session.h"
66

7+
#include <math.h>
8+
79
#include "../datapath/mt_queue.h"
810
#include "../mt_log.h"
911
#include "../mt_stat.h"
@@ -223,9 +225,9 @@ static int tx_ancillary_session_init_pacing_epoch(
223225
return 0;
224226
}
225227

226-
static inline double tx_ancillary_pacing_time(
228+
static inline uint64_t tx_ancillary_pacing_time(
227229
struct st_tx_ancillary_session_pacing* pacing, uint64_t epochs) {
228-
return epochs * pacing->frame_time;
230+
return nextafter(epochs * pacing->frame_time, INFINITY);
229231
}
230232

231233
static inline uint32_t tx_ancillary_pacing_time_stamp(
@@ -242,7 +244,12 @@ static uint64_t tx_ancillary_pacing_required_tai(struct st_tx_ancillary_session_
242244
uint64_t required_tai = 0;
243245

244246
if (!(s->ops.flags & ST40_TX_FLAG_USER_PACING)) return 0;
245-
if (!timestamp) return 0;
247+
if (!timestamp) {
248+
if (s->ops.flags & ST40_TX_FLAG_EXACT_USER_PACING) {
249+
err("%s(%d), EXACT_USER_PACING requires non-zero timestamp\n", __func__, s->idx);
250+
}
251+
return 0;
252+
}
246253

247254
if (tfmt == ST10_TIMESTAMP_FMT_MEDIA_CLK) {
248255
if (timestamp > 0xFFFFFFFF) {
@@ -256,86 +263,104 @@ static uint64_t tx_ancillary_pacing_required_tai(struct st_tx_ancillary_session_
256263
return required_tai;
257264
}
258265

259-
static int tx_ancillary_session_sync_pacing(struct mtl_main_impl* impl,
260-
struct st_tx_ancillary_session_impl* s,
261-
bool sync, uint64_t required_tai,
262-
bool second_field) {
266+
static void tx_ancillary_validate_user_timestamp(struct st_tx_ancillary_session_impl* s,
267+
uint64_t requested_epoch,
268+
uint64_t current_epoch) {
269+
if (requested_epoch < current_epoch) {
270+
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_error_user_timestamp);
271+
dbg("%s(%d), user requested transmission time in the past, required_epoch %" PRIu64
272+
", cur_epoch %" PRIu64 "\n",
273+
__func__, s->idx, requested_epoch, current_epoch);
274+
} else if (requested_epoch > current_epoch + (NS_PER_S / s->pacing.frame_time)) {
275+
dbg("%s(%d), requested epoch %" PRIu64
276+
" too far in the future, current epoch %" PRIu64 "\n",
277+
__func__, s->idx, requested_epoch, current_epoch);
278+
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_error_user_timestamp);
279+
}
280+
}
281+
282+
static inline uint64_t tx_ancillary_calc_epoch(struct st_tx_ancillary_session_impl* s,
283+
uint64_t cur_tai, uint64_t required_tai) {
263284
struct st_tx_ancillary_session_pacing* pacing = &s->pacing;
264-
double frame_time = pacing->frame_time;
265-
/* always use MTL_PORT_P for ptp now */
266-
uint64_t ptp_time = mt_get_ptp_time(impl, MTL_PORT_P);
267-
uint64_t next_epochs = pacing->cur_epochs + 1;
268-
uint64_t epochs;
269-
double to_epoch;
270-
bool interlaced = s->ops.interlaced;
285+
uint64_t current_epoch = cur_tai / pacing->frame_time;
286+
uint64_t next_free_epoch = pacing->cur_epochs + 1;
287+
uint64_t epoch = next_free_epoch;
271288

272289
if (required_tai) {
273-
uint64_t ptp_epochs = ptp_time / frame_time;
274-
epochs = (required_tai + frame_time / 2) / frame_time;
275-
dbg("%s(%d), required tai %" PRIu64 " ptp_epochs %" PRIu64 " epochs %" PRIu64 "\n",
276-
__func__, s->idx, required_tai, ptp_epochs, epochs);
277-
if (epochs < ptp_epochs) {
278-
ST_SESSION_STAT_INC(s, port_user_stats.common, stat_error_user_timestamp);
279-
}
280-
} else {
281-
epochs = ptp_time / frame_time;
290+
epoch = (required_tai + pacing->frame_time / 2) / pacing->frame_time;
291+
tx_ancillary_validate_user_timestamp(s, epoch, current_epoch);
282292
}
283293

284-
dbg("%s(%d), epochs %" PRIu64 " %" PRIu64 "\n", __func__, s->idx, epochs,
285-
pacing->cur_epochs);
286-
if (epochs <= pacing->cur_epochs) {
287-
uint64_t diff = pacing->cur_epochs - epochs;
288-
if (diff < pacing->max_onward_epochs) {
289-
/* point to next epoch since if it in the range of onward */
290-
epochs = next_epochs;
294+
if (current_epoch <= next_free_epoch) {
295+
if (next_free_epoch - current_epoch > pacing->max_onward_epochs) {
296+
dbg("%s(%d), onward range exceeded, next_free_epoch %" PRIu64
297+
", current_epoch %" PRIu64 "\n",
298+
__func__, s->idx, next_free_epoch, current_epoch);
299+
ST_SESSION_STAT_ADD(s, port_user_stats.common, stat_epoch_onward,
300+
(next_free_epoch - current_epoch));
291301
}
292-
}
293302

294-
if (interlaced) {
295-
if (second_field) {
296-
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_second_field);
297-
} else {
298-
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_first_field);
303+
if (!required_tai) epoch = next_free_epoch;
304+
} else {
305+
dbg("%s(%d), frame is late, current_epoch %" PRIu64 " next_free_epoch %" PRIu64 "\n",
306+
__func__, s->idx, current_epoch, next_free_epoch);
307+
ST_SESSION_STAT_ADD(s, port_user_stats.common, stat_epoch_drop,
308+
(current_epoch - next_free_epoch));
309+
310+
if (s->ops.notify_frame_late) {
311+
s->ops.notify_frame_late(s->ops.priv, current_epoch - next_free_epoch);
299312
}
313+
314+
epoch = current_epoch;
300315
}
301316

302-
to_epoch = tx_ancillary_pacing_time(pacing, epochs) - ptp_time;
303-
if (to_epoch < 0) {
317+
return epoch;
318+
}
319+
320+
static int tx_ancillary_session_sync_pacing(struct mtl_main_impl* impl,
321+
struct st_tx_ancillary_session_impl* s,
322+
uint64_t required_tai) {
323+
struct st_tx_ancillary_session_pacing* pacing = &s->pacing;
324+
uint64_t cur_tai = mt_get_ptp_time(impl, MTL_PORT_P);
325+
uint64_t cur_tsc = mt_get_tsc(impl);
326+
uint64_t start_time_tai;
327+
int64_t time_to_tx_ns;
328+
329+
pacing->cur_epochs = tx_ancillary_calc_epoch(s, cur_tai, required_tai);
330+
331+
if ((s->ops.flags & ST40_TX_FLAG_EXACT_USER_PACING) && required_tai) {
332+
start_time_tai = required_tai;
333+
} else {
334+
start_time_tai = tx_ancillary_pacing_time(pacing, pacing->cur_epochs);
335+
}
336+
time_to_tx_ns = (int64_t)start_time_tai - (int64_t)cur_tai;
337+
if (time_to_tx_ns < 0) {
304338
/* time bigger than the assigned epoch time */
305339
ST_SESSION_STAT_INC(s, port_user_stats, stat_epoch_mismatch);
306-
to_epoch = 0; /* send asap */
340+
time_to_tx_ns = 0; /* send asap */
307341
}
308342

309-
if (epochs > next_epochs) {
310-
uint drop = (epochs - next_epochs);
311-
ST_SESSION_STAT_ADD(s, port_user_stats.common, stat_epoch_drop, drop);
343+
pacing->ptp_time_cursor = start_time_tai;
344+
pacing->tsc_time_cursor = (double)cur_tsc + (double)time_to_tx_ns;
345+
dbg("%s(%d), epochs %" PRIu64 " ptp_time_cursor %" PRIu64 " time_to_tx_ns %" PRId64
346+
"\n",
347+
__func__, s->idx, pacing->cur_epochs, pacing->ptp_time_cursor, time_to_tx_ns);
312348

313-
if (s->ops.notify_frame_late) {
314-
s->ops.notify_frame_late(s->ops.priv, drop);
315-
}
316-
}
317-
318-
if (epochs < next_epochs) {
319-
ST_SESSION_STAT_ADD(s, port_user_stats.common, stat_epoch_onward,
320-
(next_epochs - epochs));
321-
}
349+
return 0;
350+
}
322351

323-
pacing->cur_epochs = epochs;
324-
pacing->cur_epoch_time = tx_ancillary_pacing_time(pacing, epochs);
325-
pacing->pacing_time_stamp = tx_ancillary_pacing_time_stamp(pacing, epochs);
326-
pacing->rtp_time_stamp = pacing->pacing_time_stamp;
327-
pacing->tsc_time_cursor = (double)mt_get_tsc(impl) + to_epoch;
328-
dbg("%s(%d), epochs %" PRIu64 " time_stamp %u time_cursor %f to_epoch %f\n", __func__,
329-
s->idx, pacing->cur_epochs, pacing->pacing_time_stamp, pacing->tsc_time_cursor,
330-
to_epoch);
352+
static void tx_ancillary_update_rtp_time_stamp(struct st_tx_ancillary_session_impl* s,
353+
enum st10_timestamp_fmt tfmt,
354+
uint64_t timestamp) {
355+
struct st_tx_ancillary_session_pacing* pacing = &s->pacing;
331356

332-
if (sync) {
333-
dbg("%s(%d), delay to epoch_time %f, cur %" PRIu64 "\n", __func__, s->idx,
334-
pacing->tsc_time_cursor, mt_get_tsc(impl));
335-
mt_tsc_delay_to(impl, pacing->tsc_time_cursor);
357+
if (s->ops.flags & ST40_TX_FLAG_USER_TIMESTAMP) {
358+
pacing->rtp_time_stamp =
359+
st10_get_media_clk(tfmt, timestamp, s->fps_tm.sampling_clock_rate);
360+
} else {
361+
pacing->rtp_time_stamp =
362+
st10_tai_to_media_clk(pacing->ptp_time_cursor, s->fps_tm.sampling_clock_rate);
336363
}
337-
338-
return 0;
339364
}
340365

341366
static int tx_ancillary_session_init_next_meta(struct st_tx_ancillary_session_impl* s,
@@ -619,10 +644,16 @@ static int tx_ancillary_session_rtp_update_packet(struct mtl_main_impl* impl,
619644
second_field = (rfc8331->first_hdr_chunk.f == 0b11) ? true : false;
620645
rfc8331->swapped_first_hdr_chunk = htonl(rfc8331->swapped_first_hdr_chunk);
621646
}
622-
tx_ancillary_session_sync_pacing(impl, s, false, 0, second_field);
623-
}
624-
if (s->ops.flags & ST40_TX_FLAG_USER_TIMESTAMP) {
625-
s->pacing.rtp_time_stamp = ntohl(rtp->tmstamp);
647+
if (s->ops.interlaced) {
648+
if (second_field) {
649+
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_second_field);
650+
} else {
651+
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_first_field);
652+
}
653+
}
654+
tx_ancillary_session_sync_pacing(impl, s, 0);
655+
tx_ancillary_update_rtp_time_stamp(s, ST10_TIMESTAMP_FMT_MEDIA_CLK,
656+
ntohl(rtp->tmstamp));
626657
}
627658
rtp->tmstamp = htonl(s->pacing.rtp_time_stamp);
628659

@@ -677,10 +708,16 @@ static int tx_ancillary_session_build_packet_chain(struct mtl_main_impl* impl,
677708
second_field = (rfc8331->first_hdr_chunk.f == 0b11) ? true : false;
678709
rfc8331->swapped_first_hdr_chunk = htonl(rfc8331->swapped_first_hdr_chunk);
679710
}
680-
tx_ancillary_session_sync_pacing(impl, s, false, 0, second_field);
681-
}
682-
if (s->ops.flags & ST40_TX_FLAG_USER_TIMESTAMP) {
683-
s->pacing.rtp_time_stamp = ntohl(rtp->base.tmstamp);
711+
if (s->ops.interlaced) {
712+
if (second_field) {
713+
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_second_field);
714+
} else {
715+
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_first_field);
716+
}
717+
}
718+
tx_ancillary_session_sync_pacing(impl, s, 0);
719+
tx_ancillary_update_rtp_time_stamp(s, ST10_TIMESTAMP_FMT_MEDIA_CLK,
720+
ntohl(rtp->base.tmstamp));
684721
}
685722
rtp->base.tmstamp = htonl(s->pacing.rtp_time_stamp);
686723
rtp->swapped_first_hdr_chunk = htonl(rtp->swapped_first_hdr_chunk);
@@ -847,13 +884,17 @@ static int tx_ancillary_session_tasklet_frame(struct mtl_main_impl* impl,
847884
uint64_t required_tai = tx_ancillary_pacing_required_tai(s, frame->tc_meta.tfmt,
848885
frame->tc_meta.timestamp);
849886
bool second_field = frame->tc_meta.second_field;
850-
tx_ancillary_session_sync_pacing(impl, s, false, required_tai, second_field);
851-
if (ops->flags & ST40_TX_FLAG_USER_TIMESTAMP &&
852-
(frame->ta_meta.tfmt == ST10_TIMESTAMP_FMT_MEDIA_CLK)) {
853-
pacing->rtp_time_stamp = (uint32_t)frame->tc_meta.timestamp;
887+
if (s->ops.interlaced) {
888+
if (second_field) {
889+
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_second_field);
890+
} else {
891+
ST_SESSION_STAT_INC(s, port_user_stats, stat_interlace_first_field);
892+
}
854893
}
894+
tx_ancillary_session_sync_pacing(impl, s, required_tai);
895+
tx_ancillary_update_rtp_time_stamp(s, frame->tc_meta.tfmt, frame->tc_meta.timestamp);
855896
frame->tc_meta.tfmt = ST10_TIMESTAMP_FMT_TAI;
856-
frame->tc_meta.timestamp = pacing->cur_epoch_time;
897+
frame->tc_meta.timestamp = pacing->ptp_time_cursor;
857898
frame->tc_meta.rtp_timestamp = pacing->rtp_time_stamp;
858899
/* init to next field */
859900
if (ops->interlaced) {
@@ -1929,6 +1970,13 @@ static int tx_ancillary_ops_check(struct st40_tx_ops* ops) {
19291970
return -EINVAL;
19301971
}
19311972

1973+
if ((ops->flags & ST40_TX_FLAG_EXACT_USER_PACING) &&
1974+
!(ops->flags & ST40_TX_FLAG_USER_PACING)) {
1975+
err("%s, invalid flags 0x%x, need set USER_PACING with EXACT_USER_PACING\n", __func__,
1976+
ops->flags);
1977+
return -EINVAL;
1978+
}
1979+
19321980
return 0;
19331981
}
19341982

lib/src/st2110/st_tx_audio_session.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ static int tx_audio_session_sync_pacing(struct mtl_main_impl* impl,
338338
pacing->cur_epochs = epochs;
339339

340340
if (required_tai) {
341-
pacing->cur_epoch_time = required_tai + pkt_time; // prepare next packet
341+
pacing->ptp_time_cursor = required_tai + pkt_time; // prepare next packet
342342
/*
343343
* Cast [double] to intermediate [uint64_t] to extract 32 least significant bits.
344344
* If calculated time stored in [double] is larger than max uint32_t,
@@ -349,7 +349,7 @@ static int tx_audio_session_sync_pacing(struct mtl_main_impl* impl,
349349
pacing->rtp_time_stamp =
350350
((uint64_t)((required_tai / pkt_time) * pacing->pkt_time_sampling) & 0xffffffff);
351351
} else {
352-
pacing->cur_epoch_time = tx_audio_pacing_time(pacing, epochs);
352+
pacing->ptp_time_cursor = tx_audio_pacing_time(pacing, epochs);
353353
pacing->rtp_time_stamp = tx_audio_pacing_time_stamp(pacing, epochs);
354354
}
355355

@@ -783,7 +783,7 @@ static int tx_audio_session_tasklet_frame(struct mtl_main_impl* impl,
783783
pacing->rtp_time_stamp = (uint32_t)frame->ta_meta.timestamp;
784784
}
785785
frame->ta_meta.tfmt = ST10_TIMESTAMP_FMT_TAI;
786-
frame->ta_meta.timestamp = pacing->cur_epoch_time;
786+
frame->ta_meta.timestamp = pacing->ptp_time_cursor;
787787
frame->ta_meta.rtp_timestamp = pacing->rtp_time_stamp;
788788
s->calculate_time_cursor = false; /* clear */
789789
}

0 commit comments

Comments
 (0)