diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 58d4c071..7e796f37 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,8 +79,8 @@ jobs: id-token: write contents: read env: - CC: /usr/local/bin/gcc-13 - CXX: /usr/local/bin/g++-13 + CC: /usr/local/bin/gcc-12 + CXX: /usr/local/bin/g++-12 AWS_KVS_LOG_LEVEL: 2 steps: - name: Clone repository diff --git a/samples/include.h b/samples/include.h index 98c2ee9c..e50b4522 100644 --- a/samples/include.h +++ b/samples/include.h @@ -6,6 +6,10 @@ #define KVS_ADD_METADATA_VALUE "value" #define KVS_ADD_METADATA_PERSISTENT "persist" +#define KVS_ADD_EVENT_METADATA_G_STRUCT_NAME "kvs-add-event-metadata" +#define KVS_ADD_EVENT_METADATA_EVENT "event" +#define KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA "stream-event-metadata" + #define STATUS_KVS_GSTREAMER_SAMPLE_BASE 0x00080000 #define STATUS_KVS_GSTREAMER_SAMPLE_ERROR STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000001 #define STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000002 diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp index c8b6fdc6..e098d3de 100644 --- a/samples/kvssink_gstreamer_sample.cpp +++ b/samples/kvssink_gstreamer_sample.cpp @@ -50,7 +50,9 @@ typedef struct _CustomData { last_unpersisted_file_idx(0), stream_status(STATUS_SUCCESS), put_fragment_metadata_frequency_seconds(2), + deliver_images_frequency_seconds(5), fragment_metadata_timer_id(0), + deliver_images_timer_id(0), base_pts(0), max_frame_pts(0), key_frame_pts(0), @@ -67,7 +69,9 @@ typedef struct _CustomData { char *stream_name; mutex file_list_mtx; int put_fragment_metadata_frequency_seconds; + int deliver_images_frequency_seconds; int fragment_metadata_timer_id; + int deliver_images_timer_id; int metadata_counter = 0; bool persist_flag = true; @@ -256,17 +260,17 @@ void determine_credentials(GstElement *kvssink, CustomData *data) { nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) && nullptr != (role_alias = getenv("ROLE_ALIAS")) && nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) { - // set the IoT Credentials if provided in envvar - GstStructure *iot_credentials = gst_structure_new( - "iot-certificate", - "iot-thing-name", G_TYPE_STRING, data->stream_name, - "endpoint", G_TYPE_STRING, iot_credential_endpoint, - "cert-path", G_TYPE_STRING, cert_path, - "key-path", G_TYPE_STRING, private_key_path, - "ca-path", G_TYPE_STRING, ca_cert_path, - "role-aliases", G_TYPE_STRING, role_alias, NULL); - - g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); + // set the IoT Credentials if provided in envvar + GstStructure *iot_credentials = gst_structure_new( + "iot-certificate", + "iot-thing-name", G_TYPE_STRING, data->stream_name, + "endpoint", G_TYPE_STRING, iot_credential_endpoint, + "cert-path", G_TYPE_STRING, cert_path, + "key-path", G_TYPE_STRING, private_key_path, + "ca-path", G_TYPE_STRING, ca_cert_path, + "role-aliases", G_TYPE_STRING, role_alias, NULL); + + g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); gst_structure_free(iot_credentials); // kvssink will search for long term credentials in envvar automatically so no need to include here // if no long credentials or IoT credentials provided will look for credential file as last resort @@ -288,6 +292,18 @@ bool put_fragment_metadata(GstElement* element, const std::string name, const st return gst_element_send_event(element, event); } +/* +This function creates a GstStructure and uses it to trigger the GST_EVENT_CUSTOM_DOWNSTREAM for put_event_metadata +*/ +bool put_event_metadata(GstElement* element, const uint32_t metadataEvent, const PStreamEventMetadata pStreamEventMetadata) { + GstStructure *metadata = gst_structure_new_empty(KVS_ADD_EVENT_METADATA_G_STRUCT_NAME); + gst_structure_set(metadata, KVS_ADD_EVENT_METADATA_EVENT, G_TYPE_UINT, metadataEvent, + KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA, G_TYPE_POINTER, pStreamEventMetadata, NULL); + GstEvent* event = gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM, metadata); + LOG_TRACE("Emit the images to be delivered event with structure: " << std::string(gst_structure_to_string (metadata))); + return gst_element_send_event(element, event); +} + /* Function to put fragment metadata: name, value, and persist values This is a sample function. This function alternates between putting persistent and non-persistent metadata @@ -333,6 +349,19 @@ static void put_metadata(GstElement* element) { } } +/* +Function to deliver images by calling put_event_metadata. Customers can write their own "deliver_images" or other similar function +to put event metadata. Currently this function is being called every five seconds using a timer. This logic can be modified +and the trigger for this function can also be customized by the customer +*/ +static void deliver_images(GstElement* element) { + if (!put_event_metadata(element, STREAM_EVENT_TYPE_IMAGE_GENERATION, NULL)) { + g_source_remove(data_global.deliver_images_timer_id); + data_global.deliver_images_timer_id = 0; + LOG_WARN("Failed to deliver image, removing timer"); + } +} + int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline, GstElement *kvssink) { bool vtenc = false, isOnRpi = false; @@ -795,7 +824,8 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) { // Create a GStreamer timer to generate and put fragment metadata tags every 2 seconds data->fragment_metadata_timer_id = g_timeout_add_seconds(data->put_fragment_metadata_frequency_seconds, reinterpret_cast(put_metadata), kvssink); - + data->deliver_images_timer_id = g_timeout_add_seconds(data->deliver_images_frequency_seconds, reinterpret_cast(deliver_images), kvssink); + /* start streaming */ gst_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING); if (gst_ret == GST_STATE_CHANGE_FAILURE) { diff --git a/src/KinesisVideoStream.cpp b/src/KinesisVideoStream.cpp index e3063da9..36019308 100644 --- a/src/KinesisVideoStream.cpp +++ b/src/KinesisVideoStream.cpp @@ -185,6 +185,16 @@ bool KinesisVideoStream::putFragmentMetadata(const std::string &name, const std: return true; } +bool KinesisVideoStream::putEventMetadata(uint32_t event, PStreamEventMetadata pStreamEventMetadata){ + STATUS status = ::putKinesisVideoEventMetadata(stream_handle_, event, pStreamEventMetadata); + if (STATUS_FAILED(status)) { + LOG_ERROR("Failed to put event: " << status << " for " << this->stream_name_); + return false; + } + + return true; +} + KinesisVideoStream::~KinesisVideoStream() { free(); } diff --git a/src/KinesisVideoStream.h b/src/KinesisVideoStream.h index 429cf92e..5fd61142 100644 --- a/src/KinesisVideoStream.h +++ b/src/KinesisVideoStream.h @@ -83,6 +83,18 @@ class KinesisVideoStream { */ bool putFragmentMetadata(const std::string& name, const std::string& value, bool persistent = true); + /* + * Inserts a KVS event(s) accompanied by optional metadata (key/value string pairs) into the stream. + * Multiple events can be submitted at once by using bitwise OR of event types, or multiple calls of this + * function with different unique events. + * @param 1 uint32_t - the type of event(s), a value from STREAM_EVENT_TYPE enum. If + * if you want to submit multiple events in one call it is suggested to use bit-wise + * OR combination from STREAM_EVENT_TYPE enum. + * @param 2 PStreamEventMetadata - pointer to struct with optional metadata. This metadata will be applied + * to all events included in THIS function call. + */ + bool putEventMetadata(uint32_t event, PStreamEventMetadata pStreamEventMetadata); + /** * Initializes the track identified by trackId with a hex-encoded codec private data * and puts the stream in a state that it is ready to receive frames via putFrame(). diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 15835c5f..e48ac11f 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -122,6 +122,10 @@ GST_DEBUG_CATEGORY_STATIC (gst_kvs_sink_debug); #define KVS_ADD_METADATA_VALUE "value" #define KVS_ADD_METADATA_PERSISTENT "persist" +#define KVS_ADD_EVENT_METADATA_G_STRUCT_NAME "kvs-add-event-metadata" +#define KVS_ADD_EVENT_METADATA_EVENT "event" +#define KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA "stream-event-metadata" + #define KVS_CLIENT_USER_AGENT_NAME "AWS-SDK-KVS-CPP-CLIENT" #define DEFAULT_AUDIO_TRACK_NAME "audio" @@ -1158,34 +1162,47 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, } case GST_EVENT_CUSTOM_DOWNSTREAM: { const GstStructure *structure = gst_event_get_structure(event); - std::string metadata_name, metadata_value; + uint32_t streamMetadataEvent; gboolean persistent; - bool is_persist; + PStreamEventMetadata pStreamEventMetadata; + + if (gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME) && + NULL != gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) && + NULL != gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) && + gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) { + + std::string metadata_name, metadata_value; + + LOG_TRACE("Received kvs-add-metadata event for " << kvssink->stream_name); + + metadata_name = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_NAME)); + metadata_value = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE)); + + persistent = (bool) persistent; - if (!gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME) || - NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) || - NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) || - !gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) { + if (!data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, persistent)) { + ret = FALSE; + LOG_WARN("Failed to putFragmentMetadata for name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << persistent << " for " << kvssink->stream_name); + } + + } else if (gst_structure_has_name(structure, KVS_ADD_EVENT_METADATA_G_STRUCT_NAME) && + gst_structure_get_uint(structure, KVS_ADD_EVENT_METADATA_EVENT, &streamMetadataEvent) && + gst_structure_get(structure, KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA, G_TYPE_POINTER, &pStreamEventMetadata, NULL)) { + + LOG_TRACE("Received kvs-add-event-metadata event for " << kvssink->stream_name); + + if(!data->kinesis_video_stream->putEventMetadata(streamMetadataEvent, pStreamEventMetadata)) { + ret = FALSE; + LOG_WARN("Failed to putEventMetadata for " << kvssink->stream_name); + } + } else { ret = FALSE; LOG_WARN("Event structure is invalid or it contains an invalid field(s): " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name); - goto CleanUp; } - LOG_TRACE("Received kvs-add-metadata event for " << kvssink->stream_name); - - metadata_name = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_NAME)); - metadata_value = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE)); - is_persist = persistent; - bool result = data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, is_persist); - gst_event_unref (event); event = NULL; - if (!result) { - ret = FALSE; - LOG_WARN("Failed to putFragmentMetadata for name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name); - } - break; } case GST_EVENT_EOS: { @@ -1370,6 +1387,7 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, std::chrono::nanoseconds(buf->pts), std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count); data->frame_count++; + } else { LOG_WARN("GStreamer buffer is invalid for " << kvssink->stream_name);