Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit ad89b18

Browse files
authored
Add vp8 support for analytics (#1031)
1 parent 1479038 commit ad89b18

File tree

9 files changed

+241
-74
lines changed

9 files changed

+241
-74
lines changed

source/agent/analytics/analytics-agent.js

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,6 @@ createInternalConnection(connectionId, direction, internalOpt) {
8383
this.onStatus(options.controller, connectionId, 'out', status);
8484

8585
const newStreamId = algo + options.media.video.from;
86-
const streamInfo = {
87-
type: 'analytics',
88-
media: {video: Object.assign({}, videoFormat, videoParameters)},
89-
analyticsId: connectionId,
90-
locality: {agent:this.agentId, node:this.rpcId},
91-
};
9286

9387
const pluginName = this.algorithms[algo].name;
9488
let codec = videoFormat.codec;
@@ -102,14 +96,11 @@ createInternalConnection(connectionId, direction, internalOpt) {
10296
log.debug('resolution:',resolution,'framerate:',framerate,'keyFrameInterval:',
10397
keyFrameInterval, 'bitrate:',bitrate);
10498

105-
this.engine.setOutputParam(codec,resolution,framerate,bitrate,keyFrameInterval,algo,pluginName);
99+
this.engine.setInputParam(codec,resolution,framerate,bitrate,keyFrameInterval,algo,pluginName);
106100
if (this.engine.createPipeline() < 0) {
107101
return Promise.reject('Create pipeline failed');
108102
}
109103

110-
streamInfo.media.video.bitrate = bitrate;
111-
this.onStreamGenerated(options.controller, newStreamId, streamInfo);
112-
113104
if (this.engine.addElementMany() < 0) {
114105
return Promise.reject('Link element failed');
115106
}
@@ -125,6 +116,43 @@ createInternalConnection(connectionId, direction, internalOpt) {
125116
notifyStatus(options.controller, connectionId, 'out', {type: 'failed', reason: 'Analytics error: ' + error});
126117
});
127118

119+
var generateStream = this.onStreamGenerated;
120+
const streamInfo = {
121+
type: 'analytics',
122+
media: {video: Object.assign({}, videoFormat, videoParameters)},
123+
analyticsId: connectionId,
124+
locality: {agent:this.agentId, node:this.rpcId},
125+
};
126+
this.engine.addEventListener('streamadded', function (data) {
127+
log.debug('GStreamer pipeline stream generated:', data);
128+
var params;
129+
try {
130+
params = JSON.parse(data)
131+
streamInfo.media.video.bitrate = bitrate;
132+
if (params.codec) {
133+
streamInfo.media.video.codec = params.codec;
134+
}
135+
136+
if (params.profile) {
137+
if (params.profile === "constrained-baseline") {
138+
streamInfo.media.video.profile = 'CB';
139+
} else if (params.profile === "baseline") {
140+
streamInfo.media.video.profile = 'B';
141+
} else if (params.profile === "main") {
142+
streamInfo.media.video.profile = 'M';
143+
} else if (params.profile === "high") {
144+
streamInfo.media.video.profile = 'H';
145+
} else {
146+
log.error("Not supported profile:", params.profile);
147+
return;
148+
}
149+
}
150+
generateStream(options.controller, newStreamId, streamInfo);
151+
} catch (e) {
152+
log.error("Parse stream added data with error:", e);
153+
}
154+
});
155+
128156
return Promise.resolve();
129157
}
130158
return Promise.reject('Connection already exist: ' + connectionId);

source/agent/analytics/videoGstPipeline/GstInternalIn.cpp

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
#include "GstInternalIn.h"
66
#include <gst/gst.h>
7-
#include <stdio.h>
87

98
static void dump(void* index, uint8_t* buf, int len)
109
{
@@ -18,6 +17,18 @@ static void dump(void* index, uint8_t* buf, int len)
1817
}
1918
}
2019

