Skip to content

Commit a4e66b3

Browse files
authored
update producer-c version to support automatic intermittent producer (#640)
* update producer-c version to support automatic intermittent producer * fix formatting * make sure to set min client info version, modify test for automatic intermittnet produced both enabled and disabled case * fix test * fix tests * fix timing of tests, mac osx ones were always having issues but went undetected in travis ci due to lack of assert
1 parent 8ef826a commit a4e66b3

File tree

4 files changed

+110
-12
lines changed

4 files changed

+110
-12
lines changed

CMake/Dependencies/libkvscproducer-CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ include(ExternalProject)
77
# clone repo only
88
ExternalProject_Add(libkvscproducer-download
99
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git
10-
GIT_TAG 22edebfd87f5a38ab8af58da9a42f3d8dc7aebe7
11-
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-src"
10+
GIT_TAG 99df2854e1f4d16f44e824a9b3e4885acde15501
11+
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-src"
1212
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-build"
1313
CONFIGURE_COMMAND ""
1414
BUILD_COMMAND ""
1515
INSTALL_COMMAND ""
1616
TEST_COMMAND ""
17-
)
17+
)

src/DefaultDeviceInfoProvider.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ DefaultDeviceInfoProvider::DefaultDeviceInfoProvider(const std::string &custom_u
2727
cert_path_(cert_path){
2828
memset(&device_info_, 0, sizeof(device_info_));
2929
device_info_.version = DEVICE_INFO_CURRENT_VERSION;
30+
device_info_.clientInfo.version = CLIENT_INFO_CURRENT_VERSION;
3031

3132
// Set the device name
3233
const string &device_id = "Kinesis_Video_Device";

tst/ProducerFunctionalityTest.cpp

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ TEST_F(ProducerFunctionalityTest, DISABLED_realtime_intermittent_no_latency_pres
493493
}
494494

495495
/**
496+
* This test assumes client info AUTOMATIC_STREAMING_FLAGS set to AUTOMATIC_STREAMING_ALWAYS_CONTINUOUS
496497
* Set short max latency in the stream info.
497498
* Send a few fragments then pause as in an intermittent
498499
* scenario. No EoFR produced before pausing.
@@ -513,15 +514,18 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_latency_pressure) {
513514

514515
KinesisVideoLogger::getInstance().setLogLevel(log4cplus::DEBUG_LOG_LEVEL);
515516

516-
CreateProducer();
517+
CreateProducer(false, AUTOMATIC_STREAMING_ALWAYS_CONTINUOUS);
517518

518519
buffering_ack_in_sequence_ = true;
519520
key_frame_interval_ = 60;
520521
total_frame_count_ = 6 * key_frame_interval_;
522+
frame_duration_ = 16 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
521523

524+
UINT64 startTime = 0;
522525
UINT64 timestamp = 0;
526+
UINT64 delta = 0;
523527
Frame frame;
524-
frame.duration = 16 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
528+
frame.duration = frame_duration_;
525529
frame.frameData = frameBuffer_;
526530
frame.size = SIZEOF(frameBuffer_);
527531
frame.trackId = DEFAULT_TRACK_ID;
@@ -534,17 +538,23 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_latency_pressure) {
534538
120);
535539
auto kinesis_video_stream = streams_[0];
536540

541+
startTime = GETTIME();
537542
for(uint32_t i = 0; i < total_frame_count_; i++) {
538543
frame.index = i;
539544
frame.flags = (frame.index % key_frame_interval_ == 0) ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
540545

541546
// Pause on the 5th
542547
if (i == 5 * key_frame_interval_) {
543548
// Make sure we hit the connection idle timeout
549+
UINT64 start = GETTIME();
544550
THREAD_SLEEP(60 * HUNDREDS_OF_NANOS_IN_A_SECOND);
551+
delta = GETTIME() - start;
552+
} else if ( i < 5 * key_frame_interval_ ) {
553+
// This should not flip to false until after the 60s sleep when we start putting frames
554+
EXPECT_TRUE(buffering_ack_in_sequence_);
545555
}
546556

547-
timestamp = GETTIME();
557+
timestamp = startTime + i*frame_duration_ + delta;
548558
frame.decodingTs = timestamp;
549559
frame.presentationTs = timestamp;
550560

@@ -561,9 +571,8 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_latency_pressure) {
561571
<< ", Pts: " << frame.presentationTs);
562572

563573
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
564-
timestamp += frame_duration_;
565574

566-
THREAD_SLEEP(30 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
575+
THREAD_SLEEP(frame_duration_);
567576
}
568577

569578
THREAD_SLEEP(WAIT_5_SECONDS_FOR_ACKS);
@@ -577,6 +586,92 @@ TEST_F(ProducerFunctionalityTest, realtime_intermittent_latency_pressure) {
577586
streams_[0] = nullptr;
578587
}
579588

589+
/**
590+
* This test assumes client info AUTOMATIC_STREAMING_FLAGS set to AUTOMATIC_STREAMING_INTERMITTENT_PRODUCER (default)
591+
* Set short max latency in the stream info.
592+
* Send a few fragments then pause as in an intermittent
593+
* scenario. EoFR should be automatically produced after pause
594+
* Backend should NOT timeout so we should get an ACK
595+
*/
596+
TEST_F(ProducerFunctionalityTest, realtime_auto_intermittent_latency_pressure) {
597+
// Check if it's run with the env vars set if not bail out
598+
if (!access_key_set_) {
599+
return;
600+
}
601+
602+
KinesisVideoLogger::getInstance().setLogLevel(log4cplus::DEBUG_LOG_LEVEL);
603+
604+
CreateProducer();
605+
606+
buffering_ack_in_sequence_ = true;
607+
key_frame_interval_ = 60;
608+
total_frame_count_ = 6 * key_frame_interval_;
609+
frame_duration_ = 16 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
610+
611+
UINT64 startTime = 0;
612+
UINT64 timestamp = 0;
613+
UINT64 delta = 0;
614+
Frame frame;
615+
frame.duration = frame_duration_;
616+
frame.frameData = frameBuffer_;
617+
frame.size = SIZEOF(frameBuffer_);
618+
frame.trackId = DEFAULT_TRACK_ID;
619+
620+
// Set the value of the frame buffer
621+
MEMSET(frame.frameData, 0x55, SIZEOF(frameBuffer_));
622+
623+
streams_[0] = CreateTestStream(0, STREAMING_TYPE_REALTIME,
624+
15000,
625+
120);
626+
auto kinesis_video_stream = streams_[0];
627+
628+
startTime = GETTIME();
629+
for(uint32_t i = 0; i < total_frame_count_; i++) {
630+
frame.index = i;
631+
frame.flags = (frame.index % key_frame_interval_ == 0) ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
632+
633+
// Pause on the 5th
634+
if (i == 5 * key_frame_interval_) {
635+
// Make sure we hit the connection idle timeout
636+
UINT64 start = GETTIME();
637+
THREAD_SLEEP(60 * HUNDREDS_OF_NANOS_IN_A_SECOND);
638+
delta = GETTIME() - start;
639+
}
640+
641+
timestamp = startTime + i*frame_duration_ + delta;
642+
frame.decodingTs = timestamp;
643+
frame.presentationTs = timestamp;
644+
645+
std::stringstream strstrm;
646+
strstrm << " TID: 0x" << std::hex << GETTID();
647+
LOG_INFO("Putting frame for stream: " << kinesis_video_stream->getStreamName()
648+
<< strstrm.str()
649+
<< " Id: " << frame.index
650+
<< ", Key Frame: "
651+
<< (((frame.flags & FRAME_FLAG_KEY_FRAME) == FRAME_FLAG_KEY_FRAME)
652+
? "true" : "false")
653+
<< ", Size: " << frame.size
654+
<< ", Dts: " << frame.decodingTs
655+
<< ", Pts: " << frame.presentationTs);
656+
657+
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));
658+
659+
THREAD_SLEEP(frame_duration_);
660+
}
661+
662+
THREAD_SLEEP(WAIT_5_SECONDS_FOR_ACKS);
663+
LOG_DEBUG("Stopping the stream: " << kinesis_video_stream->getStreamName());
664+
EXPECT_TRUE(kinesis_video_stream->stopSync()) << "Timed out awaiting for the stream stop notification";
665+
EXPECT_FALSE(frame_dropped_) << "Status of frame drop " << frame_dropped_;
666+
EXPECT_EQ(0, latency_pressure_count_) << "Should fire latency pressure events";
667+
EXPECT_TRUE(STATUS_SUCCEEDED(getErrorStatus())) << "Status of stream error " << getErrorStatus();
668+
EXPECT_TRUE(buffering_ack_in_sequence_);
669+
kinesis_video_producer_->freeStreams();
670+
streams_[0] = nullptr;
671+
}
672+
673+
674+
580675
}
581676
}
582677
}

