Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ jobs:
id-token: write
contents: read
env:
CC: /usr/local/bin/gcc-13
CXX: /usr/local/bin/g++-13
CC: /usr/local/bin/gcc-12
CXX: /usr/local/bin/g++-12
AWS_KVS_LOG_LEVEL: 2
steps:
- name: Clone repository
Expand Down
4 changes: 4 additions & 0 deletions samples/include.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#define KVS_ADD_METADATA_VALUE "value"
#define KVS_ADD_METADATA_PERSISTENT "persist"

#define KVS_ADD_EVENT_METADATA_G_STRUCT_NAME "kvs-add-event-metadata"
#define KVS_ADD_EVENT_METADATA_EVENT "event"
#define KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA "stream-event-metadata"

#define STATUS_KVS_GSTREAMER_SAMPLE_BASE 0x00080000
#define STATUS_KVS_GSTREAMER_SAMPLE_ERROR STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000001
#define STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000002
Expand Down
54 changes: 42 additions & 12 deletions samples/kvssink_gstreamer_sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ typedef struct _CustomData {
last_unpersisted_file_idx(0),
stream_status(STATUS_SUCCESS),
put_fragment_metadata_frequency_seconds(2),
deliver_images_frequency_seconds(5),
fragment_metadata_timer_id(0),
deliver_images_timer_id(0),
base_pts(0),
max_frame_pts(0),
key_frame_pts(0),
Expand All @@ -67,7 +69,9 @@ typedef struct _CustomData {
char *stream_name;
mutex file_list_mtx;
int put_fragment_metadata_frequency_seconds;
int deliver_images_frequency_seconds;
int fragment_metadata_timer_id;
int deliver_images_timer_id;
int metadata_counter = 0;
bool persist_flag = true;

Expand Down Expand Up @@ -256,17 +260,17 @@ void determine_credentials(GstElement *kvssink, CustomData *data) {
nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) &&
nullptr != (role_alias = getenv("ROLE_ALIAS")) &&
nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) {
// set the IoT Credentials if provided in envvar
GstStructure *iot_credentials = gst_structure_new(
"iot-certificate",
"iot-thing-name", G_TYPE_STRING, data->stream_name,
"endpoint", G_TYPE_STRING, iot_credential_endpoint,
"cert-path", G_TYPE_STRING, cert_path,
"key-path", G_TYPE_STRING, private_key_path,
"ca-path", G_TYPE_STRING, ca_cert_path,
"role-aliases", G_TYPE_STRING, role_alias, NULL);
g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL);
// set the IoT Credentials if provided in envvar
GstStructure *iot_credentials = gst_structure_new(
"iot-certificate",
"iot-thing-name", G_TYPE_STRING, data->stream_name,
"endpoint", G_TYPE_STRING, iot_credential_endpoint,
"cert-path", G_TYPE_STRING, cert_path,
"key-path", G_TYPE_STRING, private_key_path,
"ca-path", G_TYPE_STRING, ca_cert_path,
"role-aliases", G_TYPE_STRING, role_alias, NULL);
g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL);
gst_structure_free(iot_credentials);
// kvssink will search for long term credentials in envvar automatically so no need to include here
// if no long credentials or IoT credentials provided will look for credential file as last resort
Expand All @@ -288,6 +292,18 @@ bool put_fragment_metadata(GstElement* element, const std::string name, const st
return gst_element_send_event(element, event);
}

/*
This function creates a GstStructure and uses it to trigger the GST_EVENT_CUSTOM_DOWNSTREAM for put_event_metadata
*/
bool put_event_metadata(GstElement* element, const uint32_t metadataEvent, const PStreamEventMetadata pStreamEventMetadata) {
GstStructure *metadata = gst_structure_new_empty(KVS_ADD_EVENT_METADATA_G_STRUCT_NAME);
gst_structure_set(metadata, KVS_ADD_EVENT_METADATA_EVENT, G_TYPE_UINT, metadataEvent,
KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA, G_TYPE_POINTER, pStreamEventMetadata, NULL);
GstEvent* event = gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM, metadata);
LOG_TRACE("Emit the images to be delivered event with structure: " << std::string(gst_structure_to_string (metadata)));
return gst_element_send_event(element, event);
}

/*
Function to put fragment metadata: name, value, and persist values
This is a sample function. This function alternates between putting persistent and non-persistent metadata
Expand Down Expand Up @@ -333,6 +349,19 @@ static void put_metadata(GstElement* element) {
}
}

/*
Function to deliver images by calling put_event_metadata. Customers can write their own "deliver_images" or other similar function
to put event metadata. Currently this function is being called every five seconds using a timer. This logic can be modified
and the trigger for this function can also be customized by the customer
*/
static void deliver_images(GstElement* element) {
if (!put_event_metadata(element, STREAM_EVENT_TYPE_IMAGE_GENERATION, NULL)) {
g_source_remove(data_global.deliver_images_timer_id);
data_global.deliver_images_timer_id = 0;
LOG_WARN("Failed to deliver image, removing timer");
}
}

