22#include < mutex>
33#include < chrono>
44#include < condition_variable>
5+ #include < atomic>
6+ #include < csignal>
7+ #include < cstdlib>
58
69#include < gst/gst.h>
710#include < glib.h>
1114using namespace com ::amazonaws::kinesis::video;
1215using namespace log4cplus ;
1316
17+ #define DEFAULT_SAMPLE_DURATION_SECONDS 320
18+
1419/* Modify these values to change start/stop interval. */
1520#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 20
1621#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 40
@@ -43,26 +48,24 @@ void sigint_handler(int sigint) {
4348 LOG_DEBUG (" SIGINT received. Exiting..." );
4449 terminated = TRUE ;
4550 cv.notify_all ();
46- if (main_loop != NULL ) {
51+ if (main_loop != NULL ) {
4752 g_main_loop_quit (main_loop);
4853 }
4954}
5055
51- static gboolean
52- bus_call (GstBus *bus, GstMessage *msg, gpointer data)
53- {
56+ static gboolean bus_call (GstBus *bus, GstMessage *msg, gpointer data) {
5457 GMainLoop *loop = (GMainLoop *) ((CustomData *)data)->main_loop ;
5558 GstElement *pipeline = (GstElement *) ((CustomData *)data)->pipeline ;
5659
57- switch (GST_MESSAGE_TYPE (msg)) {
60+ switch (GST_MESSAGE_TYPE (msg)) {
5861 case GST_MESSAGE_EOS: {
5962 LOG_DEBUG (" [KVS sample] Received EOS message" );
6063 cv.notify_all ();
6164 break ;
6265 }
6366
6467 case GST_MESSAGE_ERROR: {
65- gchar *debug;
68+ gchar *debug;
6669 GError *error;
6770
6871 gst_message_parse_error (msg, &error, &debug);
@@ -90,7 +93,7 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) {
9093 char const *role_alias;
9194 char const *ca_cert_path;
9295 char const *credential_path;
93- if (nullptr != (iot_credential_endpoint = GETENV (" IOT_GET_CREDENTIAL_ENDPOINT" )) &&
96+ if (nullptr != (iot_credential_endpoint = GETENV (" IOT_GET_CREDENTIAL_ENDPOINT" )) &&
9497 nullptr != (cert_path = GETENV (" CERT_PATH" )) &&
9598 nullptr != (private_key_path = GETENV (" PRIVATE_KEY_PATH" )) &&
9699 nullptr != (role_alias = GETENV (" ROLE_ALIAS" )) &&
@@ -110,7 +113,7 @@ void determine_aws_credentials(GstElement *kvssink, char* streamName) {
110113 gst_structure_free (iot_credentials);
111114 // kvssink will search for long term credentials in envvar automatically so no need to include here
112115 // if no long credentials or IoT credentials provided will look for credential file as last resort.
113- } else if (nullptr != (credential_path = GETENV (" AWS_CREDENTIAL_PATH" ))) {
116+ } else if (nullptr != (credential_path = GETENV (" AWS_CREDENTIAL_PATH" ))) {
114117 LOG_DEBUG (" [KVS sample] Using AWS_CREDENTIAL_PATH long term credentials." );
115118 g_object_set (G_OBJECT (kvssink), " credential-path" , credential_path, NULL );
116119 } else {
@@ -123,9 +126,9 @@ void stopStartLoop(GstElement *pipeline, GstElement *source) {
123126 std::mutex cv_m;
124127 std::unique_lock<std::mutex> lck (cv_m);
125128
126- while (!terminated) {
129+ while (!terminated) {
127130 // Using cv.wait_for to break sleep early upon signal interrupt.
128- if (cv.wait_for (lck, std::chrono::seconds (KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) {
131+ if (cv.wait_for (lck, std::chrono::seconds (KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) {
129132 break ;
130133 }
131134
@@ -141,7 +144,7 @@ void stopStartLoop(GstElement *pipeline, GstElement *source) {
141144
142145 // Set videotestsrc to paused state because it does not stop producing frames upon EOS,
143146 // and the frames are not cleared upon flushing.
144- if (STRCMPI (GST_ELEMENT_NAME (source), KVS_GST_TEST_SOURCE_NAME) == 0 ) {
147+ if (STRCMPI (GST_ELEMENT_NAME (source), KVS_GST_TEST_SOURCE_NAME) == 0 ) {
145148 gst_element_set_state (source, GST_STATE_PAUSED);
146149 }
147150
@@ -150,26 +153,26 @@ void stopStartLoop(GstElement *pipeline, GstElement *source) {
150153 gst_element_send_event (pipeline, flush_start);
151154
152155 // Using cv.wait_for to break sleep early upon signal interrupt. Checking for termination again before waiting.
153- if (terminated || cv.wait_for (lck, std::chrono::seconds (KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) {
156+ if (terminated || cv.wait_for (lck, std::chrono::seconds (KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) {
154157 break ;
155158 }
156159
157160 LOG_INFO (" [KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds" );
158161
159162 // Stop the flush now that we are resuming streaming.
160- GstEvent* flush_stop = gst_event_new_flush_stop (true );
163+ GstEvent* flush_stop = gst_event_new_flush_stop (TRUE );
161164 gst_element_send_event (pipeline, flush_stop);
162165
163166 // Set videotestsrc back to playing state.
164- if (STRCMPI (GST_ELEMENT_NAME (source), KVS_GST_TEST_SOURCE_NAME) == 0 ) {
167+ if (STRCMPI (GST_ELEMENT_NAME (source), KVS_GST_TEST_SOURCE_NAME) == 0 ) {
165168 gst_element_set_state (source, GST_STATE_PLAYING);
166169 }
167170 }
171+
168172 LOG_DEBUG (" [KVS sample] Exited stopStartLoop" );
169173}
170174
171- int main (int argc, char *argv[])
172- {
175+ int main (int argc, char *argv[]) {
173176 signal (SIGINT, sigint_handler);
174177
175178 CustomData customData;
@@ -180,58 +183,67 @@ int main(int argc, char *argv[])
180183 StreamSource source_type;
181184 char stream_name[MAX_STREAM_NAME_LEN + 1 ] = {0 };
182185
186+ int runtime_duration_seconds = DEFAULT_SAMPLE_DURATION_SECONDS;
187+
183188 gst_init (&argc, &argv);
184189
185190
186191 /* Parse input arguments */
187192
188193 // Check for invalid argument count, get stream name.
189- if (argc > 3 ) {
194+ if (argc > 3 ) {
190195 LOG_ERROR (" [KVS sample] Invalid argument count, too many arguments." );
191- LOG_INFO (" [KVS sample] Usage: " << argv[0 ] << " <streamName (optional)> <testsrc or devicesrc (optional)>" );
196+ LOG_INFO (" [KVS sample] Usage: " << argv[0 ] << " <streamName (optional)> <testsrc or devicesrc (optional)> <runtime seconds (optional)> " );
192197 return -1 ;
193- } else if (argc > 1 ) {
198+ } else if (argc > 1 ) {
194199 STRNCPY (stream_name, argv[1 ], MAX_STREAM_NAME_LEN);
195200 }
196201
197202 // Get source type.
198- if (argc > 2 ) {
199- if (0 == STRCMPI (argv[2 ], " testsrc" )) {
203+ if (argc > 2 ) {
204+ if (0 == STRCMPI (argv[2 ], " testsrc" )) {
200205 LOG_INFO (" [KVS sample] Using test source (videotestsrc)" );
201206 source_type = TEST_SOURCE;
202- } else if (0 == STRCMPI (argv[2 ], " devicesrc" )) {
207+ } else if (0 == STRCMPI (argv[2 ], " devicesrc" )) {
203208 LOG_INFO (" [KVS sample] Using device source (autovideosrc)" );
204209 source_type = DEVICE_SOURCE;
205210 } else {
206211 LOG_ERROR (" [KVS sample] Invalid source type" );
207- LOG_INFO (" [KVS sample] Usage: " << argv[0 ] << " <streamName (optional)> <testsrc or devicesrc(optional)>" );
212+ LOG_INFO (" [KVS sample] Usage: " << argv[0 ] << " <streamName (optional)> <testsrc or devicesrc(optional)> <runtime seconds (optional)> " );
208213 return -1 ;
209214 }
210215 } else {
211- LOG_ERROR (" [KVS sample] No source specified, defualting to test source (videotestsrc)" );
216+ LOG_INFO (" [KVS sample] No source specified, defaulting to test source (videotestsrc)" );
212217 source_type = TEST_SOURCE;
213218 }
214219
220+ if (argc > 3 ) {
221+ runtime_duration_seconds = atoi (argv[3 ]);
222+ if (runtime_duration_seconds <= 0 ) {
223+ LOG_ERROR (" [KVS sample] Invalid runtime duration: must be a positive integer." );
224+ return -1 ;
225+ }
226+ }
215227
216228 /* Create GStreamer elements */
217229
218230 pipeline = gst_pipeline_new (" kvs-pipeline" );
219231
220232 /* source */
221- if (source_type == TEST_SOURCE) {
233+ if (source_type == TEST_SOURCE) {
222234 source = gst_element_factory_make (" videotestsrc" , KVS_GST_TEST_SOURCE_NAME);
223235 g_object_set (G_OBJECT (source),
224236 " is-live" , TRUE ,
225237 " pattern" , 18 ,
226238 " background-color" , 0xff003181 ,
227239 " foreground-color" , 0xffff9900 , NULL );
228- } else if (source_type == DEVICE_SOURCE) {
240+ } else {
229241 source = gst_element_factory_make (" autovideosrc" , KVS_GST_DEVICE_SOURCE_NAME);
230242 }
231243
232244 /* clock overlay */
233245 clock_overlay = gst_element_factory_make (" clockoverlay" , " clock_overlay" );
234- g_object_set (G_OBJECT (clock_overlay)," time-format" , " %a %B %d, %Y %I:%M:%S %p" , NULL );
246+ g_object_set (G_OBJECT (clock_overlay), " time-format" , " %a %B %d, %Y %I:%M:%S %p" , NULL );
235247
236248 /* video convert */
237249 video_convert = gst_element_factory_make (" videoconvert" , " video_convert" );
@@ -269,12 +281,12 @@ int main(int argc, char *argv[])
269281
270282 /* Check that GStreamer elements were all successfully created */
271283
272- if (!kvssink) {
284+ if (!kvssink) {
273285 LOG_ERROR (" [KVS sample] Failed to create kvssink element" );
274286 return -1 ;
275287 }
276288
277- if (!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) {
289+ if (!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) {
278290 LOG_ERROR (" [KVS sample] Not all GStreamer elements could be created." );
279291 return -1 ;
280292 }
@@ -293,7 +305,7 @@ int main(int argc, char *argv[])
293305 source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL );
294306
295307 // Link the elements together.
296- if (!gst_element_link_many (source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL )) {
308+ if (!gst_element_link_many (source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL )) {
297309 LOG_ERROR (" [KVS sample] Elements could not be linked" );
298310 gst_object_unref (pipeline);
299311 return -1 ;
@@ -305,11 +317,24 @@ int main(int argc, char *argv[])
305317
306318 // Start the stop/start thread for intermittent streaming.
307319 std::thread stopStartThread (stopStartLoop, pipeline, source);
308-
320+
321+ std::thread timerThread ([runtime_duration_seconds]() {
322+ std::this_thread::sleep_for (std::chrono::seconds (runtime_duration_seconds));
323+ if (!terminated) {
324+ LOG_INFO (" [KVS sample] Reached maximum runtime of " << runtime_duration_seconds << " seconds. Terminating." );
325+ terminated = TRUE ;
326+ cv.notify_all ();
327+ if (main_loop != NULL ) {
328+ g_main_loop_quit (main_loop);
329+ }
330+ }
331+ });
332+
309333 LOG_INFO (" [KVS sample] Starting GStreamer main loop" );
310334 g_main_loop_run (main_loop);
311335
312336 stopStartThread.join ();
337+ timerThread.join ();
313338
314339 // Application terminated, cleanup.
315340 LOG_INFO (" [KVS sample] Streaming terminated, cleaning up" );
0 commit comments