20+
static void mem_put_le16(uint8_t *mem, unsigned int val) {
21+
mem[0] = val;
22+
mem[1] = val >> 8;
23+
}
24+
25+
static void mem_put_le32(uint8_t *mem, unsigned int val) {
26+
mem[0] = val;
27+
mem[1] = val >> 8;
28+
mem[2] = val >> 16;
29+
mem[3] = val >> 24;
30+
}
31+
2132
DEFINE_LOGGER(GstInternalIn, "GstInternalIn");
2233
GstInternalIn::GstInternalIn(GstAppSrc *data, unsigned int minPort, unsigned int maxPort, std::string ticket)
2334
{
@@ -33,6 +44,8 @@ GstInternalIn::GstInternalIn(GstAppSrc *data, unsigned int minPort, unsigned int
3344
m_needKeyFrame = true;
3445
m_start = false;
3546
m_dumpIn = false;
47+
num_frames = 0;
48+
m_framerate = 30;
3649
char* pIn = std::getenv("DUMP_ANALYTICS_IN");
3750
if(pIn != NULL) {
3851
ELOG_INFO("Dump analytics in stream");
@@ -52,6 +65,13 @@ unsigned int GstInternalIn::getListeningPort()
5265

5366
void GstInternalIn::setPushData(bool status){
5467
m_start = status;
68+
if (!m_start) {
69+
m_needKeyFrame = true;
70+
}
71+
}
72+
73+
void GstInternalIn::setFramerate(int framerate) {
74+
m_framerate = framerate;
5575
}
5676

5777
void GstInternalIn::onFeedback(const owt_base::FeedbackMsg& msg)
@@ -97,16 +117,56 @@ void GstInternalIn::onTransportData(char* buf, int len)
97117
}
98118
}
99119

120+
121+
size_t allocLength = payloadLength + headerLength;
122+
uint8_t ivf_header[32] = {0};
123+
uint8_t ivf_frame_header[12] = {0};
124+
size_t ivf_header_length = 0;
125+
size_t ivf_frame_header_length = 0;
126+
127+
if (frame->format == owt_base::FRAME_FORMAT_VP8) {
128+
if (num_frames == 0) {
129+
ivf_header[0] = 'D';
130+
ivf_header[1] = 'K';
131+
ivf_header[2] = 'I';
132+
ivf_header[3] = 'F';
133+
134+
mem_put_le16(ivf_header+4, 0);
135+
mem_put_le16(ivf_header+6, 32);
136+
ivf_header[8] = 'V';
137+
ivf_header[9] = 'P';
138+
ivf_header[10] = '8';
139+
ivf_header[11] = '0';
140+
mem_put_le16(ivf_header+12, frame->additionalInfo.video.width);
141+
mem_put_le16(ivf_header+14, frame->additionalInfo.video.height);
142+
mem_put_le32(ivf_header+16, m_framerate);
143+
mem_put_le32(ivf_header+20, 1);
144+
mem_put_le32(ivf_header+24, num_frames);
145+
mem_put_le32(ivf_header+28, 0);
146+
ivf_header_length = 32;
147+
}
148+
mem_put_le32(ivf_frame_header, payloadLength);
149+
mem_put_le32(ivf_frame_header+4, num_frames);
150+
ivf_frame_header_length = 12;
151+
}
152+
100153
/* Create a new empty buffer */
101-
buffer = gst_buffer_new_and_alloc (payloadLength + headerLength);
154+
buffer = gst_buffer_new_and_alloc (payloadLength + ivf_header_length + ivf_frame_header_length);
102155
gst_buffer_map(buffer, &map, GST_MAP_WRITE);
103-
memcpy(map.data, frame, headerLength);
104-
memcpy(map.data + headerLength, frame->payload, payloadLength);
156+
if (frame->format == owt_base::FRAME_FORMAT_VP8) {
157+
if (num_frames == 0) {
158+
memcpy(map.data, ivf_header, ivf_header_length);
159+
}
160+
memcpy(map.data + ivf_header_length, ivf_frame_header, ivf_frame_header_length);
161+
}
162+
163+
memcpy(map.data + ivf_header_length + ivf_frame_header_length, frame->payload, payloadLength);
105164

106165
if(m_dumpIn) {
107166
dump(this, map.data, map.size);
108167
}
109168

169+
110170
gst_buffer_unmap(buffer, &map);
111171
g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
112172

@@ -116,6 +176,7 @@ void GstInternalIn::onTransportData(char* buf, int len)
116176
ELOG_DEBUG("Push buffer to appsrc got error\n");
117177
m_start=false;
118178
}
179+
num_frames++;
119180

120181
break;
121182
}

source/agent/analytics/videoGstPipeline/GstInternalIn.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ class GstInternalIn : public owt_base::RawTransportListener{
2929
void onTransportError() { }
3030
void onTransportConnected() { }
3131
void setPushData(bool status);
32+
void setFramerate(int framerate);
3233

3334
private:
3435
bool m_start;
3536
bool m_needKeyFrame;
3637
bool m_dumpIn;
38+
size_t num_frames;
39+
int m_framerate;
3740
GstAppSrc *appsrc;
3841
boost::shared_ptr<owt_base::RawTransportInterface> m_transport;
3942
};

source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ static void dump(void* index, uint8_t* buf, int len)
7979
}
8080
}
8181

