Skip to content

Commit cd66e77

Browse files
Add: Guard mechanism against all-redundant state (#1311)
If the session ancillary seq_id or any other session timestamp somehow moves backward in time, the MTL RX will wait indefinitely for that threshold to be reached. While this state should never occur under normal conditions, we can implement a threshold guard that accepts the packet after a while.
1 parent 5ee67a3 commit cd66e77

File tree

5 files changed

+80
-6
lines changed

5 files changed

+80
-6
lines changed

lib/src/st2110/st_header.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878

7979
#define ST_SESSION_STAT_TIMEOUT_US (10)
8080

81+
#define ST_SESSION_REDUNDANT_ERROR_THRESHOLD (20)
82+
8183
#define ST_SESSION_STAT_INC(s, struct, stat) \
8284
do { \
8385
(s)->stat++; \
@@ -637,6 +639,11 @@ struct st_rx_video_session_impl {
637639
/* rtp info */
638640
struct rte_ring* rtps_ring;
639641

642+
/* Redundant packet threshold guard: Accept packets after error threshold
643+
* to prevent deadlock when streams reset or have large timestamp jumps.
644+
* Handles edge case of 2^31 timestamp wraparound (highly unlikely). */
645+
int redundant_error_cnt[MTL_SESSION_PORT_MAX];
646+
640647
/* record two frames in case pkts out of order within marker */
641648
struct st_rx_video_slot_impl slots[ST_VIDEO_RX_REC_NUM_OFO];
642649
int slot_idx;
@@ -1020,6 +1027,11 @@ struct st_rx_audio_session_impl {
10201027
int session_seq_id; /* global session seq id to track continuity across redundant */
10211028
int latest_seq_id[MTL_SESSION_PORT_MAX]; /* latest seq id */
10221029

1030+
/* Redundant packet threshold guard: Accept packets after error threshold
1031+
* to prevent deadlock when streams reset or have large timestamp jumps.
1032+
* Handles edge case of 2^31 timestamp wraparound (highly unlikely). */
1033+
int redundant_error_cnt[MTL_SESSION_PORT_MAX];
1034+
10231035
uint32_t first_pkt_rtp_ts; /* rtp time stamp for the first pkt */
10241036
int64_t tmstamp;
10251037
size_t frame_recv_size;
@@ -1188,6 +1200,12 @@ struct st_rx_ancillary_session_impl {
11881200
int session_seq_id; /* global session seq id to track continuity across redundant */
11891201
int latest_seq_id[MTL_SESSION_PORT_MAX]; /* latest seq id */
11901202

1203+
/* Redundant packet threshold guard: Accept packets after error threshold
1204+
* to prevent deadlock when streams reset or have large timestamp or seq_id jumps.
1205+
* Handles edge case of 2^31 timestamp wraparound (highly unlikely)
1206+
* and 2^15 seq_id wraparound (unlikely). */
1207+
int redundant_error_cnt[MTL_SESSION_PORT_MAX];
1208+
11911209
struct mt_rtcp_rx* rtcp_rx[MTL_SESSION_PORT_MAX];
11921210

11931211
int64_t tmstamp;
@@ -1361,6 +1379,12 @@ struct st_rx_fastmetadata_session_impl {
13611379
int session_seq_id; /* global session seq id to track continuity across redundant */
13621380
int latest_seq_id[MTL_SESSION_PORT_MAX]; /* latest seq id */
13631381

1382+
/* Redundant packet threshold guard: Accept packets after error threshold
1383+
* to prevent deadlock when streams reset or have large timestamp or seq_id jumps.
1384+
* Handles edge case of 2^31 timestamp wraparound (highly unlikely)
1385+
* and 2^15 seq_id wraparound (unlikely). */
1386+
int redundant_error_cnt[MTL_SESSION_PORT_MAX];
1387+
13641388
struct mt_rtcp_rx* rtcp_rx[MTL_SESSION_PORT_MAX];
13651389

13661390
/* the timestamp */

lib/src/st2110/st_rx_ancillary_session.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,20 @@ static int rx_ancillary_session_handle_pkt(struct mtl_main_impl* impl,
156156
s_port, tmstamp, s->tmstamp);
157157
}
158158

159+
s->redundant_error_cnt[s_port]++;
159160
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
160-
return -EIO;
161+
162+
for (int i = 0; i < s->ops.num_port; i++) {
163+
if (s->redundant_error_cnt[i] < ST_SESSION_REDUNDANT_ERROR_THRESHOLD) {
164+
return -EIO;
165+
}
166+
}
167+
warn(
168+
"%s(%d), redundant error threshold reached, accept packet seq %u (old seq_id "
169+
"%d), timestamp %u (old timestamp %ld)\n",
170+
__func__, s->idx, seq_id, s->session_seq_id, tmstamp, s->tmstamp);
161171
}
172+
s->redundant_error_cnt[s_port] = 0;
162173

163174
/* hole in seq id packets going into the session check if the seq_id of the session is
164175
* consistent */

lib/src/st2110/st_rx_audio_session.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,15 @@ static int rx_audio_session_handle_frame_pkt(struct mtl_main_impl* impl,
311311
dbg("%s(%d,%d), drop as pkt seq_id %u (%u) or tmstamp %u (%ld) is old\n", __func__,
312312
s->idx, s_port, seq_id, s->latest_seq_id[s_port], tmstamp, s->tmstamp);
313313
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
314-
return -EIO;
314+
for (int i = 0; i < s->ops.num_port; i++) {
315+
if (s->redundant_error_cnt[i] < ST_SESSION_REDUNDANT_ERROR_THRESHOLD) {
316+
return -EIO;
317+
}
318+
}
319+
warn("%s(%d), redundant error threshold reached, accept packet tmstamp (%d) %ld\n",
320+
__func__, s->idx, tmstamp, s->tmstamp);
315321
}
322+
s->redundant_error_cnt[s_port] = 0;
316323
s->tmstamp = tmstamp;
317324

318325
/* hole in seq id packets going into the session check if the seq_id of the session is
@@ -470,8 +477,17 @@ static int rx_audio_session_handle_rtp_pkt(struct mtl_main_impl* impl,
470477
s->idx, s_port, seq_id, s->latest_seq_id[s_port], tmstamp, s->tmstamp);
471478
s->stat_pkts_redundant++;
472479
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
473-
return -EIO;
480+
for (int i = 0; i < s->ops.num_port; i++) {
481+
if (s->redundant_error_cnt[i] < ST_SESSION_REDUNDANT_ERROR_THRESHOLD) {
482+
return -EIO;
483+
}
484+
}
485+
486+
/* should never happen */
487+
warn("%s(%d), redundant error threshold reached, accept packet tmstamp (%d) %ld\n",
488+
__func__, s->idx, tmstamp, s->tmstamp);
474489
}
490+
s->redundant_error_cnt[s_port] = 0;
475491
s->tmstamp = tmstamp;
476492

477493
/* hole in seq id packets going into the session check if the seq_id of the session is

lib/src/st2110/st_rx_fastmetadata_session.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,17 @@ static int rx_fastmetadata_session_handle_pkt(struct mtl_main_impl* impl,
140140
}
141141

142142
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant);
143-
return -EIO;
143+
for (int i = 0; i < s->ops.num_port; i++) {
144+
if (s->redundant_error_cnt[i] < ST_SESSION_REDUNDANT_ERROR_THRESHOLD) {
145+
return -EIO;
146+
}
147+
}
148+
warn(
149+
"%s(%d), redundant error threshold reached, accept packet seq %u (old seq_id "
150+
"%d), timestamp %u (old timestamp %ld)\n",
151+
__func__, s->idx, seq_id, s->session_seq_id, tmstamp, s->tmstamp);
144152
}
153+
s->redundant_error_cnt[s_port] = 0;
145154

146155
/* hole in seq id packets going into the session check if the seq_id of the session is
147156
* consistent */

lib/src/st2110/st_rx_video_session.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,8 +1122,16 @@ static struct st_rx_video_slot_impl* rv_slot_by_tmstamp(
11221122
}
11231123

11241124
if (timestamp_is_in_the_past) {
1125-
dbg("%s(%d): tmstamp %u is in the past, drop it\n", __func__, s->idx, tmstamp);
1126-
return NULL;
1125+
for (int i = 0; i < s->ops.num_port; i++) {
1126+
if (s->redundant_error_cnt[i] < ST_SESSION_REDUNDANT_ERROR_THRESHOLD) {
1127+
dbg("%s(%d): tmstamp %u is in the past, drop it\n", __func__, s->idx, tmstamp);
1128+
return NULL;
1129+
}
1130+
}
1131+
1132+
/* should never happen */
1133+
warn("%s(%d), redundant error threshold reached, accept packet tmstamp %d\n",
1134+
__func__, s->idx, tmstamp);
11271135
}
11281136

11291137
dbg("%s(%d): new tmstamp %u\n", __func__, s->idx, tmstamp);
@@ -1561,8 +1569,10 @@ static int rv_handle_frame_pkt(struct st_rx_video_session_impl* s, struct rte_mb
15611569
if (exist_ts && !slot->frame) {
15621570
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant_dropped);
15631571
slot->pkts_recv_per_port[s_port]++;
1572+
s->redundant_error_cnt[s_port]++;
15641573
return 0;
15651574
}
1575+
s->redundant_error_cnt[s_port] = 0;
15661576

15671577
if ((!slot || !slot->frame) && !exist_ts) {
15681578
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_no_slot);
@@ -1979,8 +1989,10 @@ static int rv_handle_st22_pkt(struct st_rx_video_session_impl* s, struct rte_mbu
19791989
if (exist_ts && !slot->frame) {
19801990
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant_dropped);
19811991
slot->pkts_recv_per_port[s_port]++;
1992+
s->redundant_error_cnt[s_port]++;
19821993
return 0;
19831994
}
1995+
s->redundant_error_cnt[s_port] = 0;
19841996

19851997
if ((!slot || !slot->frame) && !exist_ts) {
19861998
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_no_slot);
@@ -2150,9 +2162,11 @@ static int rv_handle_hdr_split_pkt(struct st_rx_video_session_impl* s,
21502162
/* Based on rv_slot_by_tmstamp - exist_ts is only true when slot is found */
21512163
if (exist_ts && !slot->frame) {
21522164
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_redundant_dropped);
2165+
s->redundant_error_cnt[s_port]++;
21532166
slot->pkts_recv_per_port[s_port]++;
21542167
return 0;
21552168
}
2169+
s->redundant_error_cnt[s_port] = 0;
21562170

21572171
if ((!slot || !slot->frame) && !exist_ts) {
21582172
ST_SESSION_STAT_INC(s, port_user_stats, stat_pkts_no_slot);

0 commit comments

Comments
 (0)