Skip to content

Commit ce185a2

Browse files
committed
support adding stream tags in the plugin
1 parent 6e7c281 commit ce185a2

File tree

7 files changed

+97
-33
lines changed

7 files changed

+97
-33
lines changed

kinesis-video-gstreamer-plugin/plugin-src/KvsSinkIotCertCredentialProvider.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#define CURL_ERROR_SIZE 256
88
#define HTTP_OK 200
99
#define SERVICE_CALL_PREFIX "https://"
10-
#define ROLE_ALIASES "/role-aliases"
10+
#define ROLE_ALIASES_PATH "/role-aliases"
1111
#define CREDENTIAL_SERVICE "/credentials"
1212

1313
LOGGER_TAG("com.amazonaws.kinesis.video.gstkvs");
@@ -43,7 +43,7 @@ void KvsSinkIotCertCredentialProvider::updateCredentials(Credentials &credential
4343
error_buffer[0] = '\0';
4444
const std::string service_url = SERVICE_CALL_PREFIX +
4545
iot_get_credential_endpoint_ +
46-
ROLE_ALIASES + '/' +
46+
ROLE_ALIASES_PATH + '/' +
4747
role_alias_ +
4848
CREDENTIAL_SERVICE;
4949

kinesis-video-gstreamer-plugin/plugin-src/Util/KvsSinkUtil.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ static const time_t time_point = std::time(NULL);
1414
static const long timezone_offset =
1515
static_cast<long> (std::mktime(std::gmtime(&time_point)) - std::mktime(std::localtime(&time_point)));
1616

17-
static gboolean set_params(GQuark field_id, const GValue *value, gpointer user_data) {
18-
std::map<std::string, std::string> &iot_cert_params = *(std::map<std::string, std::string> *)user_data;
17+
18+
gboolean setParams(GQuark field_id, const GValue *value, gpointer g_ptr_user_map) {
19+
std::map<std::string, std::string> *target_map = reinterpret_cast<std::map<std::string, std::string> *>(g_ptr_user_map);
1920
std::string field_str = std::string(g_quark_to_string (field_id));
2021
std::string value_str;
2122
gboolean ret = TRUE;
@@ -28,33 +29,41 @@ static gboolean set_params(GQuark field_id, const GValue *value, gpointer user_d
2829

2930
value_str = std::string(g_value_get_string(value));
3031

31-
if (iot_param_set.count(field_str) == 0 || value_str.empty()) {
32-
LOG_ERROR("Invalid field: " << field_str << " , value: " << value_str);
32+
if (value_str.empty() || field_str.empty()) {
33+
LOG_ERROR("Field and value should not be empty. field: " << field_str << " , value: " << value_str);
3334
ret = FALSE;
3435
goto CleanUp;
3536
}
3637

37-
iot_cert_params[field_str] = value_str;
38+
target_map->insert(std::pair<std::string,std::string>(field_str, value_str));
3839

3940
CleanUp:
4041
return ret;
4142
}
4243

43-
4444
namespace kvs_sink_util {
4545

46-
gboolean parse_gstructure(GstStructure *g_struct, std::map<std::string, std::string> &iot_cert_params) {
46+
gboolean gstructToMap(GstStructure *g_struct, std::map<std::string, std::string> *user_map) {
47+
std::map<std::string, std::string> temp;
48+
gboolean ret = gst_structure_foreach (g_struct, setParams, user_map);
49+
if (ret) { // if conversion failed, user_map will be unchanged
50+
user_map->insert(temp.begin(), temp.end());
51+
}
52+
return ret;
53+
}
54+
55+
gboolean parseIotCredentialGstructure(GstStructure *g_struct, std::map<std::string, std::string> &iot_cert_params) {
4756
gboolean ret;
4857
std::set<std::string> params_key_set;
4958

50-
ret = gst_structure_foreach (g_struct, set_params, &iot_cert_params);
59+
ret = gstructToMap(g_struct, &iot_cert_params);
5160

5261
if (ret == FALSE) {
5362
goto CleanUp;
5463
}
5564

5665
for(std::map<std::string, std::string>::iterator it = iot_cert_params.begin(); it != iot_cert_params.end();
57-
++it) {
66+
++it) {
5867
params_key_set.insert(it->first);
5968
}
6069
if (params_key_set != iot_param_set) {
@@ -73,12 +82,12 @@ bool parseTimeStr(std::string time_str, std::chrono::duration<uint64_t> &time_ob
7382
bool res = true;
7483
std::tm timeinfo = std::tm();
7584

76-
#if defined(__GNUC__) && (__GNUC__ < 5) && !defined(__APPLE__)
77-
res = strptime(time_str.c_str(), "%Y-%m-%dT%H:%M:%SZ", &timeinfo) != NULL ? true : false;
78-
#else
79-
std::istringstream iss(time_str);
80-
res = iss >> std::get_time(&timeinfo, "%Y-%m-%dT%H:%M:%SZ") ? true : false;
81-
#endif
85+
#if defined(__GNUC__) && (__GNUC__ < 5) && !defined(__APPLE__)
86+
res = strptime(time_str.c_str(), "%Y-%m-%dT%H:%M:%SZ", &timeinfo) != NULL ? true : false;
87+
#else
88+
std::istringstream iss(time_str);
89+
res = iss >> std::get_time(&timeinfo, "%Y-%m-%dT%H:%M:%SZ") ? true : false;
90+
#endif
8291

8392
if (res) {
8493
std::time_t tt = std::mktime(&timeinfo);

kinesis-video-gstreamer-plugin/plugin-src/Util/KvsSinkUtil.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
namespace kvs_sink_util{
1818

19-
gboolean parse_gstructure(GstStructure *g_struct, std::map<std::string, std::string> &iot_cert_params);
19+
gboolean gstructToMap(GstStructure *g_struct, std::map<std::string, std::string> *user_map);
20+
21+
gboolean parseIotCredentialGstructure(GstStructure *g_struct,
22+
std::map<std::string, std::string> &iot_cert_params);
2023

2124
bool parseTimeStr(std::string time_str, std::chrono::duration<uint64_t> &time_obj);
2225
}

kinesis-video-gstreamer-plugin/plugin-src/gstkvssink.cpp

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ enum {
155155
#define KVS_ADD_METADATA_NAME "name"
156156
#define KVS_ADD_METADATA_VALUE "value"
157157
#define KVS_ADD_METADATA_PERSISTENT "persist"
158+
#define KVS_CLIENT_USER_AGENT_NAME "AWS-SDK-KVS-CLIENT"
158159

159160
enum {
160161
PROP_0,
@@ -195,7 +196,8 @@ enum {
195196
PROP_FRAME_TIMESTAMP,
196197
PROP_STORAGE_SIZE,
197198
PROP_CREDENTIAL_FILE_PATH,
198-
PROP_IOT_CERTIFICATE
199+
PROP_IOT_CERTIFICATE,
200+
PROP_STREAM_TAGS
199201
};
200202

201203
#define GST_TYPE_KVS_SINK_FRAME_TIMESTAMP_TYPE (gst_kvs_sink_frame_timestamp_type_get_type())
@@ -325,7 +327,7 @@ void kinesis_video_producer_init(GstKvsSink *sink) {
325327
credential_provider = make_unique<KvsSinkStaticCredentialProvider>(*sink->credentials_, sink->rotation_period);
326328
} else if (sink->iot_certificate) {
327329
std::map<std::string, std::string> iot_cert_params;
328-
gboolean ret = kvs_sink_util::parse_gstructure(sink->iot_certificate, iot_cert_params);
330+
gboolean ret = kvs_sink_util::parseIotCredentialGstructure(sink->iot_certificate, iot_cert_params);
329331
g_assert_true(ret);
330332
credential_provider = make_unique<KvsSinkIotCertCredentialProvider>(iot_cert_params[IOT_GET_CREDENTIAL_ENDPOINT],
331333
iot_cert_params[CERTIFICATE_PATH],
@@ -340,14 +342,28 @@ void kinesis_video_producer_init(GstKvsSink *sink) {
340342
move(client_callback_provider),
341343
move(stream_callback_provider),
342344
move(credential_provider),
343-
region_str);
345+
region_str,
346+
"",
347+
KVS_CLIENT_USER_AGENT_NAME);
344348
}
345349

346350
void create_kinesis_video_stream(GstKvsSink *sink) {
347351
auto data = sink->data;
352+
353+
map<string, string> *p_stream_tags = nullptr;
354+
map<string, string> stream_tags;
355+
if (sink->stream_tags) {
356+
gboolean ret;
357+
ret = kvs_sink_util::gstructToMap(sink->stream_tags, &stream_tags);
358+
if (!ret) {
359+
LOG_WARN("Failed to parse stream tags");
360+
} else {
361+
p_stream_tags = &stream_tags;
362+
}
363+
}
348364
auto stream_definition = make_unique<StreamDefinition>(sink->stream_name,
349365
hours(sink->retention_period_hours),
350-
nullptr,
366+
p_stream_tags,
351367
sink->kms_key_id,
352368
sink->streaming_type,
353369
sink->content_type,
@@ -608,6 +624,11 @@ gst_kvs_sink_class_init(GstKvsSinkClass *klass) {
608624
"Use aws iot certificate to obtain credentials",
609625
GST_TYPE_STRUCTURE, (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
610626

627+
g_object_class_install_property (gobject_class, PROP_STREAM_TAGS,
628+
g_param_spec_boxed ("stream-tags", "Stream Tags",
629+
"key-value pair that you can define and assign to each stream",
630+
GST_TYPE_STRUCTURE, (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
631+
611632

612633
/**
613634
* GstKvsSink::handoff:
@@ -719,6 +740,9 @@ gst_kvs_sink_finalize(GObject *object) {
719740
if (sink->iot_certificate) {
720741
gst_structure_free (sink->iot_certificate);
721742
}
743+
if (sink->stream_tags) {
744+
gst_structure_free (sink->stream_tags);
745+
}
722746
delete [] sink->frame_data;
723747
G_OBJECT_CLASS (parent_class)->finalize(object);
724748
}
@@ -842,12 +866,21 @@ static void gst_kvs_sink_set_property(GObject *object, guint prop_id,
842866
case PROP_IOT_CERTIFICATE: {
843867
const GstStructure *s = gst_value_get_structure(value);
844868

845-
if (sink->iot_certificate)
869+
if (sink->iot_certificate) {
846870
gst_structure_free(sink->iot_certificate);
847-
871+
}
848872
sink->iot_certificate = s ? gst_structure_copy(s) : NULL;
849873
break;
850874
}
875+
case PROP_STREAM_TAGS: {
876+
const GstStructure *s = gst_value_get_structure(value);
877+
878+
if (sink->stream_tags) {
879+
gst_structure_free(sink->stream_tags);
880+
}
881+
sink->stream_tags = s ? gst_structure_copy(s) : NULL;
882+
break;
883+
}
851884
default:
852885
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
853886
break;
@@ -971,6 +1004,9 @@ static void gst_kvs_sink_get_property(GObject *object, guint prop_id, GValue *va
9711004
case PROP_IOT_CERTIFICATE:
9721005
gst_value_set_structure (value, sink->iot_certificate);
9731006
break;
1007+
case PROP_STREAM_TAGS:
1008+
gst_value_set_structure (value, sink->stream_tags);
1009+
break;
9741010
default:
9751011
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
9761012
break;

kinesis-video-gstreamer-plugin/plugin-src/gstkvssink.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ struct _GstKvsSink {
119119
guint storage_size;
120120
gchar *credential_file_path;
121121
GstStructure *iot_certificate;
122+
GstStructure *stream_tags;
122123

123124
unique_ptr<Credentials> credentials_;
124125
shared_ptr<CustomData> data;

kinesis-video-producer/src/DefaultCallbackProvider.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const std::string KINESIS_VIDEO_SERVICE_NAME = "kinesisvideo";
3232
const std::string DEFAULT_CONTROL_PLANE_URI = "https://kinesisvideo.us-west-2.amazonaws.com";
3333
const std::string CONTROL_PLANE_URI_PREFIX = "https://";
3434
const std::string CONTROL_PLANE_URI_POSTFIX = ".amazonaws.com";
35-
const std::string DEFAULT_USER_AGENT_NAME = "AWS-SDK-KVS-PRODUCER";
35+
const std::string DEFAULT_USER_AGENT_NAME = "AWS-SDK-KVS";
3636
}
3737

3838
namespace com { namespace amazonaws { namespace kinesis { namespace video {

kinesis-video-producer/tst/ProducerApiTest.cpp

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ PVOID staticProducerRoutine(PVOID arg)
1313
}
1414

1515
PVOID ProducerTestBase::basicProducerRoutine(KinesisVideoStream* kinesis_video_stream) {
16-
UINT32 index = 0;
16+
UINT32 index = 0, persistentMetadataIndex = 0;
1717
UINT64 timestamp = 0;
1818
Frame frame;
19+
std::string metadataNameStr;
1920

2021
// Loop until cancelled
2122
frame.duration = FRAME_DURATION_IN_MICROS * HUNDREDS_OF_NANOS_IN_A_MICROSECOND;
@@ -51,18 +52,32 @@ PVOID ProducerTestBase::basicProducerRoutine(KinesisVideoStream* kinesis_video_s
5152
<< ", Dts: " << frame.decodingTs
5253
<< ", Pts: " << frame.presentationTs);
5354

54-
// Apply some metadata every 20th frame
55+
// Apply some non-persistent metadata every few frames
5556
if (frame.index % 20 == 0) {
56-
std::ostringstream metadataName;
57+
std::ostringstream metadataName, metadataValue;
5758
metadataName << "MetadataNameForFrame_" << frame.index;
58-
std::ostringstream metadataValue;
5959
metadataValue << "MetadataValueForFrame_" << frame.index;
6060
EXPECT_TRUE(kinesis_video_stream->putFragmentMetadata(metadataName.str(), metadataValue.str(), false));
61+
}
62+
63+
// Apply some persistent metadata on a larger intervals to span fragments
64+
if (frame.index % 60 == 0) {
65+
std::ostringstream metadataName, metadataValue;
66+
std::string metadataValueStr;
67+
68+
metadataName << "PersistentMetadataName_" << persistentMetadataIndex;
69+
metadataValue << "PersistentMetadataValue_" << persistentMetadataIndex;
70+
71+
// Set or clear persistent metadata every other time.
72+
if (persistentMetadataIndex % 2 == 0) {
73+
metadataNameStr = metadataName.str();
74+
metadataValueStr = metadataValue.str();
75+
} else {
76+
metadataValueStr = std::string();
77+
}
6178

62-
// Set or clear persistent metadata
63-
metadataName << "PersistentMetadataNameForFrame_" << frame.index;
64-
metadataValue << "PersistentMetadataValueForFrame_" << frame.index;
65-
EXPECT_TRUE(kinesis_video_stream->putFragmentMetadata(metadataName.str(), (frame.index % 2 == 0) ? metadataValue.str() : std::string(), true));
79+
persistentMetadataIndex++;
80+
EXPECT_TRUE(kinesis_video_stream->putFragmentMetadata(metadataNameStr, metadataValueStr, true));
6681
}
6782

6883
EXPECT_TRUE(kinesis_video_stream->putFrame(frame));

0 commit comments

Comments
 (0)