82+
8283
VideoGstAnalyzer::VideoGstAnalyzer(EventRegistry *handle) : m_asyncHandle(handle)
8384
{
8485
ELOG_INFO("Init");
@@ -142,7 +143,6 @@ gboolean VideoGstAnalyzer::StreamEventCallBack(GstBus *bus, GstMessage *message,
142143
g_main_loop_quit(pStreamObj->loop);
143144
break;
144145
case GST_MESSAGE_TAG:{
145-
/* end-of-stream */
146146
GstTagList *tags = NULL;
147147
gst_message_parse_tag (message, &tags);
148148

@@ -151,14 +151,58 @@ gboolean VideoGstAnalyzer::StreamEventCallBack(GstBus *bus, GstMessage *message,
151151
break;
152152
}
153153
case GST_MESSAGE_QOS:{
154-
/* end-of-stream */
155154
ELOG_DEBUG("Got QOS message from %s \n",message->src->name);
156155
break;
157156
}
158157
case GST_MESSAGE_STATE_CHANGED:{
159158
GstState old_state, new_state, pending_state;
160159
gst_message_parse_state_changed(message, &old_state, &new_state, &pending_state);
161-
ELOG_DEBUG("State change from %d to %d, play:%d \n",old_state, new_state, GST_STATE_PAUSED);
160+
ELOG_DEBUG("State change from %d to %d, from:%d \n",old_state, new_state, message->src->name);
161+
if (strcmp(message->src->name,"appsink") == 0 && new_state == GST_STATE_PLAYING) {
162+
GstPad *pad = NULL;
163+
GstCaps *caps = NULL;
164+
pad = gst_element_get_static_pad (pStreamObj->sink, "sink");
165+
if (!pad) {
166+
ELOG_ERROR("Could not retrieve sink pad of sink element\n");
167+
} else {
168+
caps = gst_pad_get_current_caps (pad);
169+
if (!caps)
170+
caps = gst_pad_query_caps (pad, NULL);
171+
172+
/* Print and free */
173+
const gchar *type;
174+
int width, height;
175+
std::string str;
176+
GstStructure *structure;
177+
structure = gst_caps_get_structure (caps, 0);
178+
ELOG_DEBUG("Caps for the sink pad type is:%s %s\n",gst_structure_get_name(structure), gst_caps_to_string(caps));
179+
type = gst_structure_get_name(structure);
180+
if (strstr(type, "h264") != NULL) {
181+
str.append("{\"codec\":\"h264\",\"profile\":\"");
182+
str.append(gst_structure_get_string(structure, "profile"));
183+
str.append("\",");
184+
pStreamObj->outputcodec = "h264";
185+
} else if (strstr(type, "vp8") != NULL) {
186+
str.append("{\"codec\":\"vp8\",");
187+
pStreamObj->outputcodec = "vp8";
188+
}
189+
190+
gst_structure_get_int (structure, "width", &width);
191+
gst_structure_get_int (structure, "height", &height);
192+
193+
str.append("\"width\":");
194+
str.append(std::to_string(width));
195+
str.append(",\"height\":");
196+
str.append(std::to_string(height));
197+
str.append("}");
198+
pStreamObj->notifyAsyncEvent("streamadded", str.c_str());
199+
200+
gst_caps_unref (caps);
201+
gst_object_unref (pad);
202+
}
203+
204+
}
205+
162206
break;
163207
}
164208
default:
@@ -219,7 +263,7 @@ int VideoGstAnalyzer::createPipeline()
219263
{"inputwidth", std::to_string(width)},
220264
{"inputheight", std::to_string(height)},
221265
{"inputframerate", std::to_string(framerate)},
222-
{"inputcodec", codec},
266+
{"inputcodec", inputcodec},
223267
{"pipelinename", algo} };
224268
pipeline_->PipelineConfig(plugin_config_map);
225269

@@ -257,7 +301,6 @@ void VideoGstAnalyzer::stop_feed (GstElement * source, gpointer data)
257301

258302
void VideoGstAnalyzer::new_sample_from_sink (GstElement * source, gpointer data)
259303
{
260-
ELOG_DEBUG("Got new sample from sink\n");
261304
VideoGstAnalyzer* pStreamObj = static_cast<VideoGstAnalyzer*>(data);
262305
GstSample *sample;
263306
GstBuffer *buffer;
@@ -272,12 +315,23 @@ void VideoGstAnalyzer::new_sample_from_sink (GstElement * source, gpointer data)
272315
owt_base::Frame outFrame;
273316
memset(&outFrame, 0, sizeof(outFrame));
274317

275-
outFrame.format = owt_base::FRAME_FORMAT_H264;
318+
if (pStreamObj->outputcodec.compare("vp8") == 0) {
319+
outFrame.format = owt_base::FRAME_FORMAT_VP8;
320+
outFrame.additionalInfo.video.isKeyFrame = isVp8KeyFrame(map.data, map.size);
321+
} else if (pStreamObj->outputcodec.find("h264") != std::string::npos) {
322+
outFrame.format = owt_base::FRAME_FORMAT_H264;
323+
outFrame.additionalInfo.video.isKeyFrame = isH264KeyFrame(map.data, map.size);
324+
} else {
325+
printf("Not support codec:%s\n", pStreamObj->outputcodec.c_str());
326+
gst_buffer_unmap(buffer, &map);
327+
gst_sample_unref(sample);
328+
return;
329+
}
330+
331+
outFrame.timeStamp = (pStreamObj->m_frameCount++) * 1000 / pStreamObj->framerate * 90;
276332
outFrame.length = map.size;
277333
outFrame.additionalInfo.video.width = pStreamObj->width;
278334
outFrame.additionalInfo.video.height = pStreamObj->height;
279-
outFrame.additionalInfo.video.isKeyFrame = isH264KeyFrame(map.data, map.size);
280-
outFrame.timeStamp = (pStreamObj->m_frameCount++) * 1000 / 30 * 90;
281335

282336
outFrame.payload = map.data;
283337

@@ -353,8 +407,8 @@ int VideoGstAnalyzer::setPlaying()
353407
}
354408

355409
void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort, std::string ticket) {
356-
ELOG_DEBUG("Listening\n");
357410
m_internalin.reset(new GstInternalIn((GstAppSrc*)source, minPort, maxPort, ticket));
411+
m_internalin->setFramerate(this->framerate);
358412
}
359413

360414
void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* out)
@@ -397,10 +451,10 @@ int VideoGstAnalyzer::getListeningPort()
397451
return listeningPort;
398452
}
399453

