@@ -25,6 +25,7 @@ VideoDemuxerFlowUnit::~VideoDemuxerFlowUnit() = default;
2525modelbox::Status VideoDemuxerFlowUnit::Open (
2626 const std::shared_ptr<modelbox::Configuration> &opts) {
2727 key_frame_only_ = opts->GetBool (" key_frame_only" , false );
28+ queue_size_ = opts->GetUint64 (" queue_size" , queue_size_);
2829 return modelbox::STATUS_OK;
2930}
3031modelbox::Status VideoDemuxerFlowUnit::Close () { return modelbox::STATUS_OK; }
@@ -62,15 +63,16 @@ modelbox::Status VideoDemuxerFlowUnit::Reconnect(
6263
6364modelbox::Status VideoDemuxerFlowUnit::Process (
6465 std::shared_ptr<modelbox::DataContext> data_ctx) {
65- auto video_demuxer = std::static_pointer_cast<FfmpegVideoDemuxer >(
66+ auto demuxer_worker = std::static_pointer_cast<DemuxerWorker >(
6667 data_ctx->GetPrivate (DEMUXER_CTX));
6768 modelbox::Status demux_status = modelbox::STATUS_FAULT;
6869 std::shared_ptr<AVPacket> pkt;
69- if (video_demuxer != nullptr ) {
70- demux_status = video_demuxer-> Demux (pkt);
70+ if (demuxer_worker != nullptr ) {
71+ demux_status = demuxer_worker-> ReadPacket (pkt);
7172 }
7273
7374 if (demux_status == modelbox::STATUS_OK) {
75+ auto video_demuxer = demuxer_worker->GetDemuxer ();
7476 auto ret = WriteData (data_ctx, pkt, video_demuxer);
7577 if (!ret) {
7678 return ret;
@@ -86,8 +88,9 @@ modelbox::Status VideoDemuxerFlowUnit::Process(
8688
8789void VideoDemuxerFlowUnit::WriteEnd (
8890 std::shared_ptr<modelbox::DataContext> &data_ctx) {
89- auto video_demuxer = std::static_pointer_cast<FfmpegVideoDemuxer >(
91+ auto demuxer_worker = std::static_pointer_cast<DemuxerWorker >(
9092 data_ctx->GetPrivate (DEMUXER_CTX));
93+ auto video_demuxer = demuxer_worker->GetDemuxer ();
9194 auto video_packet_output = data_ctx->Output (VIDEO_PACKET_OUTPUT);
9295 video_packet_output->Build ({1 });
9396 auto end_packet = video_packet_output->At (0 );
@@ -287,7 +290,16 @@ modelbox::Status VideoDemuxerFlowUnit::InitDemuxer(
287290 std::static_pointer_cast<std::string>(meta->GetMeta (SOURCE_URL));
288291 *uri_meta = *source_url;
289292
290- data_ctx->SetPrivate (DEMUXER_CTX, video_demuxer);
293+ auto is_rtsp = (source_url->find (" rtsp://" ) == 0 );
294+ auto demuxer_worker =
295+ std::make_shared<DemuxerWorker>(is_rtsp, queue_size_, video_demuxer);
296+ ret = demuxer_worker->Init ();
297+ if (ret != modelbox::STATUS_OK) {
298+ MBLOG_ERROR << " init demuxer failed, ret " << ret;
299+ return ret;
300+ }
301+
302+ data_ctx->SetPrivate (DEMUXER_CTX, demuxer_worker);
291303 data_ctx->SetPrivate (SOURCE_URL, source_url);
292304
293305 UpdateStatsInfo (data_ctx, video_demuxer);
@@ -322,3 +334,106 @@ MODELBOX_DRIVER_FLOWUNIT(desc) {
322334 desc.Desc .SetDescription (FLOWUNIT_DESC);
323335 desc.Desc .SetVersion (" 1.0.0" );
324336}
337+
338+ DemuxerWorker::DemuxerWorker (bool is_async, size_t cache_size,
339+ std::shared_ptr<FfmpegVideoDemuxer> demuxer)
340+ : is_async_(is_async),
341+ cache_size_(cache_size),
342+ demuxer_(std::move(demuxer)) {
343+ if (cache_size_ < 2 ) {
344+ cache_size_ = 2 ;
345+ }
346+ }
347+
348+ DemuxerWorker::~DemuxerWorker () {
349+ if (demux_thread_ != nullptr ) {
350+ demux_thread_running_ = false ;
351+ demux_thread_->join ();
352+ }
353+ }
354+
355+ modelbox::Status DemuxerWorker::Init () {
356+ if (!is_async_) {
357+ return modelbox::STATUS_OK;
358+ }
359+
360+ demux_thread_running_ = true ;
361+ demux_thread_ = std::make_shared<std::thread>([this ]() {
362+ while (IsRunning ()) {
363+ Process ();
364+ }
365+ });
366+
367+ return modelbox::STATUS_OK;
368+ }
369+
370+ std::shared_ptr<FfmpegVideoDemuxer> DemuxerWorker::GetDemuxer () const {
371+ return demuxer_;
372+ }
373+
374+ size_t DemuxerWorker::GetDropCount () const { return packet_drop_count_; }
375+
376+ modelbox::Status DemuxerWorker::ReadPacket (
377+ std::shared_ptr<AVPacket> &av_packet) {
378+ if (!is_async_) {
379+ return demuxer_->Demux (av_packet);
380+ }
381+
382+ PopCache (av_packet);
383+ if (av_packet == nullptr ) {
384+ return last_demux_status_;
385+ }
386+
387+ return modelbox::STATUS_OK;
388+ }
389+
390+ bool DemuxerWorker::IsRunning () const { return demux_thread_running_; }
391+
392+ void DemuxerWorker::Process () {
393+ std::shared_ptr<AVPacket> av_packet;
394+ last_demux_status_ = demuxer_->Demux (av_packet);
395+ if (last_demux_status_ != modelbox::STATUS_OK) {
396+ demux_thread_running_ = false ;
397+ av_packet = nullptr ;
398+ }
399+
400+ PushCache (av_packet);
401+ }
402+
403+ void DemuxerWorker::PushCache (const std::shared_ptr<AVPacket> &av_packet) {
404+ std::unique_lock<std::mutex> lock (packet_cache_lock_);
405+ while (packet_cache_.size () >= cache_size_) {
406+ ++packet_drop_count_;
407+ // drop packet, cache_size >= 2
408+ auto iter = packet_cache_.begin ();
409+ auto first_iter = iter;
410+ auto second_iter = ++iter;
411+ if (IsKeyFrame (*first_iter) && !IsKeyFrame (*second_iter)) {
412+ // we need drop the frame rely on key frame first to avoid invalid picture
413+ packet_cache_.erase (second_iter);
414+ continue ;
415+ }
416+
417+ // no more frame rely on front frame, just drop front
418+ packet_cache_.pop_front ();
419+ }
420+
421+ packet_cache_.push_back (av_packet);
422+ packet_cache_not_empty_.notify_all ();
423+ }
424+
425+ void DemuxerWorker::PopCache (std::shared_ptr<AVPacket> &av_packet) {
426+ std::unique_lock<std::mutex> lock (packet_cache_lock_);
427+ packet_cache_not_empty_.wait (lock, [&]() { return !packet_cache_.empty (); });
428+ av_packet = packet_cache_.front ();
429+ if (av_packet == nullptr ) {
430+ // stream end, keep nullptr in cache
431+ return ;
432+ }
433+
434+ packet_cache_.pop_front ();
435+ }
436+
437+ bool DemuxerWorker::IsKeyFrame (const std::shared_ptr<AVPacket> &av_packet) {
438+ return (av_packet->flags & AV_PKT_FLAG_KEY) != 0 ;
439+ }
0 commit comments