Skip to content

Commit b99b37e

Browse files
kvssink PutFrame to Return Status Rather than Bool (#1260) Old develop branch PR: #1119
* PutFrame to Return Status Rather than Bool * add warn on bus, have put frame return actual status * CI tests account for changed putFrame return value * More CI putFrame checks update * Uncomment if block * Update GST warning type * Update .gitignore * Update gstkvssink.cpp --------- Co-authored-by: Hassan Sahibzada <[email protected]> * Remove duplicate line in .gitignore * Create a new function for backwards compat, address comments --------- Co-authored-by: Hassan Sahibzada <[email protected]>
1 parent 27c6a8d commit b99b37e

10 files changed

+63
-37
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ open-source/
1010
outputs
1111
tags
1212
dependency
13-
.vs
13+
.vs
14+
.vscode/

samples/kvs_gstreamer_audio_video_sample.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ static GstFlowReturn on_new_sample(GstElement *sink, CustomData *data) {
501501
create_kinesis_video_frame(&frame, std::chrono::nanoseconds(buffer->pts), std::chrono::nanoseconds(buffer->dts),
502502
kinesis_video_flags, info.data, info.size, track_id);
503503

504-
data->kinesis_video_stream->putFrame(frame);
504+
data->kinesis_video_stream->statusPutFrame(frame);
505505

506506
// Sample to demonstrate how event metadata tags can be generated for fragment(s)
507507
// Ref: https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/notifications.html

samples/kvs_gstreamer_multistream_sample.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,11 @@ void create_kinesis_video_frame(Frame *frame, const nanoseconds &pts, const nano
196196
frame->trackId = DEFAULT_TRACK_ID;
197197
}
198198

199-
bool put_frame(shared_ptr<KinesisVideoStream> kinesis_video_stream, void *data, size_t len, const nanoseconds &pts,
199+
STATUS put_frame(shared_ptr<KinesisVideoStream> kinesis_video_stream, void *data, size_t len, const nanoseconds &pts,
200200
const nanoseconds &dts, FRAME_FLAGS flags) {
201201
Frame frame;
202202
create_kinesis_video_frame(&frame, pts, dts, flags, data, len);
203-
return kinesis_video_stream->putFrame(frame);
203+
return kinesis_video_stream->statusPutFrame(frame);
204204
}
205205

206206
static GstFlowReturn on_new_sample(GstElement *sink, CustomData *data) {

samples/kvs_gstreamer_sample.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,10 @@ void create_kinesis_video_frame(Frame *frame, const nanoseconds &pts, const nano
323323
frame->trackId = DEFAULT_TRACK_ID;
324324
}
325325

326-
bool put_frame(shared_ptr<KinesisVideoStream> kinesis_video_stream, void *data, size_t len, const nanoseconds &pts, const nanoseconds &dts, FRAME_FLAGS flags) {
326+
STATUS put_frame(shared_ptr<KinesisVideoStream> kinesis_video_stream, void *data, size_t len, const nanoseconds &pts, const nanoseconds &dts, FRAME_FLAGS flags) {
327327
Frame frame;
328328
create_kinesis_video_frame(&frame, pts, dts, flags, data, len);
329-
return kinesis_video_stream->putFrame(frame);
329+
return kinesis_video_stream->statusPutFrame(frame);
330330
}
331331

332332
static GstFlowReturn on_new_sample(GstElement *sink, CustomData *data) {

src/KinesisVideoStream.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,17 @@ KinesisVideoStream::KinesisVideoStream(const KinesisVideoProducer& kinesis_video
1919
}
2020
}
2121

22+
// Added statusPutFrame function but leaving the putFrame function as is to still return a bool for backward compatibility.
2223
bool KinesisVideoStream::putFrame(KinesisVideoFrame& frame) const {
24+
STATUS status = statusPutFrame(frame);
25+
if (STATUS_FAILED(status)) {
26+
return false;
27+
}
28+
29+
return true;
30+
}
31+
32+
STATUS KinesisVideoStream::statusPutFrame(KinesisVideoFrame& frame) const {
2333
if (debug_dump_frame_info_) {
2434
LOG_DEBUG("[" << this->stream_name_ << "] pts: " << frame.presentationTs << ", dts: " << frame.decodingTs << ", duration: " << frame.duration << ", size: " << frame.size << ", trackId: " << frame.trackId
2535
<< ", isKey: " << CHECK_FRAME_FLAG_KEY_FRAME(frame.flags));
@@ -29,7 +39,7 @@ bool KinesisVideoStream::putFrame(KinesisVideoFrame& frame) const {
2939
STATUS status = putKinesisVideoFrame(stream_handle_, &frame);
3040
if (STATUS_FAILED(status)) {
3141
LOG_ERROR("Put frame for " << this->stream_name_ << " failed with 0x" << std::hex << status);
32-
return false;
42+
return status;
3343
}
3444

3545
// Print metrics on every key-frame
@@ -60,9 +70,7 @@ bool KinesisVideoStream::putFrame(KinesisVideoFrame& frame) const {
6070
LOG_ERROR("Failed to get metrics. Error: " << err.what());
6171
}
6272
}
63-
64-
// Even if metrics fail, we do not want to return false for putFrame. We just log the error
65-
return true;
73+
return status;
6674
}
6775

6876
bool KinesisVideoStream::start(const std::string& hexEncodedCodecPrivateData, uint64_t trackId) {

src/KinesisVideoStream.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ class KinesisVideoStream {
6262
*/
6363
bool putFrame(KinesisVideoFrame& frame) const;
6464

65+
/**
66+
* Does putFrame, but returns a STATUS rather than a failure/success bool.
67+
*
68+
* @param frame The frame to be packaged and streamed.
69+
* @return STATUS of the putKinesisVideoFrame call.
70+
*/
71+
STATUS statusPutFrame(KinesisVideoFrame& frame) const;
72+
6573
/**
6674
* Gets the stream metrics.
6775
*

src/common/PutFrameHelper.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ PutFrameHelper::PutFrameHelper(
2121
}
2222

2323
void PutFrameHelper::putFrameMultiTrack(Frame frame, bool isVideo) {
24-
if (!kinesis_video_stream->putFrame(frame)) {
24+
if (kinesis_video_stream->statusPutFrame(frame) != STATUS_SUCCESS) {
2525
put_frame_status = false;
2626
LOG_WARN("Failed to put normal frame");
2727
}
@@ -46,7 +46,7 @@ bool PutFrameHelper::putFrameFailed() {
4646

4747
void PutFrameHelper::putEofr() {
4848
Frame frame = EOFR_FRAME_INITIALIZER;
49-
if (!kinesis_video_stream->putFrame(frame)) {
49+
if (kinesis_video_stream->statusPutFrame(frame) != STATUS_SUCCESS) {
5050
put_frame_status = false;
5151
LOG_WARN("Failed to put eofr frame");
5252
}

src/gstreamer/gstkvssink.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,14 +1243,17 @@ void create_kinesis_video_frame(Frame *frame, const nanoseconds &pts, const nano
12431243
frame->trackId = static_cast<UINT64>(track_id);
12441244
}
12451245

1246-
bool put_frame(shared_ptr<KvsSinkCustomData> data, void *frame_data, size_t len, const nanoseconds &pts,
1246+
STATUS
1247+
put_frame(std::shared_ptr<KvsSinkCustomData> data, void *frame_data, size_t len, const nanoseconds &pts,
12471248
const nanoseconds &dts, FRAME_FLAGS flags, uint64_t track_id, uint32_t index) {
12481249

1250+
STATUS put_frame_status = STATUS_SUCCESS;
12491251
Frame frame;
1252+
12501253
create_kinesis_video_frame(&frame, pts, dts, flags, frame_data, len, track_id, index);
1251-
bool ret = data->kinesis_video_stream->putFrame(frame);
1252-
if (data->get_metrics && ret) {
1253-
if (CHECK_FRAME_FLAG_KEY_FRAME(flags) || data->on_first_frame) {
1254+
put_frame_status = data->kinesis_video_stream->statusPutFrame(frame);
1255+
if (data->get_metrics && STATUS_SUCCEEDED(put_frame_status)) {
1256+
if (CHECK_FRAME_FLAG_KEY_FRAME(flags) || data->on_first_frame) {
12541257
KvsSinkMetric *kvs_sink_metric = new KvsSinkMetric();
12551258
kvs_sink_metric->stream_metrics = data->kinesis_video_stream->getMetrics();
12561259
kvs_sink_metric->client_metrics = data->kinesis_video_producer->getMetrics();
@@ -1261,7 +1264,7 @@ bool put_frame(shared_ptr<KvsSinkCustomData> data, void *frame_data, size_t len,
12611264
delete kvs_sink_metric;
12621265
}
12631266
}
1264-
return ret;
1267+
return put_frame_status;
12651268
}
12661269

12671270
static GstFlowReturn
@@ -1279,6 +1282,7 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
12791282
uint64_t track_id;
12801283
FRAME_FLAGS kinesis_video_flags = FRAME_FLAG_NONE;
12811284
GstMapInfo info;
1285+
STATUS put_frame_status = STATUS_SUCCESS;
12821286

12831287
info.data = NULL;
12841288
// eos reached
@@ -1389,9 +1393,9 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
13891393
}
13901394
}
13911395

1392-
put_frame(kvssink->data, info.data, info.size,
1393-
std::chrono::nanoseconds(buf->pts),
1394-
std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count);
1396+
put_frame_status = put_frame(data, info.data, info.size,
1397+
std::chrono::nanoseconds(buf->pts),
1398+
std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count);
13951399
data->frame_count++;
13961400
} else {
13971401
LOG_WARN("GStreamer buffer is invalid for " << kvssink->stream_name);
@@ -1405,6 +1409,11 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
14051409
if (buf != NULL) {
14061410
gst_buffer_unref (buf);
14071411
}
1412+
1413+
if (STATUS_FAILED(put_frame_status)) {
1414+
GST_ELEMENT_WARNING (kvssink, RESOURCE, WRITE, (NULL),
1415+
("put frame error occurred. Status: 0x%08x", put_frame_status));
1416+
}
14081417

14091418
return ret;
14101419
}

tst/ProducerApiTest.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ PVOID ProducerTestBase::basicProducerRoutine(KinesisVideoStream* kinesis_video_s
114114
// Simulate EoFr first
115115
if (frame.index % 50 == 0 && frame.index != 0) {
116116
Frame eofr = EOFR_FRAME_INITIALIZER;
117-
EXPECT_TRUE(kinesis_video_stream->putFrame(eofr));
117+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(eofr));
118118
}
119119
#endif
120120

121-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
121+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
122122

123123
// Sleep a while for non-offline modes
124124
if (streaming_type != STREAMING_TYPE_OFFLINE) {
@@ -308,7 +308,7 @@ TEST_F(ProducerApiTest, create_produce_start_stop_stream)
308308
<< ", Dts: " << frame.decodingTs
309309
<< ", Pts: " << frame.presentationTs);
310310

311-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
311+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
312312

313313
THREAD_SLEEP(frame_duration_);
314314
}
@@ -374,7 +374,7 @@ TEST_F(ProducerApiTest, create_produce_start_stop_stream_endpoint_cached)
374374
<< ", Dts: " << frame.decodingTs
375375
<< ", Pts: " << frame.presentationTs);
376376

377-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
377+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
378378

379379
THREAD_SLEEP(frame_duration_);
380380
}
@@ -440,7 +440,7 @@ TEST_F(ProducerApiTest, create_produce_start_stop_stream_all_cached)
440440
<< ", Dts: " << frame.decodingTs
441441
<< ", Pts: " << frame.presentationTs);
442442

443-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
443+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
444444

445445
THREAD_SLEEP(frame_duration_);
446446
}
@@ -506,7 +506,7 @@ TEST_F(ProducerApiTest, create_produce_start_stop_reset_stream_endpoint_cached)
506506
<< ", Dts: " << frame.decodingTs
507507
<< ", Pts: " << frame.presentationTs);
508508

509-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
509+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
510510

511511
THREAD_SLEEP(frame_duration_);
512512
}
@@ -574,7 +574,7 @@ TEST_F(ProducerApiTest, create_produce_start_stop_reset_stream_all_cached)
574574
<< ", Dts: " << frame.decodingTs
575575
<< ", Pts: " << frame.presentationTs);
576576

577-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
577+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
578578

579579
THREAD_SLEEP(frame_duration_);
580580
}

tst/ProducerFunctionalityTest.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ TEST_F(ProducerFunctionalityTest, offline_upload_limited_buffer_duration) {
6868
<< ", Dts: " << frame.decodingTs
6969
<< ", Pts: " << frame.presentationTs);
7070

71-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
71+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
7272
timestamp += frame_duration_;
7373
}
7474

@@ -127,7 +127,7 @@ TEST_F(ProducerFunctionalityTest, offline_upload_limited_storage) {
127127
<< ", Dts: " << frame.decodingTs
128128
<< ", Pts: " << frame.presentationTs);
129129

130-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
130+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
131131
timestamp += frame_duration_;
132132
}
133133

@@ -192,7 +192,7 @@ TEST_F(ProducerFunctionalityTest, intermittent_file_upload) {
192192
<< ", Dts: " << frame.decodingTs
193193
<< ", Pts: " << frame.presentationTs);
194194

195-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
195+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
196196
timestamp += frame_duration_;
197197

198198
// pause at the last frame of each clip
@@ -257,7 +257,7 @@ TEST_F(ProducerFunctionalityTest, high_fragment_rate_file_upload) {
257257
<< ", Dts: " << frame.decodingTs
258258
<< ", Pts: " << frame.presentationTs);
259259

260-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
260+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
261261
timestamp += frame_duration_;
262262
}
263263

@@ -316,7 +316,7 @@ TEST_F(ProducerFunctionalityTest, offline_mode_token_rotation_block_on_space) {
316316
<< ", Dts: " << frame.decodingTs
317317
<< ", Pts: " << frame.presentationTs);
318318

319-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
319+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
320320
timestamp += frame_duration_;
321321
}
322322

@@ -376,7 +376,7 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_no_latency_pressure_eofr
376376

377377
// Pause on the 5th
378378
if (i == 5 * key_frame_interval_) {
379-
EXPECT_TRUE(kinesis_video_stream->putFrame(eofr));
379+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(eofr));
380380

381381
// Make sure we hit the connection idle timeout
382382
THREAD_SLEEP(60 * HUNDREDS_OF_NANOS_IN_A_SECOND);
@@ -398,7 +398,7 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_no_latency_pressure_eofr
398398
<< ", Dts: " << frame.decodingTs
399399
<< ", Pts: " << frame.presentationTs);
400400

401-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
401+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
402402
timestamp += frame_duration_;
403403

404404
THREAD_SLEEP(30 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
@@ -483,7 +483,7 @@ TEST_F(ProducerFunctionalityTest, DISABLED_realtime_intermittent_no_latency_pres
483483
<< ", Dts: " << frame.decodingTs
484484
<< ", Pts: " << frame.presentationTs);
485485

486-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
486+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
487487
timestamp += frame_duration_;
488488

489489
THREAD_SLEEP(30 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
@@ -579,7 +579,7 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_latency_pressure) {
579579
<< ", Dts: " << frame.decodingTs
580580
<< ", Pts: " << frame.presentationTs);
581581

582-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
582+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
583583

584584
THREAD_SLEEP(frame_duration_);
585585
}
@@ -664,7 +664,7 @@ TEST_F(ProducerFunctionalityTest, realtime_auto_intermittent_latency_pressure) {
664664
<< ", Dts: " << frame.decodingTs
665665
<< ", Pts: " << frame.presentationTs);
666666

667-
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
667+
EXPECT_EQ(STATUS_SUCCESS, kinesis_video_stream->statusPutFrame(frame));
668668

669669
THREAD_SLEEP(frame_duration_);
670670
}

0 commit comments

Comments
 (0)