Skip to content

Commit d459f7e

Browse files
committed
Add a file uploader example
1 parent 5cdb5cf commit d459f7e

File tree

2 files changed

+290
-0
lines changed

2 files changed

+290
-0
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ if(BUILD_GSTREAMER_PLUGIN)
220220

221221
add_executable(kinesis_video_gstreamer_audio_video_sample_app samples/kinesis_video_gstreamer_audio_video_sample_app.cpp)
222222
target_link_libraries(kinesis_video_gstreamer_audio_video_sample_app ${GST_APP_LIBRARIES} KinesisVideoProducer)
223+
224+
add_executable(kvs_gstreamer_file_uploader_sample samples/kvs_gstreamer_file_uploader_sample.cpp)
225+
target_link_libraries(kvs_gstreamer_file_uploader_sample ${GST_APP_LIBRARIES})
223226
endif()
224227

225228
if(BUILD_TEST)
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
#include <gst/gst.h>
2+
#include <string.h>
3+
#include <stdio.h>
4+
#include <string>
5+
#include <sstream>
6+
7+
using namespace std;
8+
9+
#ifdef __cplusplus
10+
extern "C" {
11+
#endif
12+
13+
int gstreamer_init(int, char **);
14+
15+
#ifdef __cplusplus
16+
}
17+
#endif
18+
19+
#define DEFAULT_RETRY_COUNT 3
20+
21+
#define PROPERTY_PREFIX "KVS_"
22+
#define PROPERTY_KEY_MAX_LEN 32
23+
#define KVS_SINK_PLUGIN_NAME "kvssink"
24+
25+
#define STREAM_STATUS_OK 0
26+
#define STREAM_STATUS_FAILED 1
27+
28+
#define CONTENT_TYPE_VIDEO_ONLY 0
29+
#define CONTENT_TYPE_AUDIO_VIDEO 1
30+
31+
#define APP_NAME "kvs_gstreamer_file_uploader_sample"
32+
#define LOG_INFO(fmt, ...) \
33+
do { fprintf(stdout, "[INFO] " APP_NAME ": " fmt "\n", ##__VA_ARGS__); } while(0)
34+
#define LOG_ERROR(fmt, ...) \
35+
do { fprintf(stderr, "[ERROR] " APP_NAME ": " fmt "\n", ##__VA_ARGS__); } while(0)
36+
37+
static const char* AVAILABLE_PROPERTIES[] = {
38+
PROPERTY_PREFIX "ABSOLUTE_FRAGMENT_TIMES", "Use absolute fragment time",
39+
PROPERTY_PREFIX "ACCESS_KEY", "AWS Access Key",
40+
PROPERTY_PREFIX "AVG_BANDWIDTH_BPS", "Average bandwidth bps",
41+
PROPERTY_PREFIX "AWS_REGION", "AWS Region",
42+
PROPERTY_PREFIX "BUFFER_DURATION", "Buffer duration. Unit: seconds",
43+
PROPERTY_PREFIX "FILE_START_TIME", "Epoch time that the file starts in kinesis video stream. By default, current time is used. Unit: Seconds",
44+
PROPERTY_PREFIX "FRAGMENT_ACKS", "Do fragment acks",
45+
PROPERTY_PREFIX "FRAGMENT_DURATION", "Fragment Duration. Unit: miliseconds",
46+
PROPERTY_PREFIX "FRAMERATE", "Framerate",
47+
PROPERTY_PREFIX "IOT_CERTIFICATE", "Use aws iot certificate to obtain credentials",
48+
PROPERTY_PREFIX "MAX_LATENCY", "Max Latency. Unit: seconds",
49+
PROPERTY_PREFIX "RETENTION_PERIOD", "Length of time stream is preserved. Unit: hours",
50+
PROPERTY_PREFIX "ROTATION_PERIOD", "Rotation Period. Unit: seconds",
51+
PROPERTY_PREFIX "SECRET_KEY", "AWS Secret Key",
52+
PROPERTY_PREFIX "STREAM_NAME", "Name of the destination stream",
53+
PROPERTY_PREFIX "STREAMING_TYPE", "Streaming type",
54+
PROPERTY_PREFIX "STORAGE_SIZE", "Storage Size. Unit: MB",
55+
PROPERTY_PREFIX "TIMECODE_SCALE", "Timecode Scale. Unit: milliseconds",
56+
NULL
57+
};
58+
59+
typedef struct _CustomData {
60+
_CustomData():
61+
stream_status(STREAM_STATUS_OK) {}
62+
int stream_status;
63+
int content_type;
64+
string file_path;
65+
// kvssink_str contains a gstreamer pipeline syntax for kvssink plugin. All of the properties are filled
66+
// through environment variables.
67+
//
68+
// For example:
69+
// If the program has been run with the following environment variables:
70+
// KVS_STREAM_NAME=file-uploader-sample
71+
// KVS_MAX_LATENCY=60
72+
//
73+
// kvssink_str is going to be equal to the following:
74+
// kvssink stream-name=file-uploader-sample max-latency=60
75+
string kvssink_str;
76+
} CustomData;
77+
78+
/* This function is called when an error message is posted on the bus */
79+
static void error_cb(GstBus *bus, GstMessage *msg, CustomData *data) {
80+
GError *err;
81+
gchar *debug_info;
82+
83+
/* Print error details on the screen */
84+
gst_message_parse_error(msg, &err, &debug_info);
85+
LOG_ERROR("Error received from element %s: %s", GST_OBJECT_NAME (msg->src), err->message);
86+
LOG_ERROR("Debugging information: %sn", debug_info ? debug_info : "none");
87+
g_clear_error(&err);
88+
g_free(debug_info);
89+
90+
data->stream_status = STREAM_STATUS_FAILED;
91+
}
92+
93+
int gstreamer_init(int argc, char* argv[], CustomData *data) {
94+
GstElement *pipeline;
95+
GstMessage *msg;
96+
GstStateChangeReturn gst_ret;
97+
GError *error = NULL;
98+
string file_path = data->file_path;
99+
const char* demuxer = NULL;
100+
char pipeline_buf[4096];
101+
int ret;
102+
103+
// reset state
104+
data->stream_status = STREAM_STATUS_OK;
105+
106+
/* init GStreamer */
107+
LOG_INFO("Building gstreamer pipeline");
108+
gst_init(&argc, &argv);
109+
110+
string file_suffix = file_path.substr(file_path.size() - 3);
111+
if (file_suffix.compare("mkv") == 0) {
112+
demuxer = "matroskademux";
113+
} else if (file_suffix.compare("mp4") == 0) {
114+
demuxer = "qtdemux";
115+
} else if (file_suffix.compare(".ts") == 0) {
116+
demuxer = "tsdemux";
117+
} else {
118+
LOG_ERROR("File format not supported. Supported ones are mp4, mkv and ts. File suffix: %s", file_suffix.c_str());
119+
return 1;
120+
}
121+
122+
if (data->content_type == CONTENT_TYPE_VIDEO_ONLY) { // video only
123+
ret = snprintf(pipeline_buf, sizeof(pipeline_buf),
124+
"filesrc location=%s ! %s ! h264parse ! video/x-h264,stream-format=avc,alignment=au ! %s",
125+
file_path.c_str(), demuxer, data->kvssink_str.c_str()
126+
);
127+
} else { // audio-video
128+
ret = snprintf(pipeline_buf, sizeof(pipeline_buf),
129+
"filesrc location=%s ! %s name=demuxer "
130+
"demuxer. ! queue ! h264parse ! video/x-h264,stream-format=avc,alignment=au ! %s name=sink "
131+
"demuxer. ! queue ! aacparse ! audio/mpeg,stream-format=raw ! sink.",
132+
file_path.c_str(), demuxer, data->kvssink_str.c_str()
133+
);
134+
}
135+
if (ret < 0) {
136+
LOG_ERROR("Pipeline is too long");
137+
return ret;
138+
}
139+
140+
pipeline = gst_parse_launch(pipeline_buf, &error);
141+
if (error != NULL) {
142+
LOG_ERROR("Failed to construct pipeline: %s", error->message);
143+
g_clear_error(&error);
144+
return 1;
145+
}
146+
147+
/* Instruct the bus to emit signals for each received message, and connect to the interesting signals */
148+
GstBus *bus = gst_element_get_bus(pipeline);
149+
gst_bus_add_signal_watch(bus);
150+
g_signal_connect (G_OBJECT(bus), "message::error", (GCallback) error_cb, data);
151+
gst_object_unref(bus);
152+
153+
/* start streaming */
154+
LOG_INFO("Streaming from file source\n");
155+
gst_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
156+
if (gst_ret == GST_STATE_CHANGE_FAILURE) {
157+
LOG_ERROR("Unable to set the pipeline to the playing state.");
158+
gst_object_unref(pipeline);
159+
return 1;
160+
}
161+
162+
/* Wait until error or EOS */
163+
bus = gst_element_get_bus(pipeline);
164+
msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType) (GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
165+
166+
/* free resources */
167+
if (msg != NULL) {
168+
gst_message_unref(msg);
169+
}
170+
gst_bus_remove_signal_watch(bus);
171+
gst_object_unref(bus);
172+
gst_element_set_state(pipeline, GST_STATE_NULL);
173+
gst_object_unref(pipeline);
174+
return 0;
175+
}
176+
177+
string build_kvssink_str() {
178+
const char** property;
179+
stringstream ss;
180+
const char *key_raw, *value;
181+
int prefix_len = strlen(PROPERTY_PREFIX);
182+
char key[PROPERTY_KEY_MAX_LEN + 1];
183+
char *ch;
184+
185+
ss << KVS_SINK_PLUGIN_NAME;
186+
for (property = AVAILABLE_PROPERTIES; *property != NULL; property += 2) {
187+
key_raw = property[0];
188+
value = getenv(key_raw);
189+
if (value != NULL) {
190+
LOG_INFO("Found a property. Key: %s Value: %s", key_raw, value);
191+
192+
// Remove property prefix and convert it into proper gstreamer syntax
193+
strncpy(key, key_raw + prefix_len, PROPERTY_KEY_MAX_LEN);
194+
for (ch = key; *ch != '\0'; ch++) {
195+
if (*ch == '_') {
196+
*ch = '-';
197+
} else {
198+
*ch = *ch - 'A' + 'a';
199+
}
200+
}
201+
ss << ' ' << key << '=' << value;
202+
}
203+
}
204+
205+
return ss.str();
206+
}
207+
208+
void print_usage(char* program_path) {
209+
char padding[PROPERTY_KEY_MAX_LEN + 1];
210+
const char **property;
211+
int spaces;
212+
memset(padding, ' ', PROPERTY_KEY_MAX_LEN + 1);
213+
214+
printf(
215+
"Usage\n"
216+
" %1$s <path/to/file.mp4> [video-only|audio-video]\n\n"
217+
"Example\n"
218+
" KVS_MAX_LATENCY=60 %1$s video.mp4\n\n"
219+
"Available Properties\n", program_path);
220+
221+
for (property = AVAILABLE_PROPERTIES; *property != NULL; property += 2) {
222+
spaces = PROPERTY_KEY_MAX_LEN - strlen(property[0]);
223+
if (spaces < 0) {
224+
spaces = 0;
225+
}
226+
padding[spaces] = '\0';
227+
printf(" %s%s%s\n", property[0], padding, property[1]);
228+
padding[spaces] = ' ';
229+
}
230+
}
231+
232+
int main(int argc, char* argv[]) {
233+
if (argc < 2 || argc > 3) {
234+
print_usage(argv[0]);
235+
return 1;
236+
}
237+
238+
CustomData data;
239+
int ret = 0;
240+
int retry_count = DEFAULT_RETRY_COUNT;
241+
int stream_status = STREAM_STATUS_OK;
242+
bool do_retry = true;
243+
244+
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) {
245+
print_usage(argv[0]);
246+
return 1;
247+
}
248+
data.file_path = argv[1];
249+
data.kvssink_str = build_kvssink_str();
250+
data.content_type = CONTENT_TYPE_VIDEO_ONLY;
251+
252+
if (argc > 2) {
253+
if (strcmp(argv[2], "audio-video") == 0) {
254+
LOG_INFO("Uploading audio and video");
255+
data.content_type = CONTENT_TYPE_AUDIO_VIDEO;
256+
} else if (strcmp(argv[2], "video-only") == 0) {
257+
LOG_INFO("Uploading video only");
258+
} else {
259+
LOG_INFO("Unrecognized upload type. Default to video-only");
260+
}
261+
} else {
262+
LOG_INFO("No upload type specified. Default to video-only");
263+
}
264+
265+
do {
266+
LOG_INFO("Attempt to upload file: %s", data.file_path.c_str());
267+
268+
// control will return after gstreamer_init after file eos or any GST_ERROR was put on the bus.
269+
ret = gstreamer_init(argc, argv, &data);
270+
if (ret != 0) {
271+
LOG_ERROR(
272+
"Failed to initialize gstreamer pipeline. Have you set GST_PLUGIN_PATH properly?\n\n"
273+
" For example: export GST_PLUGIN_PATH=<YourSdkFolderPath>/build:$GST_PLUGIN_PATH");
274+
do_retry = false;
275+
} else if (data.stream_status == STREAM_STATUS_OK) {
276+
LOG_INFO("Persisted successfully. File: %s", data.file_path.c_str());
277+
do_retry = false;
278+
} else if (retry_count == 0) {
279+
LOG_ERROR("Failed to persist %s even after retrying", data.file_path.c_str());
280+
do_retry = false;
281+
} else {
282+
LOG_INFO("Failed to persist %s, retrying...", data.file_path.c_str());
283+
}
284+
} while(do_retry);
285+
286+
return 0;
287+
}

0 commit comments

Comments
 (0)