Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit d13d3b5

Browse files
authored
Add Gstreamer pipeline error notify mechanism (#934)
1 parent 84991f7 commit d13d3b5

File tree

7 files changed

+182
-127
lines changed

7 files changed

+182
-127
lines changed

source/agent/analytics/analytics-agent.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ createInternalConnection(connectionId, direction, internalOpt) {
116116
this.onStreamDestroyed(options.controller, newStreamId);
117117
}
118118
this.inputs[connectionId] = true;
119+
120+
var notifyStatus = this.onStatus;
121+
this.engine.addEventListener('fatal', function (error) {
122+
log.error('GStreamer pipeline error:', error);
123+
notifyStatus(options.controller, connectionId, 'out', {type: 'failed', reason: 'Analytics error: ' + error});
124+
});
125+
119126
return Promise.resolve();
120127
}
121128
return Promise.reject('Connection already exist: ' + connectionId);

source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc

Lines changed: 110 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ static void dump(void* index, uint8_t* buf, int len)
7979
}
8080
}
8181

82-
VideoGstAnalyzer::VideoGstAnalyzer() {
82+
VideoGstAnalyzer::VideoGstAnalyzer(EventRegistry *handle) : m_asyncHandle(handle)
83+
{
8384
ELOG_INFO("Init");
8485
sourceid = 0;
8586
sink = NULL;
@@ -94,78 +95,104 @@ VideoGstAnalyzer::VideoGstAnalyzer() {
9495
}
9596
}
9697

97-
VideoGstAnalyzer::~VideoGstAnalyzer() {
98+
VideoGstAnalyzer::~VideoGstAnalyzer()
99+
{
98100
ELOG_DEBUG("Closed all media in this Analyzer");
99-
if (pipeline_ != nullptr && pipelineHandle != nullptr) {
100-
destroyPlugin(pipeline_);
101-
dlclose(pipelineHandle);
101+
destroyPipeline();
102+
}
103+
104+
bool VideoGstAnalyzer::notifyAsyncEvent(const std::string& event, const std::string& data)
105+
{
106+
if (m_asyncHandle) {
107+
return m_asyncHandle->notifyAsyncEvent(event, data);
108+
} else {
109+
return false;
102110
}
103-
stopLoop();
104111
}
105112

106-
gboolean VideoGstAnalyzer::StreamEventCallBack(GstBus *bus, GstMessage *message, gpointer data)
107-
{
108-
ELOG_DEBUG("Got %s message\n", GST_MESSAGE_TYPE_NAME(message));
109-
110-
VideoGstAnalyzer* pStreamObj = static_cast<VideoGstAnalyzer*>(data);
111-
112-
switch (GST_MESSAGE_TYPE(message)) {
113-
case GST_MESSAGE_ERROR: {
114-
GError *err;
115-
gchar *debug;
116-
gst_message_parse_error(message, &err, &debug);
117-
ELOG_ERROR("Error: %s\n", err->message);
118-
g_error_free(err);
119-
g_free(debug);
120-
g_main_loop_quit(pStreamObj->loop);
121-
break;
122-
}
123-
case GST_MESSAGE_EOS:
124-
/* end-of-stream */
125-
ELOG_ERROR("End of stream\n");
126-
g_main_loop_quit(pStreamObj->loop);
127-
break;
128-
case GST_MESSAGE_TAG:{
129-
/* end-of-stream */
130-
GstTagList *tags = NULL;
131-
gst_message_parse_tag (message, &tags);
132-
133-
ELOG_DEBUG("Got tags from element %s:\n", GST_OBJECT_NAME (message->src));
134-
gst_tag_list_unref (tags);
135-
break;
136-
}
137-
case GST_MESSAGE_QOS:{
138-
/* end-of-stream */
139-
ELOG_DEBUG("Got QOS message from %s \n",message->src->name);
140-
break;
141-
}
142-
case GST_MESSAGE_STATE_CHANGED:{
143-
GstState old_state, new_state, pending_state;
144-
gst_message_parse_state_changed(message, &old_state, &new_state, &pending_state);
145-
ELOG_DEBUG("State change from %d to %d, play:%d \n",old_state, new_state, GST_STATE_PAUSED);
146-
break;
147-
}
148-
default:
149-
/* unhandled message */
150-
break;
113+
bool VideoGstAnalyzer::notifyAsyncEventInEmergency(const std::string& event, const std::string& data)
114+
{
115+
if (m_asyncHandle) {
116+
return m_asyncHandle->notifyAsyncEventInEmergency(event, data);
117+
} else {
118+
return false;
151119
}
152-
return true;
120+
}
121+
122+
gboolean VideoGstAnalyzer::StreamEventCallBack(GstBus *bus, GstMessage *message, gpointer data)
123+
{
124+
ELOG_DEBUG("Got %s message\n", GST_MESSAGE_TYPE_NAME(message));
125+
126+
VideoGstAnalyzer* pStreamObj = static_cast<VideoGstAnalyzer*>(data);
127+
128+
switch (GST_MESSAGE_TYPE(message)) {
129+
case GST_MESSAGE_ERROR: {
130+
GError *err;
131+
gchar *debug;
132+
gst_message_parse_error(message, &err, &debug);
133+
ELOG_ERROR("Error: %s\n", err->message);
134+
g_error_free(err);
135+
g_free(debug);
136+
pStreamObj->notifyAsyncEvent("fatal", "GStreamer pipeline error");
137+
break;
138+
}
139+
case GST_MESSAGE_EOS:
140+
/* end-of-stream */
141+
ELOG_ERROR("End of stream\n");
142+
g_main_loop_quit(pStreamObj->loop);
143+
break;
144+
case GST_MESSAGE_TAG:{
145+
/* end-of-stream */
146+
GstTagList *tags = NULL;
147+
gst_message_parse_tag (message, &tags);
148+
149+
ELOG_DEBUG("Got tags from element %s:\n", GST_OBJECT_NAME (message->src));
150+
gst_tag_list_unref (tags);
151+
break;
152+
}
153+
case GST_MESSAGE_QOS:{
154+
/* end-of-stream */
155+
ELOG_DEBUG("Got QOS message from %s \n",message->src->name);
156+
break;
153157
}
158+
case GST_MESSAGE_STATE_CHANGED:{
159+
GstState old_state, new_state, pending_state;
160+
gst_message_parse_state_changed(message, &old_state, &new_state, &pending_state);
161+
ELOG_DEBUG("State change from %d to %d, play:%d \n",old_state, new_state, GST_STATE_PAUSED);
162+
break;
163+
}
164+
default:
165+
/* unhandled message */
166+
break;
167+
}
168+
return true;
169+
}
154170

155171
void VideoGstAnalyzer::clearPipeline()
156-
{
157-
if (pipeline != nullptr){
158-
gst_element_set_state(pipeline, GST_STATE_NULL);
159-
gst_object_unref(GST_OBJECT(pipeline));
160-
g_source_remove(m_bus_watch_id);
161-
g_main_loop_unref(loop);
162-
gst_object_unref(m_bus);
163-
}
164-
172+
{
173+
if (pipeline != nullptr){
174+
gst_element_set_state(pipeline, GST_STATE_NULL);
175+
gst_object_unref(GST_OBJECT(pipeline));
176+
g_source_remove(m_bus_watch_id);
177+
g_main_loop_unref(loop);
178+
gst_object_unref(m_bus);
165179
}
166180

167-
int VideoGstAnalyzer::createPipeline() {
181+
}
182+
183+
void VideoGstAnalyzer::destroyPipeline()
184+
{
185+
ELOG_DEBUG("Closed all media in this Analyzer");
186+
setState(GST_STATE_NULL);
187+
if (pipeline_ != nullptr && pipelineHandle != nullptr) {
188+
destroyPlugin(pipeline_);
189+
dlclose(pipelineHandle);
190+
stopLoop();
191+
}
192+
}
168193

194+
int VideoGstAnalyzer::createPipeline()
195+
{
169196
pipelineHandle = dlopen(libraryName.c_str(), RTLD_LAZY);
170197
if (pipelineHandle == nullptr) {
171198
ELOG_ERROR_T("Failed to open the plugin.(%s)", libraryName.c_str());
@@ -263,7 +290,8 @@ void VideoGstAnalyzer::new_sample_from_sink (GstElement * source, gpointer data)
263290
gst_sample_unref(sample);
264291
}
265292

266-
int VideoGstAnalyzer::addElementMany() {
293+
int VideoGstAnalyzer::addElementMany()
294+
{
267295
if(pipeline_){
268296
rvaStatus status = pipeline_->LinkElements();
269297
if(status != RVA_ERR_OK) {
@@ -290,20 +318,22 @@ int VideoGstAnalyzer::addElementMany() {
290318
}
291319

292320

293-
void VideoGstAnalyzer::stopLoop(){
321+
void VideoGstAnalyzer::stopLoop()
322+
{
294323
if(loop){
295324
ELOG_DEBUG("main loop quit\n");
296325
g_main_loop_quit(loop);
297326
}
298-
g_thread_join(m_thread);
299327
}
300328

301-
void VideoGstAnalyzer::main_loop_thread(gpointer data){
329+
void VideoGstAnalyzer::main_loop_thread(gpointer data)
330+
{
302331
g_main_loop_run(loop);
303332
g_thread_exit(0);
304333
}
305334

306-
void VideoGstAnalyzer::setState(GstState newstate) {
335+
void VideoGstAnalyzer::setState(GstState newstate)
336+
{
307337
ret = gst_element_set_state(pipeline, newstate);
308338
if (ret == GST_STATE_CHANGE_FAILURE) {
309339
ELOG_ERROR("Unable to set the pipeline to the PLAYING state.\n");
@@ -312,7 +342,8 @@ void VideoGstAnalyzer::setState(GstState newstate) {
312342
}
313343

314344

315-
int VideoGstAnalyzer::setPlaying() {
345+
int VideoGstAnalyzer::setPlaying()
346+
{
316347

317348
setState(GST_STATE_PLAYING);
318349

@@ -321,12 +352,14 @@ int VideoGstAnalyzer::setPlaying() {
321352
return 0;
322353
}
323354

324-
void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort) {
355+
void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort)
356+
{
325357
ELOG_DEBUG("Listening\n");
326358
m_internalin.reset(new GstInternalIn((GstAppSrc*)source, minPort, maxPort));
327359
}
328360

329-
void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* out) {
361+
void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* out)
362+
{
330363
ELOG_DEBUG("Add analyzed stream back to OWT\n");
331364
if (sink != nullptr){
332365

@@ -338,8 +371,7 @@ void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* o
338371
}
339372
m_gstinternalout->setPad(encoder_pad);
340373
}
341-
//gst_pad_send_event(encoder_pad, gst_event_new_custom( GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new( "GstForceKeyUnit", "all-headers", G_TYPE_BOOLEAN, TRUE, NULL)));
342-
//m_internalout.push_back(out);
374+
343375
m_gstinternalout->addVideoDestination(out);
344376
if(!addlistener) {
345377
g_object_set (G_OBJECT (sink), "emit-signals", TRUE, "sync", FALSE, NULL);
@@ -352,22 +384,23 @@ void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* o
352384

353385
}
354386

355-
void VideoGstAnalyzer::disconnect(owt_base::FrameDestination* out){
387+
void VideoGstAnalyzer::disconnect(owt_base::FrameDestination* out)
388+
{
356389
ELOG_DEBUG("Disconnect remote connection\n");
357-
//m_internalout.remove(out);
358390
m_gstinternalout->removeVideoDestination(out);
359391
}
360392

361-
int VideoGstAnalyzer::getListeningPort() {
393+
int VideoGstAnalyzer::getListeningPort()
394+
{
362395
int listeningPort;
363396
listeningPort = m_internalin->getListeningPort();
364397
ELOG_DEBUG(">>>>>Listen port is :%d\n", listeningPort);
365398
return listeningPort;
366399
}
367400

368401
void VideoGstAnalyzer::setOutputParam(std::string codec, int width, int height,
369-
int framerate, int bitrate, int kfi, std::string algo, std::string libraryName){
370-
402+
int framerate, int bitrate, int kfi, std::string algo, std::string libraryName)
403+
{
371404
this->codec = codec;
372405
this->width = width;
373406
this->height = height;

source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717
#include "AnalyticsPipeline.h"
1818
#include <stdio.h>
1919
#include "MediaFramePipeline.h"
20+
#include <EventRegistry.h>
2021

2122
namespace mcu {
2223

23-
class VideoGstAnalyzer{
24+
class VideoGstAnalyzer : public EventRegistry {
2425
DECLARE_LOGGER();
2526
public:
26-
VideoGstAnalyzer();
27-
virtual ~VideoGstAnalyzer();
27+
VideoGstAnalyzer(EventRegistry* handle);
28+
~VideoGstAnalyzer();
2829
int createPipeline();
2930
void clearPipeline();
31+
void destroyPipeline();
3032
int getListeningPort();
3133
void emitListenTo(int minPort,int maxPort);
3234
int setPlaying();
@@ -55,6 +57,10 @@ class VideoGstAnalyzer{
5557
static GMainLoop *loop;
5658
static gboolean StreamEventCallBack(GstBus *bus, GstMessage *message, gpointer data);
5759
void setState(GstState newstate);
60+
// EventRegistry
61+
bool notifyAsyncEvent(const std::string& event, const std::string& data) override;
62+
bool notifyAsyncEventInEmergency(const std::string& event, const std::string& data) override;
63+
5864
boost::scoped_ptr<GstInternalIn> m_internalin;
5965
boost::scoped_ptr<GstInternalOut> m_gstinternalout;
6066
//std::list<owt_base::InternalOut*> m_internalout;
@@ -67,6 +73,7 @@ class VideoGstAnalyzer{
6773
rvaPipeline* pipeline_;
6874
rva_create_t* createPlugin;
6975
rva_destroy_t* destroyPlugin;
76+
EventRegistry *m_asyncHandle;
7077

7178
GstStateChangeReturn ret;
7279

0 commit comments

Comments
 (0)