Skip to content

Commit 4b7f361

Browse files
committed
Clean start/stop copy (#1256)
* Starting * Started clean sample * more * Streaming to KVS working * Intermittent streaming working. * Add comments, cleanup * EOS stream message working * Improvements * Revert changes to original sample * more * more * Cleanup kvssink * Cleanup intermittent sample and kvssink * more * Remove rtsp related things * Switch to cv wait for all waits * Address some PR comments * Link with kvspicUtils * kvssink to send eofr upon eos, fix testsrc not stopping issue * Address review comments * Don't require stream name arg * Add sample instructions to README.md * Fix typos in ReadMe, add language to code blocks * Install pkgconfiglite * Address comments * Fix double space typo * nit ReadMe change
1 parent b2e15a6 commit 4b7f361

File tree

3 files changed

+391
-35
lines changed

3 files changed

+391
-35
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ if(BUILD_GSTREAMER_PLUGIN)
237237
add_executable(kvssink_gstreamer_sample samples/kvssink_gstreamer_sample.cpp)
238238
target_link_libraries(kvssink_gstreamer_sample ${GST_APP_LIBRARIES} KinesisVideoProducer)
239239

240+
add_executable(kvssink_intermittent_sample samples/kvssink_intermittent_sample.cpp )
241+
target_link_libraries(kvssink_intermittent_sample ${GST_APP_LIBRARIES} KinesisVideoProducer)
242+
240243
add_executable(kvs_gstreamer_sample samples/kvs_gstreamer_sample.cpp)
241244
target_link_libraries(kvs_gstreamer_sample ${GST_APP_LIBRARIES} KinesisVideoProducer kvspic)
242245

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
#include <thread>
2+
#include <mutex>
3+
#include <chrono>
4+
#include <condition_variable>
5+
6+
#include <gst/gst.h>
7+
#include <glib.h>
8+
9+
#include "gstreamer/gstkvssink.h"
10+
11+
using namespace com::amazonaws::kinesis::video;
12+
using namespace log4cplus;
13+
14+
/* Modify these values to change start/stop interval. */
15+
#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 20
16+
#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 40
17+
18+
#define KVS_GST_TEST_SOURCE_NAME "test-source"
19+
#define KVS_GST_DEVICE_SOURCE_NAME "device-source"
20+
21+
22+
LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer");
23+
24+
GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);
25+
std::atomic<bool> terminated(FALSE);
26+
std::condition_variable cv;
27+
28+
typedef enum _StreamSource {
29+
TEST_SOURCE,
30+
DEVICE_SOURCE
31+
} StreamSource;
32+
33+
typedef struct _CustomData {
34+
_CustomData() :
35+
main_loop(NULL),
36+
pipeline(NULL) {}
37+
38+
GMainLoop *main_loop;
39+
GstElement *pipeline;
40+
} CustomData;
41+
42+
void sigint_handler(int sigint) {
43+
LOG_DEBUG("SIGINT received. Exiting...");
44+
terminated = TRUE;
45+
cv.notify_all();
46+
if(main_loop != NULL) {
47+
g_main_loop_quit(main_loop);
48+
}
49+
}
50+
51+
static gboolean
52+
bus_call(GstBus *bus, GstMessage *msg, gpointer data)
53+
{
54+
GMainLoop *loop = (GMainLoop *) ((CustomData *)data)->main_loop;
55+
GstElement *pipeline = (GstElement *) ((CustomData *)data)->pipeline;
56+
57+
switch(GST_MESSAGE_TYPE(msg)) {
58+
case GST_MESSAGE_EOS: {
59+
LOG_DEBUG("[KVS sample] Received EOS message");
60+
cv.notify_all();
61+
break;
62+
}
63+
64+
case GST_MESSAGE_ERROR: {
65+
gchar *debug;
66+
GError *error;
67+
68+
gst_message_parse_error(msg, &error, &debug);
69+
g_free(debug);
70+
71+
LOG_ERROR("[KVS sample] GStreamer error: " << error->message);
72+
g_error_free(error);
73+
74+
g_main_loop_quit(loop);
75+
break;
76+
}
77+
78+
default: {
79+
break;
80+
}
81+
}
82+
83+
return TRUE;
84+
}
85+
86+
void determine_aws_credentials(GstElement *kvssink, char* streamName) {
87+
char const *iot_credential_endpoint;
88+
char const *cert_path;
89+
char const *private_key_path;
90+
char const *role_alias;
91+
char const *ca_cert_path;
92+
char const *credential_path;
93+
if(nullptr != (iot_credential_endpoint = GETENV("IOT_GET_CREDENTIAL_ENDPOINT")) &&
94+
nullptr != (cert_path = GETENV("CERT_PATH")) &&
95+
nullptr != (private_key_path = GETENV("PRIVATE_KEY_PATH")) &&
96+
nullptr != (role_alias = GETENV("ROLE_ALIAS")) &&
97+
nullptr != (ca_cert_path = GETENV("CA_CERT_PATH"))) {
98+
LOG_DEBUG("[KVS sample] Using IoT credentials.");
99+
// Set the IoT Credentials if provided in envvar.
100+
GstStructure *iot_credentials = gst_structure_new(
101+
"iot-certificate",
102+
"iot-thing-name", G_TYPE_STRING, streamName,
103+
"endpoint", G_TYPE_STRING, iot_credential_endpoint,
104+
"cert-path", G_TYPE_STRING, cert_path,
105+
"key-path", G_TYPE_STRING, private_key_path,
106+
"ca-path", G_TYPE_STRING, ca_cert_path,
107+
"role-aliases", G_TYPE_STRING, role_alias, NULL);
108+
109+
g_object_set(G_OBJECT(kvssink), "iot-certificate", iot_credentials, NULL);
110+
gst_structure_free(iot_credentials);
111+
// kvssink will search for long term credentials in envvar automatically so no need to include here
112+
// if no long credentials or IoT credentials provided will look for credential file as last resort.
113+
} else if(nullptr != (credential_path = GETENV("AWS_CREDENTIAL_PATH"))) {
114+
LOG_DEBUG("[KVS sample] Using AWS_CREDENTIAL_PATH long term credentials.");
115+
g_object_set(G_OBJECT(kvssink), "credential-path", credential_path, NULL);
116+
} else {
117+
LOG_DEBUG("[KVS sample] Using credentials set by AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars.");
118+
}
119+
}
120+
121+
// This function handles the intermittent starting and stopping of the stream in a loop.
122+
void stopStartLoop(GstElement *pipeline, GstElement *source) {
123+
std::mutex cv_m;
124+
std::unique_lock<std::mutex> lck(cv_m);
125+
126+
while(!terminated) {
127+
// Using cv.wait_for to break sleep early upon signal interrupt.
128+
if(cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) {
129+
break;
130+
}
131+
132+
LOG_INFO("[KVS sample] Stopping stream to KVS for " << KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS << " seconds");
133+
134+
// EOS event pushes frames buffered by the h264enc element down to kvssink.
135+
GstEvent* eos = gst_event_new_eos();
136+
gst_element_send_event(pipeline, eos);
137+
138+
// Wait for the EOS event to return from kvssink to the bus which means all elements are done handling the EOS.
139+
// We don't want to flush until the EOS is done to ensure all frames buffered in the pipeline have been processed.
140+
cv.wait(lck);
141+
142+
// Set videotestsrc to paused state because it does not stop producing frames upon EOS,
143+
// and the frames are not cleared upon flushing.
144+
if(STRCMPI(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) {
145+
gst_element_set_state(source, GST_STATE_PAUSED);
146+
}
147+
148+
// Flushing to remove EOS status.
149+
GstEvent* flush_start = gst_event_new_flush_start();
150+
gst_element_send_event(pipeline, flush_start);
151+
152+
// Using cv.wait_for to break sleep early upon signal interrupt. Checking for termination again before waiting.
153+
if(terminated || cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) {
154+
break;
155+
}
156+
157+
LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds");
158+
159+
// Stop the flush now that we are resuming streaming.
160+
GstEvent* flush_stop = gst_event_new_flush_stop(true);
161+
gst_element_send_event(pipeline, flush_stop);
162+
163+
// Set videotestsrc back to playing state.
164+
if(STRCMPI(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) {
165+
gst_element_set_state(source, GST_STATE_PLAYING);
166+
}
167+
}
168+
LOG_DEBUG("[KVS sample] Exited stopStartLoop");
169+
}
170+
171+
int main(int argc, char *argv[])
172+
{
173+
signal(SIGINT, sigint_handler);
174+
175+
CustomData customData;
176+
GstElement *pipeline, *source, *clock_overlay, *video_convert, *source_filter, *encoder, *sink_filter, *kvssink;
177+
GstCaps *source_caps, *sink_caps;
178+
GstBus *bus;
179+
guint bus_watch_id;
180+
StreamSource source_type;
181+
char stream_name[MAX_STREAM_NAME_LEN + 1] = {0};
182+
183+
gst_init(&argc, &argv);
184+
185+
186+
/* Parse input arguments */
187+
188+
// Check for invalid argument count, get stream name.
189+
if(argc > 3) {
190+
LOG_ERROR("[KVS sample] Invalid argument count, too many arguments.");
191+
LOG_INFO("[KVS sample] Usage: " << argv[0] << " <streamName (optional)> <testsrc or devicesrc (optional)>");
192+
return -1;
193+
} else if(argc > 1) {
194+
STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN);
195+
}
196+
197+
// Get source type.
198+
if(argc > 2) {
199+
if(0 == STRCMPI(argv[2], "testsrc")) {
200+
LOG_INFO("[KVS sample] Using test source (videotestsrc)");
201+
source_type = TEST_SOURCE;
202+
} else if(0 == STRCMPI(argv[2], "devicesrc")) {
203+
LOG_INFO("[KVS sample] Using device source (autovideosrc)");
204+
source_type = DEVICE_SOURCE;
205+
} else {
206+
LOG_ERROR("[KVS sample] Invalid source type");
207+
LOG_INFO("[KVS sample] Usage: " << argv[0] << " <streamName (optional)> <testsrc or devicesrc(optional)>");
208+
return -1;
209+
}
210+
} else {
211+
LOG_ERROR("[KVS sample] No source specified, defualting to test source (videotestsrc)");
212+
source_type = TEST_SOURCE;
213+
}
214+
215+
216+
/* Create GStreamer elements */
217+
218+
pipeline = gst_pipeline_new("kvs-pipeline");
219+
220+
/* source */
221+
if(source_type == TEST_SOURCE) {
222+
source = gst_element_factory_make("videotestsrc", KVS_GST_TEST_SOURCE_NAME);
223+
g_object_set(G_OBJECT(source),
224+
"is-live", TRUE,
225+
"pattern", 18,
226+
"background-color", 0xff003181,
227+
"foreground-color", 0xffff9900, NULL);
228+
} else if(source_type == DEVICE_SOURCE) {
229+
source = gst_element_factory_make("autovideosrc", KVS_GST_DEVICE_SOURCE_NAME);
230+
}
231+
232+
/* clock overlay */
233+
clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay");
234+
g_object_set(G_OBJECT(clock_overlay),"time-format", "%a %B %d, %Y %I:%M:%S %p", NULL);
235+
236+
/* video convert */
237+
video_convert = gst_element_factory_make("videoconvert", "video_convert");
238+
239+
/* source filter */
240+
source_filter = gst_element_factory_make("capsfilter", "source-filter");
241+
source_caps = gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL);
242+
g_object_set(G_OBJECT(source_filter), "caps", source_caps, NULL);
243+
gst_caps_unref(source_caps);
244+
245+
/* encoder */
246+
encoder = gst_element_factory_make("x264enc", "encoder");
247+
g_object_set(G_OBJECT(encoder),
248+
"bframes", 0,
249+
"key-int-max", 120, NULL);
250+
251+
/* sink filter */
252+
sink_filter = gst_element_factory_make("capsfilter", "sink-filter");
253+
sink_caps = gst_caps_new_simple("video/x-h264",
254+
"stream-format", G_TYPE_STRING, "avc",
255+
"alignment", G_TYPE_STRING, "au",
256+
NULL);
257+
g_object_set(G_OBJECT(sink_filter), "caps", sink_caps, NULL);
258+
gst_caps_unref(sink_caps);
259+
260+
/* kvssink */
261+
kvssink = gst_element_factory_make("kvssink", "kvssink");
262+
if (IS_EMPTY_STRING(stream_name)) {
263+
LOG_INFO("No stream name specified, using default kvssink stream name.")
264+
} else {
265+
g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL);
266+
}
267+
determine_aws_credentials(kvssink, stream_name);
268+
269+
270+
/* Check that GStreamer elements were all successfully created */
271+
272+
if(!kvssink) {
273+
LOG_ERROR("[KVS sample] Failed to create kvssink element");
274+
return -1;
275+
}
276+
277+
if(!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) {
278+
LOG_ERROR("[KVS sample] Not all GStreamer elements could be created.");
279+
return -1;
280+
}
281+
282+
// Populate data struct.
283+
customData.main_loop = main_loop;
284+
customData.pipeline = pipeline;
285+
286+
// Add a message handler.
287+
bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
288+
bus_watch_id = gst_bus_add_watch(bus, bus_call, &customData);
289+
gst_object_unref(bus);
290+
291+
// Add elements into the pipeline.
292+
gst_bin_add_many(GST_BIN(pipeline),
293+
source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL);
294+
295+
// Link the elements together.
296+
if(!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL)) {
297+
LOG_ERROR("[KVS sample] Elements could not be linked");
298+
gst_object_unref(pipeline);
299+
return -1;
300+
}
301+
302+
// Set the pipeline to playing state.
303+
LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds");
304+
gst_element_set_state(pipeline, GST_STATE_PLAYING);
305+
306+
// Start the stop/start thread for intermittent streaming.
307+
std::thread stopStartThread(stopStartLoop, pipeline, source);
308+
309+
LOG_INFO("[KVS sample] Starting GStreamer main loop");
310+
g_main_loop_run(main_loop);
311+
312+
stopStartThread.join();
313+
314+
// Application terminated, cleanup.
315+
LOG_INFO("[KVS sample] Streaming terminated, cleaning up");
316+
gst_element_set_state(pipeline, GST_STATE_NULL);
317+
gst_object_unref(GST_OBJECT(pipeline));
318+
g_source_remove(bus_watch_id);
319+
g_main_loop_unref(main_loop);
320+
321+
return 0;
322+
}

0 commit comments

Comments
 (0)