int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline, GstElement *kvssink) {

bool vtenc = false, isOnRpi = false;
Expand Down Expand Up @@ -795,7 +824,8 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) {

// Create a GStreamer timer to generate and put fragment metadata tags every 2 seconds
data->fragment_metadata_timer_id = g_timeout_add_seconds(data->put_fragment_metadata_frequency_seconds, reinterpret_cast<GSourceFunc>(put_metadata), kvssink);

data->deliver_images_timer_id = g_timeout_add_seconds(data->deliver_images_frequency_seconds, reinterpret_cast<GSourceFunc>(deliver_images), kvssink);

/* start streaming */
gst_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
if (gst_ret == GST_STATE_CHANGE_FAILURE) {
Expand Down
10 changes: 10 additions & 0 deletions src/KinesisVideoStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ bool KinesisVideoStream::putFragmentMetadata(const std::string &name, const std:
return true;
}

bool KinesisVideoStream::putEventMetadata(uint32_t event, PStreamEventMetadata pStreamEventMetadata){
STATUS status = ::putKinesisVideoEventMetadata(stream_handle_, event, pStreamEventMetadata);
if (STATUS_FAILED(status)) {
LOG_ERROR("Failed to put event: " << status << " for " << this->stream_name_);
return false;
}

return true;
}

KinesisVideoStream::~KinesisVideoStream() {
free();
}
Expand Down
12 changes: 12 additions & 0 deletions src/KinesisVideoStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ class KinesisVideoStream {
*/
bool putFragmentMetadata(const std::string& name, const std::string& value, bool persistent = true);

/*
* Inserts a KVS event(s) accompanied by optional metadata (key/value string pairs) into the stream.
* Multiple events can be submitted at once by using bitwise OR of event types, or multiple calls of this
* function with different unique events.
* @param 1 uint32_t - the type of event(s), a value from STREAM_EVENT_TYPE enum. If
* if you want to submit multiple events in one call it is suggested to use bit-wise
* OR combination from STREAM_EVENT_TYPE enum.
* @param 2 PStreamEventMetadata - pointer to struct with optional metadata. This metadata will be applied
* to all events included in THIS function call.
*/
bool putEventMetadata(uint32_t event, PStreamEventMetadata pStreamEventMetadata);

/**
* Initializes the track identified by trackId with a hex-encoded codec private data
* and puts the stream in a state that it is ready to receive frames via putFrame().
Expand Down
56 changes: 37 additions & 19 deletions src/gstreamer/gstkvssink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ GST_DEBUG_CATEGORY_STATIC (gst_kvs_sink_debug);
#define KVS_ADD_METADATA_VALUE "value"
#define KVS_ADD_METADATA_PERSISTENT "persist"

#define KVS_ADD_EVENT_METADATA_G_STRUCT_NAME "kvs-add-event-metadata"
#define KVS_ADD_EVENT_METADATA_EVENT "event"
#define KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA "stream-event-metadata"

#define KVS_CLIENT_USER_AGENT_NAME "AWS-SDK-KVS-CPP-CLIENT"

#define DEFAULT_AUDIO_TRACK_NAME "audio"
Expand Down Expand Up @@ -1158,34 +1162,47 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads,
}
case GST_EVENT_CUSTOM_DOWNSTREAM: {
const GstStructure *structure = gst_event_get_structure(event);
std::string metadata_name, metadata_value;
uint32_t streamMetadataEvent;
gboolean persistent;
bool is_persist;
PStreamEventMetadata pStreamEventMetadata;

if (gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME) &&
NULL != gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) &&
NULL != gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) &&
gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) {

std::string metadata_name, metadata_value;

LOG_TRACE("Received kvs-add-metadata event for " << kvssink->stream_name);

metadata_name = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_NAME));
metadata_value = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE));

persistent = (bool) persistent;

if (!gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME) ||
NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) ||
NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) ||
!gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) {
if (!data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, persistent)) {
ret = FALSE;
LOG_WARN("Failed to putFragmentMetadata for name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << persistent << " for " << kvssink->stream_name);
}

} else if (gst_structure_has_name(structure, KVS_ADD_EVENT_METADATA_G_STRUCT_NAME) &&
gst_structure_get_uint(structure, KVS_ADD_EVENT_METADATA_EVENT, &streamMetadataEvent) &&
gst_structure_get(structure, KVS_ADD_EVENT_METADATA_STREAM_EVENT_METADATA, G_TYPE_POINTER, &pStreamEventMetadata, NULL)) {

LOG_TRACE("Received kvs-add-event-metadata event for " << kvssink->stream_name);

if(!data->kinesis_video_stream->putEventMetadata(streamMetadataEvent, pStreamEventMetadata)) {
ret = FALSE;
LOG_WARN("Failed to putEventMetadata for " << kvssink->stream_name);
}
} else {
ret = FALSE;
LOG_WARN("Event structure is invalid or it contains an invalid field(s): " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name);
goto CleanUp;
}
LOG_TRACE("Received kvs-add-metadata event for " << kvssink->stream_name);

metadata_name = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_NAME));
metadata_value = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE));
is_persist = persistent;

bool result = data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, is_persist);

gst_event_unref (event);
event = NULL;

if (!result) {
ret = FALSE;
LOG_WARN("Failed to putFragmentMetadata for name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name);
}

break;
}
case GST_EVENT_EOS: {
Expand Down Expand Up @@ -1370,6 +1387,7 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
std::chrono::nanoseconds(buf->pts),
std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count);
data->frame_count++;

}
else {
LOG_WARN("GStreamer buffer is invalid for " << kvssink->stream_name);
Expand Down