Skip to content
3 changes: 3 additions & 0 deletions include/st30_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ struct st30_frame {
/** TAI timestamp measured right after the first packet of the frame was received */
uint64_t receive_timestamp;

/** frame status, set by lib before notify_frame_done: complete or dropped */
enum st_frame_status status;

/** priv pointer for lib, do not touch this */
void* priv;
};
Expand Down
3 changes: 3 additions & 0 deletions include/st40_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ struct st40_frame_info {
/** True if the frame was flagged as interlaced (F bits indicate field 1/2). */
bool interlaced;

/** frame status, set by lib before notify_frame_done: complete or dropped */
enum st_frame_status status;

/** priv pointer for lib, do not touch this */
void* priv;
};
Expand Down
4 changes: 3 additions & 1 deletion include/st_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ enum st_fps {
};

/**
* Frame status type of rx streaming
* frame status type for TX done/drop and RX receive result.
*/
enum st_frame_status {
/** All pixels of the frame were received */
Expand All @@ -85,6 +85,8 @@ enum st_frame_status {
ST_FRAME_STATUS_RECONSTRUCTED,
/** Packets were lost */
ST_FRAME_STATUS_CORRUPTED,
/** Frame was dropped */
ST_FRAME_STATUS_DROPPED,
/** Max value of this enum */
ST_FRAME_STATUS_MAX,
};
Expand Down
137 changes: 88 additions & 49 deletions lib/src/st2110/pipeline/st20_pipeline_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
#include "../../mt_stat.h"

static const char* st20p_tx_frame_stat_name[ST20P_TX_FRAME_STATUS_MAX] = {
"free", "ready", "in_converting", "converted", "in_user", "in_transmitting",
"free", "ready", "in_converting", "converted",
"dropped", "in_user", "in_transmitting",
};

static const char* st20p_tx_frame_stat_name_short[ST20P_TX_FRAME_STATUS_MAX] = {
"F", "R", "IC", "C", "U", "T",
"F", "R", "IC", "C", "D", "U", "T",
};

static const char* tx_st20p_stat_name(enum st20p_tx_frame_status stat) {
Expand Down Expand Up @@ -74,16 +75,94 @@ static struct st20p_tx_frame* tx_st20p_newest_available(
return framebuff_newest;
}

/* Check if a CONVERTED frame is late.
*
* If late, the sequence is:
* 1. Park as DROPPED under lock — invisible to both next_frame (CONVERTED) and
* get_frame (FREE), so no other thread can grab the slot while we clean up.
* 2. Unlock before all callbacks so app callbacks cannot deadlock on ctx->lock.
* 3. Fire notify_frame_done — app sees the frame and can safely release any
* external buffer it owns (EXT_FRAME mode).
* 4. Fire notify_frame_late + USDT probe.
* 5. Re-acquire lock, clear ext-buffer pointers on the framebuff so the next
* get_frame call receives a clean slot with no stale external pointers.
* 6. Advance to FREE, unlock, then notify_frame_available — only at this point
* is the slot visible to get_frame callers.
* 7. Re-acquire for the caller's do-while loop and return true.
*
* Must be called with ctx->lock held. Returns with ctx->lock held. */
static bool tx_st20p_if_frame_late(struct st20p_tx_ctx* ctx,
struct st20p_tx_frame* framebuff) {
struct st_frame* frame = tx_st20p_user_frame(ctx, framebuff);
uint32_t rtp_ts; /* captured under lock for use in USDT after unlock */

/* prerequisite: both flags must be set */
if (!(ctx->ops.flags & ST20P_TX_FLAG_DROP_WHEN_LATE) ||
!(ctx->ops.flags & ST20P_TX_FLAG_USER_PACING))
return false;

/* only TAI timestamps can be directly compared against PTP wall time;
* skip the check for other formats (e.g. ST10_TIMESTAMP_FMT_MEDIA_CLK) */
if (frame->tfmt != ST10_TIMESTAMP_FMT_TAI) return false;

uint64_t frame_tai = frame->timestamp;
uint64_t cur_tai = mt_get_ptp_time(ctx->impl, MTL_PORT_P);
/* one frame period as grace window: the pipeline has inherent processing latency
* (conversion + scheduling) between put_frame and get_next_frame. A frame is only
* truly "late" if it missed its TX window by more than one full frame period. */
uint64_t frame_period_ns = (uint64_t)((double)NS_PER_S / st_frame_rate(ctx->ops.fps));

if (cur_tai < frame_tai + frame_period_ns)
return false; /* within acceptable TX window */

dbg("%s(%d), frame %u late by %" PRId64 "ns (> period %" PRIu64 "ns)\n", __func__,
ctx->idx, framebuff->idx, (int64_t)(cur_tai - frame_tai), frame_period_ns);

ctx->stat_drop_frame++;
rtp_ts = frame->rtp_timestamp;
framebuff->stat = ST20P_TX_FRAME_DROPPED;

mt_pthread_mutex_unlock(&ctx->lock);

dbg("%s(%d), frame %u drop late by %" PRIu64 "ns (> period %" PRIu64 "ns), cur %" PRIu64
" frame %" PRIu64 "\n",
__func__, ctx->idx, framebuff->seq_number, cur_tai - frame_tai, frame_period_ns,
cur_tai, frame_tai);

if (ctx->ops.notify_frame_done && !framebuff->frame_done_cb_called) {
frame->status = ST_FRAME_STATUS_DROPPED;
ctx->ops.notify_frame_done(ctx->ops.priv, frame);
framebuff->frame_done_cb_called = true;
}

if (ctx->ops.notify_frame_late) ctx->ops.notify_frame_late(ctx->ops.priv, 0);
MT_USDT_ST20P_TX_FRAME_DROP(ctx->idx, framebuff->idx, rtp_ts);

mt_pthread_mutex_lock(&ctx->lock);

framebuff->stat = ST20P_TX_FRAME_FREE;
mt_pthread_mutex_unlock(&ctx->lock);

tx_st20p_notify_frame_available(ctx);

mt_pthread_mutex_lock(&ctx->lock);
return true; /* frame was dropped, caller should retry */
}

static int tx_st20p_next_frame(void* priv, uint16_t* next_frame_idx,
struct st20_tx_frame_meta* meta) {
struct st20p_tx_ctx* ctx = priv;
struct st20p_tx_frame* framebuff;
struct st_frame* frame;

if (!ctx->ready) return -EBUSY; /* not ready */

mt_pthread_mutex_lock(&ctx->lock);
framebuff = tx_st20p_newest_available(ctx, ST20P_TX_FRAME_CONVERTED);
/* not any converted frame */
do {
framebuff = tx_st20p_newest_available(ctx, ST20P_TX_FRAME_CONVERTED);
if (!framebuff) break; /* no converted frame available */
} while (tx_st20p_if_frame_late(ctx, framebuff));

if (!framebuff) {
mt_pthread_mutex_unlock(&ctx->lock);
return -EBUSY;
Expand All @@ -92,7 +171,7 @@ static int tx_st20p_next_frame(void* priv, uint16_t* next_frame_idx,
framebuff->stat = ST20P_TX_FRAME_IN_TRANSMITTING;
*next_frame_idx = framebuff->idx;

struct st_frame* frame = tx_st20p_user_frame(ctx, framebuff);
frame = tx_st20p_user_frame(ctx, framebuff);
meta->second_field = frame->second_field;
if (ctx->ops.flags & (ST20P_TX_FLAG_USER_PACING | ST20P_TX_FLAG_USER_TIMESTAMP)) {
meta->tfmt = frame->tfmt;
Expand All @@ -103,52 +182,13 @@ static int tx_st20p_next_frame(void* priv, uint16_t* next_frame_idx,
meta->user_meta_size = framebuff->user_meta_data_size;
}

/* point to next */
mt_pthread_mutex_unlock(&ctx->lock);
dbg("%s(%d), frame %u succ, frame_idx: %u\n", __func__, ctx->idx, framebuff->idx,
framebuff->idx);
MT_USDT_ST20P_TX_FRAME_NEXT(ctx->idx, framebuff->idx);
return 0;
}

int st20p_tx_late_frame_drop(void* handle, uint64_t epoch_skipped) {
struct st20p_tx_ctx* ctx = handle;
int cidx = ctx->idx;
struct st20p_tx_frame* framebuff;

if (ctx->type != MT_ST20_HANDLE_PIPELINE_TX) {
err("%s(%d), invalid type %d\n", __func__, cidx, ctx->type);
return 0;
}

if (!ctx->ready) return -EBUSY; /* not ready */
mt_pthread_mutex_lock(&ctx->lock);
framebuff = tx_st20p_newest_available(ctx, ST20P_TX_FRAME_CONVERTED);
/* not any converted frame */
if (!framebuff) {
mt_pthread_mutex_unlock(&ctx->lock);
return -EBUSY;
}

framebuff->stat = ST20P_TX_FRAME_FREE;
ctx->stat_drop_frame++;
dbg("%s(%d), drop frame %u succ\n", __func__, cidx, framebuff->idx);
mt_pthread_mutex_unlock(&ctx->lock);

if (ctx->ops.notify_frame_late) {
ctx->ops.notify_frame_late(ctx->ops.priv, epoch_skipped);
} else if (ctx->ops.notify_frame_done &&
!framebuff->frame_done_cb_called) { /* notify app which frame done */
ctx->ops.notify_frame_done(ctx->ops.priv, tx_st20p_user_frame(ctx, framebuff));
framebuff->frame_done_cb_called = true;
}

/* notify app can get frame */
tx_st20p_notify_frame_available(ctx);
MT_USDT_ST20P_TX_FRAME_DROP(cidx, framebuff->idx, framebuff->dst.rtp_timestamp);
return 0;
}

static int tx_st20p_frame_done(void* priv, uint16_t frame_idx,
struct st20_tx_frame_meta* meta) {
struct st20p_tx_ctx* ctx = priv;
Expand All @@ -174,6 +214,7 @@ static int tx_st20p_frame_done(void* priv, uint16_t frame_idx,

if (ctx->ops.notify_frame_done &&
!framebuff->frame_done_cb_called) { /* notify app which frame done */
frame->status = ST_FRAME_STATUS_COMPLETE;
ctx->ops.notify_frame_done(ctx->ops.priv, frame);
framebuff->frame_done_cb_called = true;
}
Expand Down Expand Up @@ -337,11 +378,7 @@ static int tx_st20p_create_transport(struct mtl_main_impl* impl, struct st20p_tx
if (ctx->derive && ops->flags & ST20P_TX_FLAG_EXT_FRAME)
ops_tx.flags |= ST20_TX_FLAG_EXT_FRAME;
if (ops->flags & ST20P_TX_FLAG_USER_PACING) ops_tx.flags |= ST20_TX_FLAG_USER_PACING;
if (ops->flags & ST20P_TX_FLAG_DROP_WHEN_LATE) {
ops_tx.notify_frame_late = st20p_tx_late_frame_drop;
} else if (ops->notify_frame_late) {
ops_tx.notify_frame_late = ops->notify_frame_late;
}
if (ops->notify_frame_late) ops_tx.notify_frame_late = ops->notify_frame_late;
if (ops->flags & ST20P_TX_FLAG_USER_TIMESTAMP)
ops_tx.flags |= ST20_TX_FLAG_USER_TIMESTAMP;
if (ops->flags & ST20P_TX_FLAG_ENABLE_VSYNC) ops_tx.flags |= ST20_TX_FLAG_ENABLE_VSYNC;
Expand Down Expand Up @@ -606,6 +643,8 @@ static void tx_st20p_framebuffs_flush(struct st20p_tx_ctx* ctx) {

while (1) {
if (framebuff->stat == ST20P_TX_FRAME_FREE) break;
if (framebuff->stat == ST20P_TX_FRAME_DROPPED)
break; /* dropped, effectively free */
if (framebuff->stat == ST20P_TX_FRAME_IN_TRANSMITTING) {
/* make sure transport to finish the transmit */
/* WA to use sleep here, todo: add a transport API to query the stat */
Expand Down
3 changes: 2 additions & 1 deletion lib/src/st2110/pipeline/st20_pipeline_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ enum st20p_tx_frame_status {
ST20P_TX_FRAME_READY,
ST20P_TX_FRAME_IN_CONVERTING, /* for converting */
ST20P_TX_FRAME_CONVERTED,
ST20P_TX_FRAME_IN_USER, /* in user */
ST20P_TX_FRAME_DROPPED, /* converted but arrived too late; recycled in next_frame */
ST20P_TX_FRAME_IN_USER, /* in user */
ST20P_TX_FRAME_IN_TRANSMITTING, /* for transport */
ST20P_TX_FRAME_STATUS_MAX,
};
Expand Down
1 change: 1 addition & 0 deletions lib/src/st2110/pipeline/st22_pipeline_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ static int rx_st22p_frame_ready(void* priv, void* frame,
framebuff->src.pkts_recv[s_port] = framebuff->dst.pkts_recv[s_port] =
meta->pkts_recv[s_port];
}
framebuff->src.status = framebuff->dst.status = meta->status;

/* ask app to consume src frame directly for derive mode */
if (ctx->derive) {
Expand Down
Loading
Loading