|
| 1 | +#include <thread> |
| 2 | +#include <mutex> |
| 3 | +#include <chrono> |
| 4 | +#include <condition_variable> |
| 5 | + |
| 6 | +#include <gst/gst.h> |
| 7 | +#include <glib.h> |
| 8 | + |
| 9 | +#include "gstreamer/gstkvssink.h" |
| 10 | + |
| 11 | +using namespace com::amazonaws::kinesis::video; |
| 12 | +using namespace log4cplus; |
| 13 | + |
| 14 | +/* Modify these values to change start/stop interval. */ |
| 15 | +#define KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS 20 |
| 16 | +#define KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS 40 |
| 17 | + |
| 18 | +#define KVS_GST_TEST_SOURCE_NAME "test-source" |
| 19 | +#define KVS_GST_DEVICE_SOURCE_NAME "device-source" |
| 20 | + |
| 21 | + |
| 22 | +LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer"); |
| 23 | + |
| 24 | +GMainLoop *main_loop = g_main_loop_new(NULL, FALSE); |
| 25 | +std::atomic<bool> terminated(FALSE); |
| 26 | +std::condition_variable cv; |
| 27 | + |
| 28 | +typedef enum _StreamSource { |
| 29 | + TEST_SOURCE, |
| 30 | + DEVICE_SOURCE |
| 31 | +} StreamSource; |
| 32 | + |
| 33 | +typedef struct _CustomData { |
| 34 | + _CustomData() : |
| 35 | + main_loop(NULL), |
| 36 | + pipeline(NULL) {} |
| 37 | + |
| 38 | + GMainLoop *main_loop; |
| 39 | + GstElement *pipeline; |
| 40 | +} CustomData; |
| 41 | + |
| 42 | +void sigint_handler(int sigint) { |
| 43 | + LOG_DEBUG("SIGINT received. Exiting..."); |
| 44 | + terminated = TRUE; |
| 45 | + cv.notify_all(); |
| 46 | + if(main_loop != NULL) { |
| 47 | + g_main_loop_quit(main_loop); |
| 48 | + } |
| 49 | +} |
| 50 | + |
| 51 | +static gboolean |
| 52 | +bus_call(GstBus *bus, GstMessage *msg, gpointer data) |
| 53 | +{ |
| 54 | + GMainLoop *loop = (GMainLoop *) ((CustomData *)data)->main_loop; |
| 55 | + GstElement *pipeline = (GstElement *) ((CustomData *)data)->pipeline; |
| 56 | + |
| 57 | + switch(GST_MESSAGE_TYPE(msg)) { |
| 58 | + case GST_MESSAGE_EOS: { |
| 59 | + LOG_DEBUG("[KVS sample] Received EOS message"); |
| 60 | + cv.notify_all(); |
| 61 | + break; |
| 62 | + } |
| 63 | + |
| 64 | + case GST_MESSAGE_ERROR: { |
| 65 | + gchar *debug; |
| 66 | + GError *error; |
| 67 | + |
| 68 | + gst_message_parse_error(msg, &error, &debug); |
| 69 | + g_free(debug); |
| 70 | + |
| 71 | + LOG_ERROR("[KVS sample] GStreamer error: " << error->message); |
| 72 | + g_error_free(error); |
| 73 | + |
| 74 | + g_main_loop_quit(loop); |
| 75 | + break; |
| 76 | + } |
| 77 | + |
| 78 | + default: { |
| 79 | + break; |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + return TRUE; |
| 84 | +} |
| 85 | + |
| 86 | +void determine_aws_credentials(GstElement *kvssink, char* streamName) { |
| 87 | + char const *iot_credential_endpoint; |
| 88 | + char const *cert_path; |
| 89 | + char const *private_key_path; |
| 90 | + char const *role_alias; |
| 91 | + char const *ca_cert_path; |
| 92 | + char const *credential_path; |
| 93 | + if(nullptr != (iot_credential_endpoint = GETENV("IOT_GET_CREDENTIAL_ENDPOINT")) && |
| 94 | + nullptr != (cert_path = GETENV("CERT_PATH")) && |
| 95 | + nullptr != (private_key_path = GETENV("PRIVATE_KEY_PATH")) && |
| 96 | + nullptr != (role_alias = GETENV("ROLE_ALIAS")) && |
| 97 | + nullptr != (ca_cert_path = GETENV("CA_CERT_PATH"))) { |
| 98 | + LOG_DEBUG("[KVS sample] Using IoT credentials."); |
| 99 | + // Set the IoT Credentials if provided in envvar. |
| 100 | + GstStructure *iot_credentials = gst_structure_new( |
| 101 | + "iot-certificate", |
| 102 | + "iot-thing-name", G_TYPE_STRING, streamName, |
| 103 | + "endpoint", G_TYPE_STRING, iot_credential_endpoint, |
| 104 | + "cert-path", G_TYPE_STRING, cert_path, |
| 105 | + "key-path", G_TYPE_STRING, private_key_path, |
| 106 | + "ca-path", G_TYPE_STRING, ca_cert_path, |
| 107 | + "role-aliases", G_TYPE_STRING, role_alias, NULL); |
| 108 | + |
| 109 | + g_object_set(G_OBJECT(kvssink), "iot-certificate", iot_credentials, NULL); |
| 110 | + gst_structure_free(iot_credentials); |
| 111 | + // kvssink will search for long term credentials in envvar automatically so no need to include here |
| 112 | + // if no long credentials or IoT credentials provided will look for credential file as last resort. |
| 113 | + } else if(nullptr != (credential_path = GETENV("AWS_CREDENTIAL_PATH"))) { |
| 114 | + LOG_DEBUG("[KVS sample] Using AWS_CREDENTIAL_PATH long term credentials."); |
| 115 | + g_object_set(G_OBJECT(kvssink), "credential-path", credential_path, NULL); |
| 116 | + } else { |
| 117 | + LOG_DEBUG("[KVS sample] Using credentials set by AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars."); |
| 118 | + } |
| 119 | +} |
| 120 | + |
| 121 | +// This function handles the intermittent starting and stopping of the stream in a loop. |
| 122 | +void stopStartLoop(GstElement *pipeline, GstElement *source) { |
| 123 | + std::mutex cv_m; |
| 124 | + std::unique_lock<std::mutex> lck(cv_m); |
| 125 | + |
| 126 | + while(!terminated) { |
| 127 | + // Using cv.wait_for to break sleep early upon signal interrupt. |
| 128 | + if(cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS)) != std::cv_status::timeout) { |
| 129 | + break; |
| 130 | + } |
| 131 | + |
| 132 | + LOG_INFO("[KVS sample] Stopping stream to KVS for " << KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS << " seconds"); |
| 133 | + |
| 134 | + // EOS event pushes frames buffered by the h264enc element down to kvssink. |
| 135 | + GstEvent* eos = gst_event_new_eos(); |
| 136 | + gst_element_send_event(pipeline, eos); |
| 137 | + |
| 138 | + // Wait for the EOS event to return from kvssink to the bus which means all elements are done handling the EOS. |
| 139 | + // We don't want to flush until the EOS is done to ensure all frames buffered in the pipeline have been processed. |
| 140 | + cv.wait(lck); |
| 141 | + |
| 142 | + // Set videotestsrc to paused state because it does not stop producing frames upon EOS, |
| 143 | + // and the frames are not cleared upon flushing. |
| 144 | + if(STRCMPI(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { |
| 145 | + gst_element_set_state(source, GST_STATE_PAUSED); |
| 146 | + } |
| 147 | + |
| 148 | + // Flushing to remove EOS status. |
| 149 | + GstEvent* flush_start = gst_event_new_flush_start(); |
| 150 | + gst_element_send_event(pipeline, flush_start); |
| 151 | + |
| 152 | + // Using cv.wait_for to break sleep early upon signal interrupt. Checking for termination again before waiting. |
| 153 | + if(terminated || cv.wait_for(lck, std::chrono::seconds(KVS_INTERMITTENT_PAUSED_INTERVAL_SECONDS)) != std::cv_status::timeout) { |
| 154 | + break; |
| 155 | + } |
| 156 | + |
| 157 | + LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds"); |
| 158 | + |
| 159 | + // Stop the flush now that we are resuming streaming. |
| 160 | + GstEvent* flush_stop = gst_event_new_flush_stop(true); |
| 161 | + gst_element_send_event(pipeline, flush_stop); |
| 162 | + |
| 163 | + // Set videotestsrc back to playing state. |
| 164 | + if(STRCMPI(GST_ELEMENT_NAME(source), KVS_GST_TEST_SOURCE_NAME) == 0) { |
| 165 | + gst_element_set_state(source, GST_STATE_PLAYING); |
| 166 | + } |
| 167 | + } |
| 168 | + LOG_DEBUG("[KVS sample] Exited stopStartLoop"); |
| 169 | +} |
| 170 | + |
| 171 | +int main(int argc, char *argv[]) |
| 172 | +{ |
| 173 | + signal(SIGINT, sigint_handler); |
| 174 | + |
| 175 | + CustomData customData; |
| 176 | + GstElement *pipeline, *source, *clock_overlay, *video_convert, *source_filter, *encoder, *sink_filter, *kvssink; |
| 177 | + GstCaps *source_caps, *sink_caps; |
| 178 | + GstBus *bus; |
| 179 | + guint bus_watch_id; |
| 180 | + StreamSource source_type; |
| 181 | + char stream_name[MAX_STREAM_NAME_LEN + 1] = {0}; |
| 182 | + |
| 183 | + gst_init(&argc, &argv); |
| 184 | + |
| 185 | + |
| 186 | + /* Parse input arguments */ |
| 187 | + |
| 188 | + // Check for invalid argument count, get stream name. |
| 189 | + if(argc > 3) { |
| 190 | + LOG_ERROR("[KVS sample] Invalid argument count, too many arguments."); |
| 191 | + LOG_INFO("[KVS sample] Usage: " << argv[0] << " <streamName (optional)> <testsrc or devicesrc (optional)>"); |
| 192 | + return -1; |
| 193 | + } else if(argc > 1) { |
| 194 | + STRNCPY(stream_name, argv[1], MAX_STREAM_NAME_LEN); |
| 195 | + } |
| 196 | + |
| 197 | + // Get source type. |
| 198 | + if(argc > 2) { |
| 199 | + if(0 == STRCMPI(argv[2], "testsrc")) { |
| 200 | + LOG_INFO("[KVS sample] Using test source (videotestsrc)"); |
| 201 | + source_type = TEST_SOURCE; |
| 202 | + } else if(0 == STRCMPI(argv[2], "devicesrc")) { |
| 203 | + LOG_INFO("[KVS sample] Using device source (autovideosrc)"); |
| 204 | + source_type = DEVICE_SOURCE; |
| 205 | + } else { |
| 206 | + LOG_ERROR("[KVS sample] Invalid source type"); |
| 207 | + LOG_INFO("[KVS sample] Usage: " << argv[0] << " <streamName (optional)> <testsrc or devicesrc(optional)>"); |
| 208 | + return -1; |
| 209 | + } |
| 210 | + } else { |
| 211 | + LOG_ERROR("[KVS sample] No source specified, defualting to test source (videotestsrc)"); |
| 212 | + source_type = TEST_SOURCE; |
| 213 | + } |
| 214 | + |
| 215 | + |
| 216 | + /* Create GStreamer elements */ |
| 217 | + |
| 218 | + pipeline = gst_pipeline_new("kvs-pipeline"); |
| 219 | + |
| 220 | + /* source */ |
| 221 | + if(source_type == TEST_SOURCE) { |
| 222 | + source = gst_element_factory_make("videotestsrc", KVS_GST_TEST_SOURCE_NAME); |
| 223 | + g_object_set(G_OBJECT(source), |
| 224 | + "is-live", TRUE, |
| 225 | + "pattern", 18, |
| 226 | + "background-color", 0xff003181, |
| 227 | + "foreground-color", 0xffff9900, NULL); |
| 228 | + } else if(source_type == DEVICE_SOURCE) { |
| 229 | + source = gst_element_factory_make("autovideosrc", KVS_GST_DEVICE_SOURCE_NAME); |
| 230 | + } |
| 231 | + |
| 232 | + /* clock overlay */ |
| 233 | + clock_overlay = gst_element_factory_make("clockoverlay", "clock_overlay"); |
| 234 | + g_object_set(G_OBJECT(clock_overlay),"time-format", "%a %B %d, %Y %I:%M:%S %p", NULL); |
| 235 | + |
| 236 | + /* video convert */ |
| 237 | + video_convert = gst_element_factory_make("videoconvert", "video_convert"); |
| 238 | + |
| 239 | + /* source filter */ |
| 240 | + source_filter = gst_element_factory_make("capsfilter", "source-filter"); |
| 241 | + source_caps = gst_caps_new_simple("video/x-raw", "format", G_TYPE_STRING, "I420", NULL); |
| 242 | + g_object_set(G_OBJECT(source_filter), "caps", source_caps, NULL); |
| 243 | + gst_caps_unref(source_caps); |
| 244 | + |
| 245 | + /* encoder */ |
| 246 | + encoder = gst_element_factory_make("x264enc", "encoder"); |
| 247 | + g_object_set(G_OBJECT(encoder), |
| 248 | + "bframes", 0, |
| 249 | + "key-int-max", 120, NULL); |
| 250 | + |
| 251 | + /* sink filter */ |
| 252 | + sink_filter = gst_element_factory_make("capsfilter", "sink-filter"); |
| 253 | + sink_caps = gst_caps_new_simple("video/x-h264", |
| 254 | + "stream-format", G_TYPE_STRING, "avc", |
| 255 | + "alignment", G_TYPE_STRING, "au", |
| 256 | + NULL); |
| 257 | + g_object_set(G_OBJECT(sink_filter), "caps", sink_caps, NULL); |
| 258 | + gst_caps_unref(sink_caps); |
| 259 | + |
| 260 | + /* kvssink */ |
| 261 | + kvssink = gst_element_factory_make("kvssink", "kvssink"); |
| 262 | + if (IS_EMPTY_STRING(stream_name)) { |
| 263 | + LOG_INFO("No stream name specified, using default kvssink stream name.") |
| 264 | + } else { |
| 265 | + g_object_set(G_OBJECT(kvssink), "stream-name", stream_name, NULL); |
| 266 | + } |
| 267 | + determine_aws_credentials(kvssink, stream_name); |
| 268 | + |
| 269 | + |
| 270 | + /* Check that GStreamer elements were all successfully created */ |
| 271 | + |
| 272 | + if(!kvssink) { |
| 273 | + LOG_ERROR("[KVS sample] Failed to create kvssink element"); |
| 274 | + return -1; |
| 275 | + } |
| 276 | + |
| 277 | + if(!pipeline || !source || !clock_overlay || !video_convert || !source_filter || !encoder || !sink_filter) { |
| 278 | + LOG_ERROR("[KVS sample] Not all GStreamer elements could be created."); |
| 279 | + return -1; |
| 280 | + } |
| 281 | + |
| 282 | + // Populate data struct. |
| 283 | + customData.main_loop = main_loop; |
| 284 | + customData.pipeline = pipeline; |
| 285 | + |
| 286 | + // Add a message handler. |
| 287 | + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); |
| 288 | + bus_watch_id = gst_bus_add_watch(bus, bus_call, &customData); |
| 289 | + gst_object_unref(bus); |
| 290 | + |
| 291 | + // Add elements into the pipeline. |
| 292 | + gst_bin_add_many(GST_BIN(pipeline), |
| 293 | + source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL); |
| 294 | + |
| 295 | + // Link the elements together. |
| 296 | + if(!gst_element_link_many(source, clock_overlay, video_convert, source_filter, encoder, sink_filter, kvssink, NULL)) { |
| 297 | + LOG_ERROR("[KVS sample] Elements could not be linked"); |
| 298 | + gst_object_unref(pipeline); |
| 299 | + return -1; |
| 300 | + } |
| 301 | + |
| 302 | + // Set the pipeline to playing state. |
| 303 | + LOG_INFO("[KVS sample] Starting stream to KVS for " << KVS_INTERMITTENT_PLAYING_INTERVAL_SECONDS << " seconds"); |
| 304 | + gst_element_set_state(pipeline, GST_STATE_PLAYING); |
| 305 | + |
| 306 | + // Start the stop/start thread for intermittent streaming. |
| 307 | + std::thread stopStartThread(stopStartLoop, pipeline, source); |
| 308 | + |
| 309 | + LOG_INFO("[KVS sample] Starting GStreamer main loop"); |
| 310 | + g_main_loop_run(main_loop); |
| 311 | + |
| 312 | + stopStartThread.join(); |
| 313 | + |
| 314 | + // Application terminated, cleanup. |
| 315 | + LOG_INFO("[KVS sample] Streaming terminated, cleaning up"); |
| 316 | + gst_element_set_state(pipeline, GST_STATE_NULL); |
| 317 | + gst_object_unref(GST_OBJECT(pipeline)); |
| 318 | + g_source_remove(bus_watch_id); |
| 319 | + g_main_loop_unref(main_loop); |
| 320 | + |
| 321 | + return 0; |
| 322 | +} |
0 commit comments