400-
void VideoGstAnalyzer::setOutputParam(std::string codec, int width, int height,
454+
void VideoGstAnalyzer::setInputParam(std::string codec, int width, int height,
401455
int framerate, int bitrate, int kfi, std::string algo, std::string libraryName)
402456
{
403-
this->codec = codec;
457+
this->inputcodec = codec;
404458
this->width = width;
405459
this->height = height;
406460
this->framerate = framerate;

source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class VideoGstAnalyzer : public EventRegistry {
3535

3636
int addElementMany();
3737

38-
void setOutputParam(std::string codec, int width, int height,
38+
void setInputParam(std::string codec, int width, int height,
3939
int framerate, int bitrate, int kfi, std::string algo, std::string pluginName);
4040

4141
void stopLoop();
@@ -86,7 +86,8 @@ class VideoGstAnalyzer : public EventRegistry {
8686
int m_frameCount;
8787

8888
//param
89-
std::string codec;
89+
std::string inputcodec;
90+
std::string outputcodec;
9091
std::string algo,libraryName;
9192
std::string resolution;
9293
int width,height;

source/agent/analytics/videoGstPipeline/VideoGstAnalyzerWrap.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ void VideoGstAnalyzerWrap::Init(Handle<Object> exports, Handle<Object> module) {
3030
NODE_SET_PROTOTYPE_METHOD(tpl, "emitListenTo", emitListenTo);
3131
NODE_SET_PROTOTYPE_METHOD(tpl, "addElementMany", addElementMany);
3232
NODE_SET_PROTOTYPE_METHOD(tpl, "setPlaying", setPlaying);
33-
NODE_SET_PROTOTYPE_METHOD(tpl, "setOutputParam", setOutputParam);
33+
NODE_SET_PROTOTYPE_METHOD(tpl, "setInputParam", setInputParam);
3434
NODE_SET_PROTOTYPE_METHOD(tpl, "disconnect", disconnect);
3535
NODE_SET_PROTOTYPE_METHOD(tpl, "addOutput", addOutput);
3636
NODE_SET_PROTOTYPE_METHOD(tpl, "addEventListener", addEventListener);
@@ -144,7 +144,7 @@ void VideoGstAnalyzerWrap::disconnect(const FunctionCallbackInfo<Value>& args){
144144
me->disconnect(out);
145145
}
146146

147-
void VideoGstAnalyzerWrap::setOutputParam(const FunctionCallbackInfo<Value>& args){
147+
void VideoGstAnalyzerWrap::setInputParam(const FunctionCallbackInfo<Value>& args){
148148
Isolate* isolate = Isolate::GetCurrent();
149149
HandleScope scope(isolate);
150150
VideoGstAnalyzerWrap* obj = ObjectWrap::Unwrap<VideoGstAnalyzerWrap>(args.Holder());
@@ -170,7 +170,7 @@ void VideoGstAnalyzerWrap::setOutputParam(const FunctionCallbackInfo<Value>& arg
170170
String::Utf8Value param7(args[6]->ToString());
171171
std::string pluginName = std::string(*param7);
172172

173-
me->setOutputParam(codec,width,height,framerateFPS,bitrateKbps,
173+
me->setInputParam(codec,width,height,framerateFPS,bitrateKbps,
174174
keyFrameIntervalSeconds,algorithm,pluginName);
175175
}
176176

0 commit comments

Comments
 (0)