tst/ProducerTestFixture.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,15 @@ class TestStreamCallbackProvider : public StreamCallbackProvider {
135135

136136
class TestDeviceInfoProvider : public DefaultDeviceInfoProvider {
137137
uint64_t device_storage_size_;
138+
AUTOMATIC_STREAMING_FLAGS automaticStreamingFlags_;
138139
public:
139-
TestDeviceInfoProvider(uint64_t device_storage_size): device_storage_size_(device_storage_size) {}
140+
TestDeviceInfoProvider(uint64_t device_storage_size, AUTOMATIC_STREAMING_FLAGS automaticStreamingFlags): device_storage_size_(device_storage_size), automaticStreamingFlags_(automaticStreamingFlags) {}
140141

141142
device_info_t getDeviceInfo() override {
142143
auto device_info = DefaultDeviceInfoProvider::getDeviceInfo();
143144
device_info.storageInfo.storageSize = (UINT64) device_storage_size_;
144145
device_info.streamCount = TEST_STREAM_COUNT;
146+
device_info.clientInfo.automaticStreamingFlags = automaticStreamingFlags_;
145147
return device_info;
146148
}
147149
};
@@ -307,16 +309,16 @@ class ProducerTestBase : public ::testing::Test {
307309
}
308310

309311

310-
void CreateProducer(bool cachingEndpoingProvider = false) {
312+
void CreateProducer(bool cachingEndpointProvider = false, AUTOMATIC_STREAMING_FLAGS automaticStreamingFlags = AUTOMATIC_STREAMING_INTERMITTENT_PRODUCER) {
311313
// Create the producer client
312314
CreateCredentialProvider();
313-
device_provider_.reset(new TestDeviceInfoProvider(device_storage_size_));
315+
device_provider_.reset(new TestDeviceInfoProvider(device_storage_size_, automaticStreamingFlags));
314316
client_callback_provider_.reset(new TestClientCallbackProvider(this));
315317
stream_callback_provider_.reset(new TestStreamCallbackProvider(this));
316318

317319
try {
318320
std::unique_ptr<DefaultCallbackProvider> defaultCallbackProvider;
319-
if (cachingEndpoingProvider) {
321+
if (cachingEndpointProvider) {
320322
defaultCallbackProvider.reset(new CachingEndpointOnlyCallbackProvider(
321323
move(client_callback_provider_),
322324
move(stream_callback_provider_),

0 commit comments

Comments
 (0)