Skip to content

Commit 017ab50

Browse files
authored
Intermittent producer test scenarios (#584)
* Intermittent producer test scenarios * Due to timing, removing the checks for the in-order buffering ACK check. This might fail in case of slight timing on a sessions edge * Fixing a build issue with cmake related to Policy CMP0054 * Fixing a build issue with cmake related to Policy CMP0054 * Updating C producer to resolve the CMake compile compatibility issue * Trying to fix windows build
1 parent 0fb9254 commit 017ab50

File tree

7 files changed

+269
-4
lines changed

7 files changed

+269
-4
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ open-source/libsrtp
1313
open-source/libusrsctp
1414
open-source/libwebsockets
1515
open-source/local
16+
open-source/libautoconf
1617
outputs
1718
tags
1819
dependency

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ matrix:
107107
- name: "Windows MSVC"
108108
os: windows
109109
script:
110-
- choco install nasm strawberryperl gstreamer
110+
- choco install nasm strawberryperl
111+
- choco install gstreamer --version=1.16.2
111112
- choco install gstreamer-devel --version=1.16.2 # gstreamer-devel has not been approved yet. Version number must be explicit to install
112113
- unset CC CC_FOR_BUILD CXX CXX_FOR_BUILD # We want to use MSVC
113114
- export "PATH=/c/Strawberry/perl/site/bin:/c/Strawberry/perl/bin:/c/Strawberry/c/bin:/c/Program Files/NASM:`pwd`/open-source/local/lib:`pwd`/open-source/local/bin:$PATH"

CMake/Dependencies/libkvscproducer-CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ include(ExternalProject)
77
# clone repo only
88
ExternalProject_Add(libkvscproducer-download
99
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git
10-
GIT_TAG ca3401496223bd536faff4eaa8f14eb01c0642f9
10+
GIT_TAG 22edebfd87f5a38ab8af58da9a42f3d8dc7aebe7
1111
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-src"
1212
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-build"
1313
CONFIGURE_COMMAND ""

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ else()
3939
set(LINKAGE SHARED)
4040
endif()
4141

42-
set(KINESIS_VIDEO_PRODUCER_CPP_SRC "${CMAKE_CURRENT_SOURCE_DIR}")
42+
set(KINESIS_VIDEO_PRODUCER_CPP_SRC ${CMAKE_CURRENT_SOURCE_DIR})
4343
set(KINESIS_VIDEO_OPEN_SOURCE_SRC ${CMAKE_CURRENT_SOURCE_DIR}/open-source)
4444

4545
message(STATUS "Kinesis Video Cpp Producer path is ${KINESIS_VIDEO_PRODUCER_CPP_SRC}")
@@ -121,7 +121,7 @@ if (WIN32)
121121
endif()
122122

123123
############# Enable Sanitizers ############
124-
if("${CMAKE_C_COMPILER_ID}" MATCHES "GNU|Clang")
124+
if(${CMAKE_C_COMPILER_ID} MATCHES "GNU|Clang")
125125
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
126126

127127
if(ADD_MUCLIBC)

tst/ProducerFunctionalityTest.cpp

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,258 @@ TEST_F(ProducerFunctionalityTest, offline_mode_token_rotation_block_on_space) {
325325

326326
}
327327

328+
/**
329+
* Set short max latency in the stream info.
330+
* Send a few fragments then pause as in an intermittent
331+
* scenario. Put an EoFR before pausing to ensure we terminate the fragment on the backend
332+
* which will issue a persistent ACK, causing the state machine to not rollback
333+
* on the next frame produced after the pause. The pause will cause the connection timeout
334+
* which would then set the state machine to change state to issue a new session. The new
335+
* session will not rollback due to the previous session being closed and persisted ACK
336+
* received. If, however, this is not the case, the rollback would cause a latency pressure.
337+
*/
338+
TEST_F(ProducerFunctionalityTest, realtime_intermittent_no_latency_pressure_eofr) {
339+
// Check if it's run with the env vars set if not bail out
340+
if (!access_key_set_) {
341+
return;
342+
}
343+
344+
CreateProducer();
345+
346+
buffering_ack_in_sequence_ = true;
347+
key_frame_interval_ = 60;
348+
total_frame_count_ = 6 * key_frame_interval_;
349+
350+
UINT64 timestamp = 0;
351+
Frame frame;
352+
frame.duration = 16 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
353+
frame.frameData = frameBuffer_;
354+
frame.size = SIZEOF(frameBuffer_);
355+
frame.trackId = DEFAULT_TRACK_ID;
356+
Frame eofr = EOFR_FRAME_INITIALIZER;
357+
358+
// Set the value of the frame buffer
359+
MEMSET(frame.frameData, 0x55, SIZEOF(frameBuffer_));
360+
361+
streams_[0] = CreateTestStream(0, STREAMING_TYPE_REALTIME,
362+
15000,
363+
120);
364+
auto kinesis_video_stream = streams_[0];
365+
366+
for(uint32_t i = 0; i < total_frame_count_; i++) {
367+
frame.index = i;
368+
frame.flags = (frame.index % key_frame_interval_ == 0) ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
369+
370+
// Pause on the 5th
371+
if (i == 5 * key_frame_interval_) {
372+
EXPECT_TRUE(kinesis_video_stream->putFrame(eofr));
373+
374+
// Make sure we hit the connection idle timeout
375+
THREAD_SLEEP(60 * HUNDREDS_OF_NANOS_IN_A_SECOND);
376+
}
377+
378+
timestamp = GETTIME();
379+
frame.decodingTs = timestamp;
380+
frame.presentationTs = timestamp;
381+
382+
std::stringstream strstrm;
383+
strstrm << " TID: 0x" << std::hex << GETTID();
384+
LOG_INFO("Putting frame for stream: " << kinesis_video_stream->getStreamName()
385+
<< strstrm.str()
386+
<< " Id: " << frame.index
387+
<< ", Key Frame: "
388+
<< (((frame.flags & FRAME_FLAG_KEY_FRAME) == FRAME_FLAG_KEY_FRAME)
389+
? "true" : "false")
390+
<< ", Size: " << frame.size
391+
<< ", Dts: " << frame.decodingTs
392+
<< ", Pts: " << frame.presentationTs);
393+
394+
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
395+
timestamp += frame_duration_;
396+
397+
THREAD_SLEEP(30 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
398+
}
399+
400+
THREAD_SLEEP(WAIT_5_SECONDS_FOR_ACKS);
401+
LOG_DEBUG("Stopping the stream: " << kinesis_video_stream->getStreamName());
402+
EXPECT_TRUE(kinesis_video_stream->stopSync()) << "Timed out awaiting for the stream stop notification";
403+
EXPECT_FALSE(frame_dropped_) << "Status of frame drop " << frame_dropped_;
404+
EXPECT_EQ(0, latency_pressure_count_) << "Should not have latency pressure events";
405+
EXPECT_TRUE(STATUS_SUCCEEDED(getErrorStatus())) << "Status of stream error " << getErrorStatus();
406+
kinesis_video_producer_->freeStreams();
407+
streams_[0] = nullptr;
408+
}
409+
410+
/**
411+
* Set short max latency in the stream info.
412+
* Set an automatic intermittent producer case handling (intra-frame timeout/closing fragment).
413+
* Send a few fragments then pause as in an intermittent
414+
* scenario. No EoFR produced before pausing.
415+
* Automatic fragment closing fired on timeout to ensure we terminate the fragment on the backend
416+
* which will issue a persistent ACK, causing the state machine to not rollback
417+
* on the next frame produced after the pause. The pause will cause the connection timeout
418+
* which would then set the state machine to change state to issue a new session. The new
419+
* session will not rollback due to the previous session being closed and persisted ACK
420+
* received. If, however, this is not the case, the rollback would cause a latency pressure.
421+
*/
422+
/**
423+
* TODO: This is disabled until we implement the auto fragment closing feature
424+
*/
425+
TEST_F(ProducerFunctionalityTest, DISABLED_realtime_intermittent_no_latency_pressure_auto) {
426+
// Check if it's run with the env vars set if not bail out
427+
if (!access_key_set_) {
428+
return;
429+
}
430+
431+
CreateProducer();
432+
433+
buffering_ack_in_sequence_ = true;
434+
key_frame_interval_ = 60;
435+
total_frame_count_ = 6 * key_frame_interval_;
436+
437+
UINT64 timestamp = 0;
438+
Frame frame;
439+
frame.duration = 16 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
440+
frame.frameData = frameBuffer_;
441+
frame.size = SIZEOF(frameBuffer_);
442+
frame.trackId = DEFAULT_TRACK_ID;
443+
444+
// Set the value of the frame buffer
445+
MEMSET(frame.frameData, 0x55, SIZEOF(frameBuffer_));
446+
447+
streams_[0] = CreateTestStream(0, STREAMING_TYPE_REALTIME,
448+
15000,
449+
120);
450+
auto kinesis_video_stream = streams_[0];
451+
452+
for(uint32_t i = 0; i < total_frame_count_; i++) {
453+
frame.index = i;
454+
frame.flags = (frame.index % key_frame_interval_ == 0) ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
455+
456+
// Pause on the 5th
457+
if (i == 5 * key_frame_interval_) {
458+
// Make sure we hit the connection idle timeout
459+
THREAD_SLEEP(60 * HUNDREDS_OF_NANOS_IN_A_SECOND);
460+
}
461+
462+
timestamp = GETTIME();
463+
frame.decodingTs = timestamp;
464+
frame.presentationTs = timestamp;
465+
466+
std::stringstream strstrm;
467+
strstrm << " TID: 0x" << std::hex << GETTID();
468+
LOG_INFO("Putting frame for stream: " << kinesis_video_stream->getStreamName()
469+
<< strstrm.str()
470+
<< " Id: " << frame.index
471+
<< ", Key Frame: "
472+
<< (((frame.flags & FRAME_FLAG_KEY_FRAME) == FRAME_FLAG_KEY_FRAME)
473+
? "true" : "false")
474+
<< ", Size: " << frame.size
475+
<< ", Dts: " << frame.decodingTs
476+
<< ", Pts: " << frame.presentationTs);
477+
478+
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
479+
timestamp += frame_duration_;
480+
481+
THREAD_SLEEP(30 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
482+
}
483+
484+
THREAD_SLEEP(WAIT_5_SECONDS_FOR_ACKS);
485+
LOG_DEBUG("Stopping the stream: " << kinesis_video_stream->getStreamName());
486+
EXPECT_TRUE(kinesis_video_stream->stopSync()) << "Timed out awaiting for the stream stop notification";
487+
EXPECT_FALSE(frame_dropped_) << "Status of frame drop " << frame_dropped_;
488+
EXPECT_EQ(0, latency_pressure_count_) << "Should not have latency pressure events";
489+
EXPECT_TRUE(STATUS_SUCCEEDED(getErrorStatus())) << "Status of stream error " << getErrorStatus();
490+
EXPECT_TRUE(buffering_ack_in_sequence_); // all fragments should be sent
491+
kinesis_video_producer_->freeStreams();
492+
streams_[0] = nullptr;
493+
}
494+
495+
/**
496+
* Set short max latency in the stream info.
497+
* Send a few fragments then pause as in an intermittent
498+
* scenario. No EoFR produced before pausing.
499+
* The backend doesn't terminate the fragment so no ACKs will be issued.
500+
* The pause will cause the connection timeout
501+
* which would then set the state machine to change state to issue a new session. The new
502+
* session will not rollback due to the previous session being closed and persisted ACK
503+
* received.
504+
* As we haven't received a persistent ACK, the rollback will roll it back to the previous
505+
* fragment and re-stream. The previous fragment timestamps are in the past causing latency
506+
* pressure callback to fire.
507+
*/
508+
TEST_F(ProducerFunctionalityTest, realtime_intermittent_latency_pressure) {
509+
// Check if it's run with the env vars set if not bail out
510+
if (!access_key_set_) {
511+
return;
512+
}
513+
514+
KinesisVideoLogger::getInstance().setLogLevel(log4cplus::DEBUG_LOG_LEVEL);
515+
516+
CreateProducer();
517+
518+
buffering_ack_in_sequence_ = true;
519+
key_frame_interval_ = 60;
520+
total_frame_count_ = 6 * key_frame_interval_;
521+
522+
UINT64 timestamp = 0;
523+
Frame frame;
524+
frame.duration = 16 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
525+
frame.frameData = frameBuffer_;
526+
frame.size = SIZEOF(frameBuffer_);
527+
frame.trackId = DEFAULT_TRACK_ID;
528+
529+
// Set the value of the frame buffer
530+
MEMSET(frame.frameData, 0x55, SIZEOF(frameBuffer_));
531+
532+
streams_[0] = CreateTestStream(0, STREAMING_TYPE_REALTIME,
533+
15000,
534+
120);
535+
auto kinesis_video_stream = streams_[0];
536+
537+
for(uint32_t i = 0; i < total_frame_count_; i++) {
538+
frame.index = i;
539+
frame.flags = (frame.index % key_frame_interval_ == 0) ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
540+
541+
// Pause on the 5th
542+
if (i == 5 * key_frame_interval_) {
543+
// Make sure we hit the connection idle timeout
544+
THREAD_SLEEP(60 * HUNDREDS_OF_NANOS_IN_A_SECOND);
545+
}
546+
547+
timestamp = GETTIME();
548+
frame.decodingTs = timestamp;
549+
frame.presentationTs = timestamp;
550+
551+
std::stringstream strstrm;
552+
strstrm << " TID: 0x" << std::hex << GETTID();
553+
LOG_INFO("Putting frame for stream: " << kinesis_video_stream->getStreamName()
554+
<< strstrm.str()
555+
<< " Id: " << frame.index
556+
<< ", Key Frame: "
557+
<< (((frame.flags & FRAME_FLAG_KEY_FRAME) == FRAME_FLAG_KEY_FRAME)
558+
? "true" : "false")
559+
<< ", Size: " << frame.size
560+
<< ", Dts: " << frame.decodingTs
561+
<< ", Pts: " << frame.presentationTs);
562+
563+
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
564+
timestamp += frame_duration_;
565+
566+
THREAD_SLEEP(30 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
567+
}
568+
569+
THREAD_SLEEP(WAIT_5_SECONDS_FOR_ACKS);
570+
LOG_DEBUG("Stopping the stream: " << kinesis_video_stream->getStreamName());
571+
EXPECT_TRUE(kinesis_video_stream->stopSync()) << "Timed out awaiting for the stream stop notification";
572+
EXPECT_FALSE(frame_dropped_) << "Status of frame drop " << frame_dropped_;
573+
EXPECT_NE(0, latency_pressure_count_) << "Should fire latency pressure events";
574+
EXPECT_TRUE(STATUS_SUCCEEDED(getErrorStatus())) << "Status of stream error " << getErrorStatus();
575+
EXPECT_FALSE(buffering_ack_in_sequence_);
576+
kinesis_video_producer_->freeStreams();
577+
streams_[0] = nullptr;
578+
}
579+
328580
}
329581
}
330582
}

tst/ProducerTestFixture.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ STATUS TestStreamCallbackProvider::droppedFrameReportHandler(UINT64 custom_data,
7070
STATUS TestStreamCallbackProvider::streamLatencyPressureHandler(UINT64 custom_data, STREAM_HANDLE stream_handle, UINT64 duration) {
7171
UNUSED_PARAM(stream_handle);
7272
LOG_WARN("Reporting stream latency pressure. Current buffer duration " << duration);
73+
74+
ProducerTestBase* testBase;
75+
STATUS ret = getProducerTestBase(custom_data, &testBase);
76+
if (STATUS_FAILED(ret)) {
77+
return ret;
78+
}
79+
80+
testBase->latency_pressure_count_++;
81+
7382
return validateCallback(custom_data);
7483
}
7584

tst/ProducerTestFixture.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ class ProducerTestBase : public ::testing::Test {
180180
defaultRegion_(DEFAULT_AWS_REGION),
181181
caCertPath_(""),
182182
error_status_(STATUS_SUCCESS),
183+
latency_pressure_count_(0),
183184
device_storage_size_(TEST_STORAGE_SIZE_IN_BYTES),
184185
fps_(TEST_FPS),
185186
total_frame_count_(TEST_TOTAL_FRAME_COUNT),
@@ -228,6 +229,7 @@ class ProducerTestBase : public ::testing::Test {
228229
std::atomic_bool storage_overflow_;
229230
std::atomic_bool buffering_ack_in_sequence_;
230231
std::atomic_uint error_status_;
232+
std::atomic_uint latency_pressure_count_;
231233
std::map<UPLOAD_HANDLE, uint64_t> previous_buffering_ack_timestamp_;
232234

233235
protected:

0 commit comments

Comments
 (0)