diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c7892f..1322972 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -186,6 +186,8 @@ else() ) pkg_search_module(gstreamer REQUIRED IMPORTED_TARGET gstreamer-1.0>=1.4) pkg_search_module(gstreamer-app REQUIRED IMPORTED_TARGET gstreamer-app-1.0>=1.4) + pkg_search_module(gstnet REQUIRED IMPORTED_TARGET gstreamer-net-1.0>=1.4) + pkg_search_module(gio REQUIRED IMPORTED_TARGET gio-2.0) add_executable(${PROJECT_NAME} ${SOURCE_FILES}) @@ -212,6 +214,8 @@ else() ${PNG_LIBRARIES} PkgConfig::gstreamer PkgConfig::gstreamer-app + PkgConfig::gstnet + PkgConfig::gio ) endif() @@ -257,7 +261,9 @@ if(BUILD_TESTS) ) pkg_search_module(gstreamer REQUIRED IMPORTED_TARGET gstreamer-1.0>=1.4) pkg_search_module(gstreamer-app REQUIRED IMPORTED_TARGET gstreamer-app-1.0>=1.4) - target_link_libraries(pixelpilot_tests PkgConfig::gstreamer PkgConfig::gstreamer-app) + pkg_search_module(gstnet REQUIRED IMPORTED_TARGET gstreamer-net-1.0>=1.4) + pkg_search_module(gio REQUIRED IMPORTED_TARGET gio-2.0) + target_link_libraries(pixelpilot_tests PkgConfig::gstreamer PkgConfig::gstreamer-app PkgConfig::gstnet PkgConfig::gio) target_link_options(pixelpilot_tests BEFORE PUBLIC -fsanitize=undefined PUBLIC -fsanitize=address diff --git a/src/dvr.cpp b/src/dvr.cpp index abf3e54..01b6b92 100644 --- a/src/dvr.cpp +++ b/src/dvr.cpp @@ -134,9 +134,11 @@ void Dvr::loop() { if (dvr_file != NULL) { break; } - start(); - if (video_frm_width > 0 && video_frm_height > 0) { - init(); + if (start() == 0) { + idr_request_record_start(); + if (video_frm_width > 0 && video_frm_height > 0) { + init(); + } } break; } @@ -153,9 +155,11 @@ void Dvr::loop() { { SPDLOG_DEBUG("got rpc TOGGLE"); if (dvr_file == NULL) { - start(); - if (video_frm_width > 0 && video_frm_height > 0) { - init(); + if (start() == 0) { + idr_request_record_start(); + if (video_frm_width > 0 && video_frm_height > 0) { + init(); + } } } else { stop(); diff --git a/src/gsmenu/gs_system.c b/src/gsmenu/gs_system.c index 621016b..bb465e1 100644 --- a/src/gsmenu/gs_system.c +++ b/src/gsmenu/gs_system.c @@ -2,6 +2,7 @@ #include #include #include "../main.h" +#include "../gstrtpreceiver.h" #include "gs_system.h" #include "lvgl/lvgl.h" #include "helper.h" @@ -18,6 +19,7 @@ lv_obj_t * resolution; lv_obj_t * rec_enabled; lv_obj_t * rec_fps; lv_obj_t * vsync_disabled; +lv_obj_t * gs_request_idr; lv_obj_t * video_scale; extern lv_obj_t * ap_fpv_ssid; @@ -47,6 +49,8 @@ void gs_system_page_load_callback(lv_obj_t * page) if (disable_vsync) lv_obj_add_state(lv_obj_get_child_by_type(vsync_disabled,0,&lv_switch_class), LV_STATE_CHECKED); else lv_obj_clear_state(lv_obj_get_child_by_type(vsync_disabled,0,&lv_switch_class), LV_STATE_CHECKED); + if (idr_get_enabled()) lv_obj_add_state(lv_obj_get_child_by_type(gs_request_idr,0,&lv_switch_class), LV_STATE_CHECKED); + else lv_obj_clear_state(lv_obj_get_child_by_type(gs_request_idr,0,&lv_switch_class), LV_STATE_CHECKED); } void toggle_rec_enabled() @@ -86,6 +90,14 @@ void disable_vsync_cb(lv_event_t *e) { } } +void gs_request_idr_cb(lv_event_t *e) { + lv_event_code_t event = lv_event_get_code(e); + if (event == LV_EVENT_VALUE_CHANGED) { + lv_obj_t *ta = lv_event_get_target(e); + idr_set_enabled(lv_obj_has_state(ta, LV_STATE_CHECKED)); + } +} + void rec_fps_cb(lv_event_t *e) { lv_event_code_t event = lv_event_get_code(e); if (event == LV_EVENT_VALUE_CHANGED) { @@ -146,6 +158,8 @@ void create_gs_system_menu(lv_obj_t * parent) { vsync_disabled = create_switch(cont,LV_SYMBOL_SETTINGS,"Disable VSYNC","disable_vsync", NULL,false); lv_obj_add_event_cb(lv_obj_get_child_by_type(vsync_disabled,0,&lv_switch_class), disable_vsync_cb, LV_EVENT_VALUE_CHANGED,NULL); + gs_request_idr = create_switch(cont,LV_SYMBOL_SETTINGS,"Request IDR","gs_request_idr", NULL,false); + lv_obj_add_event_cb(lv_obj_get_child_by_type(gs_request_idr,0,&lv_switch_class), gs_request_idr_cb, LV_EVENT_VALUE_CHANGED,NULL); create_text(parent, NULL, "Recording", NULL, NULL, false, LV_MENU_ITEM_BUILDER_VARIANT_1); section = lv_menu_section_create(parent); diff --git a/src/gstrtpreceiver.cpp b/src/gstrtpreceiver.cpp index 654890c..6609ee2 100644 --- a/src/gstrtpreceiver.cpp +++ b/src/gstrtpreceiver.cpp @@ -6,9 +6,11 @@ #include "gstrtpreceiver.h" #include "gst/gstparse.h" #include "gst/gstpipeline.h" +#include "gst/net/gstnetaddressmeta.h" #include "gst/app/gstappsink.h" #include "gst/app/gstappsrc.h" #include "spdlog/spdlog.h" +#include #include #include #include @@ -16,11 +18,23 @@ #include #include #include +#include +#include +#include +#include +#include +#include #include +#include +#include #include #include #include #include +#include +#if defined(__linux__) +#include +#endif namespace pipeline { static std::string gst_create_rtp_caps(const VideoCodec& videoCodec){ @@ -65,6 +79,596 @@ namespace pipeline { } } +namespace { + static constexpr int kIdrUdpPort = 11223; + static constexpr int kIdrBurstCount = 3; + static constexpr int kIdrBurstSpacingMs = 100; + static constexpr int kIdrRepeatCount = 3; + static constexpr int kIdrRepeatSpacingMs = 100; + static constexpr int kIdrRecordRepeatCount = 3; + static constexpr int kIdrRecordRepeatSpacingMs = 150; + static constexpr uint64_t kStreamDownMs = 1200; + static constexpr uint64_t kStreamTickMs = 200; + static constexpr uint64_t kIntegrityCooldownMs = 350; + static constexpr uint64_t kRtpGapCooldownMs = 500; + static constexpr uint64_t kDecodeStallMs = 700; + static constexpr uint64_t kDecodeStallCooldownMs = 700; + static constexpr uint64_t kDecodeStallPktWindowMs = 500; + static constexpr uint64_t kRtpSeqResetMs = 1000; + + static std::mutex g_idr_sock_mutex; + static int g_idr_sock = -1; + static std::atomic g_idr_sock_ready{false}; + + static std::mutex g_last_hop_mutex; + static std::string g_last_hop_ip; + static std::atomic g_last_pkt_ms{0}; + static std::atomic g_stream_up{false}; + static std::atomic g_pending_rec_idr{false}; + static std::atomic g_last_integrity_idr_ms{0}; + static std::atomic g_last_rtp_gap_idr_ms{0}; + static std::atomic g_last_decode_stall_idr_ms{0}; + static std::atomic g_last_decoded_ms{0}; + static std::atomic g_last_rtp_seq_ms{0}; + static std::atomic g_last_rtp_seq{0}; + static std::atomic g_last_rtp_seq_valid{false}; + static std::atomic g_idr_enabled{true}; + static std::atomic g_stream_idr_pending{false}; + static std::atomic g_record_idr_pending{false}; + + static uint64_t now_ms() { + const auto now = std::chrono::steady_clock::now().time_since_epoch(); + return std::chrono::duration_cast(now).count(); + } + + static void request_idr_bursts(const char* reason, int request_count, bool allow_pending); + + static bool is_stream_idr_reason(const char* reason) { + return reason && !strcmp(reason, "stream-up"); + } + + static bool is_record_idr_reason(const char* reason) { + return reason && !strncmp(reason, "record-start", strlen("record-start")); + } + + static bool ensure_idr_socket() { + if (g_idr_sock_ready.load(std::memory_order_acquire)) { + return true; + } + + std::lock_guard lock(g_idr_sock_mutex); + if (g_idr_sock_ready.load(std::memory_order_relaxed)) { + return true; + } + + g_idr_sock = socket(AF_INET, SOCK_DGRAM, 0); + if (g_idr_sock < 0) { + spdlog::warn("[IDR] socket(AF_INET,SOCK_DGRAM) failed: {}", strerror(errno)); + return false; + } + + g_idr_sock_ready.store(true, std::memory_order_release); + spdlog::info("[IDR] UDP socket ready"); + return true; + } + + static uint32_t secure_random_u32() { + uint32_t out = 0; +#if defined(__linux__) + ssize_t n = getrandom(&out, sizeof(out), 0); + if (n == sizeof(out)) { + return out; + } +#endif + static std::random_device rd; + out = (static_cast(rd()) << 16) ^ static_cast(rd()); + return out; + } + + static void make_idr_token3(char out[4]) { + static const char alphabet[] = "abcdefghijklmnopqrstuvwxyz"; + const uint32_t r0 = secure_random_u32(); + const uint32_t r1 = secure_random_u32(); + const uint32_t r2 = secure_random_u32(); + out[0] = alphabet[r0 % 26]; + out[1] = alphabet[r1 % 26]; + out[2] = alphabet[r2 % 26]; + out[3] = '\0'; + } + + static bool extract_sender_ip_from_buffer(GstBuffer* buf, std::string& out_ip) { + out_ip.clear(); + if (!buf) { + return false; + } + + GstNetAddressMeta* meta = (GstNetAddressMeta*)gst_buffer_get_meta(buf, GST_NET_ADDRESS_META_API_TYPE); + if (!meta || !meta->addr) { + return false; + } + + if (!G_IS_INET_SOCKET_ADDRESS(meta->addr)) { + return false; + } + + GInetSocketAddress* isa = G_INET_SOCKET_ADDRESS(meta->addr); + GInetAddress* ia = g_inet_socket_address_get_address(isa); + if (!ia) { + return false; + } + + gchar* s = g_inet_address_to_string(ia); + if (!s) { + return false; + } + + out_ip = s; + g_free(s); + return !out_ip.empty(); + } + + static void maybe_update_last_hop_from_buffer(GstBuffer* buf) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + std::string ip; + if (!extract_sender_ip_from_buffer(buf, ip)) { + return; + } + + std::lock_guard lock(g_last_hop_mutex); + if (ip != g_last_hop_ip) { + g_last_hop_ip = ip; + spdlog::info("[NET] Last-hop sender: {}", g_last_hop_ip); + } + } + + static std::string get_last_hop_ip_copy() { + std::lock_guard lock(g_last_hop_mutex); + return g_last_hop_ip; + } + + static bool extract_rtp_sequence(GstBuffer* buf, uint16_t* out_seq) { + if (!buf || !out_seq) { + return false; + } + + GstMapInfo map; + if (!gst_buffer_map(buf, &map, GST_MAP_READ)) { + return false; + } + + bool ok = false; + if (map.size >= 4) { + const uint8_t* data = map.data; + *out_seq = static_cast((data[2] << 8) | data[3]); + ok = true; + } + + gst_buffer_unmap(buf, &map); + return ok; + } + + static void maybe_request_idr_for_rtp_gap(uint16_t gap_count) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + if (!g_stream_up.load(std::memory_order_relaxed)) { + return; + } + + const uint64_t now = now_ms(); + const uint64_t last = g_last_rtp_gap_idr_ms.load(std::memory_order_relaxed); + if (last && (now - last) < kRtpGapCooldownMs) { + return; + } + + g_last_rtp_gap_idr_ms.store(now, std::memory_order_relaxed); + spdlog::info("[IDR] RTP gap detected (missing {} packet(s)) -> request IDR", gap_count); + request_idr_bursts("rtp-gap", 1, false); + } + + static void maybe_track_rtp_sequence(GstBuffer* buf) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + uint16_t seq = 0; + if (!extract_rtp_sequence(buf, &seq)) { + return; + } + + const uint64_t now = now_ms(); + if (!g_last_rtp_seq_valid.load(std::memory_order_relaxed)) { + g_last_rtp_seq.store(seq, std::memory_order_relaxed); + g_last_rtp_seq_ms.store(now, std::memory_order_relaxed); + g_last_rtp_seq_valid.store(true, std::memory_order_relaxed); + return; + } + + const uint16_t last = g_last_rtp_seq.load(std::memory_order_relaxed); + const uint16_t diff = static_cast(seq - last); + if (diff == 0) { + return; + } + + if (diff >= 30000) { + const uint64_t last_ms = g_last_rtp_seq_ms.load(std::memory_order_relaxed); + if (last_ms == 0 || (now - last_ms) > kRtpSeqResetMs) { + g_last_rtp_seq.store(seq, std::memory_order_relaxed); + g_last_rtp_seq_ms.store(now, std::memory_order_relaxed); + } + return; + } + + if (diff > 1) { + maybe_request_idr_for_rtp_gap(static_cast(diff - 1)); + } + + g_last_rtp_seq.store(seq, std::memory_order_relaxed); + g_last_rtp_seq_ms.store(now, std::memory_order_relaxed); + } + + static void for_each_nal(const uint8_t* data, size_t size, + const std::function& cb) { + auto find_start = [&](size_t from, size_t& start_len) -> size_t { + for (size_t i = from; i + 3 < size; i++) { + if (data[i] == 0x00 && data[i + 1] == 0x00) { + if (data[i + 2] == 0x01) { + start_len = 3; + return i; + } + if (i + 3 < size && data[i + 2] == 0x00 && data[i + 3] == 0x01) { + start_len = 4; + return i; + } + } + } + start_len = 0; + return size; + }; + + size_t pos = 0; + while (pos < size) { + size_t start_len = 0; + size_t start = find_start(pos, start_len); + if (start == size) { + break; + } + size_t nal_start = start + start_len; + size_t next_len = 0; + size_t next = find_start(nal_start, next_len); + size_t nal_end = (next == size) ? size : next; + if (nal_end > nal_start) { + cb(data + nal_start, nal_end - nal_start); + } + pos = nal_end; + } + } + + static bool has_idr_frame(const uint8_t* data, size_t size, VideoCodec codec) { + bool found = false; + if (!data || size == 0) { + return false; + } + for_each_nal(data, size, [&](const uint8_t* nal, size_t nal_size) { + if (found || !nal || nal_size == 0) { + return; + } + if (codec == VideoCodec::H265) { + uint8_t nal_type = (nal[0] >> 1) & 0x3f; + if (nal_type >= 16 && nal_type <= 21) { + found = true; + } + } else if (codec == VideoCodec::H264) { + uint8_t nal_type = nal[0] & 0x1f; + if (nal_type == 5) { + found = true; + } + } + }); + return found; + } + + static void maybe_mark_idr_received(const uint8_t* data, size_t size, VideoCodec codec) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + if (!g_stream_idr_pending.load(std::memory_order_relaxed) && + !g_record_idr_pending.load(std::memory_order_relaxed)) { + return; + } + + if (!has_idr_frame(data, size, codec)) { + return; + } + + if (g_stream_idr_pending.exchange(false, std::memory_order_relaxed)) { + spdlog::info("[IDR] Stream refresh confirmed (IDR received)"); + } + if (g_record_idr_pending.exchange(false, std::memory_order_relaxed)) { + g_pending_rec_idr.store(false, std::memory_order_relaxed); + spdlog::info("[IDR] Record refresh confirmed (IDR received)"); + } + } + + static void send_idr_token_to_ip(const char* ip, const char token3[4]) { + if (!ip || !ip[0]) { + return; + } + + sockaddr_in dst{}; + dst.sin_family = AF_INET; + dst.sin_port = htons(static_cast(kIdrUdpPort)); + + if (inet_pton(AF_INET, ip, &dst.sin_addr) != 1) { + spdlog::warn("[IDR] inet_pton failed for ip={}", ip); + return; + } + + char payload[16]; + snprintf(payload, sizeof(payload), "%s\n", token3); + int rc = sendto(g_idr_sock, payload, static_cast(strlen(payload)), 0, + reinterpret_cast(&dst), static_cast(sizeof(dst))); + if (rc < 0) { + spdlog::warn("[IDR] sendto({}:{}) failed: {}", ip, kIdrUdpPort, strerror(errno)); + } + } + + static void send_idr_burst(const std::string& ip) { + for (int i = 0; i < kIdrBurstCount; ++i) { + char tok[4]; + make_idr_token3(tok); + send_idr_token_to_ip(ip.c_str(), tok); + if (i + 1 < kIdrBurstCount) { + std::this_thread::sleep_for(std::chrono::milliseconds(kIdrBurstSpacingMs)); + } + } + } + + static void request_idr_bursts(const char* reason, int request_count, bool allow_pending) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + const bool track_stream = is_stream_idr_reason(reason); + const bool track_record = is_record_idr_reason(reason); + if (track_stream) { + g_stream_idr_pending.store(true, std::memory_order_relaxed); + } + if (track_record) { + g_record_idr_pending.store(true, std::memory_order_relaxed); + } + + const std::string ip = get_last_hop_ip_copy(); + if (ip.empty()) { + spdlog::warn("[IDR] Cannot request IDR (last-hop unknown) reason={}", reason ? reason : "(null)"); + if (allow_pending) { + g_pending_rec_idr.store(true, std::memory_order_relaxed); + } + return; + } + + if (!ensure_idr_socket()) { + return; + } + + g_pending_rec_idr.store(false, std::memory_order_relaxed); + const std::string reason_str = reason ? reason : ""; + + if (track_record) { + std::thread([ip, reason_str, request_count]() { + const char* reason_c = reason_str.empty() ? "no-reason" : reason_str.c_str(); + for (int r = 0; r < request_count; ++r) { + if (!g_record_idr_pending.load(std::memory_order_relaxed)) { + spdlog::info("[IDR] Record refresh confirmed; skipping remaining bursts"); + break; + } + spdlog::info("[IDR] Request 1 burst(s) to {}:{} ({} {}/{})", + ip, kIdrUdpPort, reason_c, r + 1, request_count); + send_idr_burst(ip); + if (r + 1 < request_count) { + std::this_thread::sleep_for( + std::chrono::milliseconds(kIdrRecordRepeatSpacingMs)); + } + } + }).detach(); + return; + } + + std::thread([ip, reason_str, request_count]() { + const char* reason_c = reason_str.empty() ? "no-reason" : reason_str.c_str(); + const bool track_stream = is_stream_idr_reason(reason_c); + spdlog::info("[IDR] Request {} burst(s) to {}:{} ({})", request_count, ip, kIdrUdpPort, reason_c); + for (int r = 0; r < request_count; ++r) { + if (track_stream && !g_stream_idr_pending.load(std::memory_order_relaxed)) { + spdlog::info("[IDR] Stream refresh confirmed; skipping remaining bursts"); + break; + } + send_idr_burst(ip); + if (r + 1 < request_count) { + std::this_thread::sleep_for(std::chrono::milliseconds(kIdrRepeatSpacingMs)); + } + } + }).detach(); + } + + static void on_incoming_stream_buffer(GstBuffer* buf, const char* tag) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + g_last_pkt_ms.store(now_ms(), std::memory_order_relaxed); + maybe_update_last_hop_from_buffer(buf); + + if (!g_stream_up.exchange(true)) { + spdlog::info("[NET] Stream UP ({})", tag ? tag : "unknown"); + request_idr_bursts("stream-up", kIdrRepeatCount, false); + } + + if (g_pending_rec_idr.load(std::memory_order_relaxed)) { + if (!g_record_idr_pending.load(std::memory_order_relaxed)) { + g_pending_rec_idr.store(false, std::memory_order_relaxed); + } else { + const std::string ip = get_last_hop_ip_copy(); + if (!ip.empty()) { + g_pending_rec_idr.store(false, std::memory_order_relaxed); + request_idr_bursts("record-start(pending)", kIdrRecordRepeatCount, false); + } + } + } + } + + static void maybe_request_decode_stall(uint64_t now) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + if (!g_stream_up.load(std::memory_order_relaxed)) { + return; + } + + const uint64_t last_pkt = g_last_pkt_ms.load(std::memory_order_relaxed); + const uint64_t last_decoded = g_last_decoded_ms.load(std::memory_order_relaxed); + if (last_decoded == 0) { + return; + } + + if (last_pkt && (now - last_pkt) > kDecodeStallPktWindowMs) { + return; + } + + if (last_pkt > last_decoded && (now - last_decoded) > kDecodeStallMs) { + const uint64_t last_idr = g_last_decode_stall_idr_ms.load(std::memory_order_relaxed); + if (!last_idr || (now - last_idr) > kDecodeStallCooldownMs) { + g_last_decode_stall_idr_ms.store(now, std::memory_order_relaxed); + spdlog::info("[IDR] Decode stall (no frames for {} ms) -> request IDR", now - last_decoded); + request_idr_bursts("decode-stall", 1, false); + } + } + } + + static void tick_stream_presence() { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + static uint64_t last_tick = 0; + const uint64_t now = now_ms(); + if (now - last_tick < kStreamTickMs) { + return; + } + last_tick = now; + + if (!g_stream_up.load(std::memory_order_relaxed)) { + return; + } + + const uint64_t last = g_last_pkt_ms.load(std::memory_order_relaxed); + if (last && now > last && (now - last) > kStreamDownMs) { + if (g_stream_up.exchange(false)) { + spdlog::info("[NET] Stream DOWN (no packets for {} ms)", now - last); + g_last_rtp_seq_valid.store(false, std::memory_order_relaxed); + g_last_rtp_seq_ms.store(0, std::memory_order_relaxed); + } + } + + maybe_request_decode_stall(now); + } + + static void reset_stream_tracking() { + g_stream_up.store(false, std::memory_order_relaxed); + g_last_pkt_ms.store(0, std::memory_order_relaxed); + g_last_decoded_ms.store(0, std::memory_order_relaxed); + g_last_rtp_seq_valid.store(false, std::memory_order_relaxed); + g_last_rtp_seq_ms.store(0, std::memory_order_relaxed); + g_stream_idr_pending.store(false, std::memory_order_relaxed); + std::lock_guard lock(g_last_hop_mutex); + g_last_hop_ip.clear(); + } + + static GstPadProbeReturn udp_last_hop_probe(GstPad*, GstPadProbeInfo* info, gpointer) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return GST_PAD_PROBE_OK; + } + + if (GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER) { + GstBuffer* buf = GST_PAD_PROBE_INFO_BUFFER(info); + if (buf) { + on_incoming_stream_buffer(buf, "udpsrc"); + maybe_track_rtp_sequence(buf); + } + } + return GST_PAD_PROBE_OK; + } + + static void attach_last_hop_probes(GstElement* pipeline) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + if (!pipeline || !GST_IS_BIN(pipeline)) { + return; + } + + GstIterator* it = gst_bin_iterate_recurse(GST_BIN(pipeline)); + if (!it) { + return; + } + + GValue v = G_VALUE_INIT; + while (gst_iterator_next(it, &v) == GST_ITERATOR_OK) { + GstElement* e = GST_ELEMENT(g_value_get_object(&v)); + GstElementFactory* f = e ? gst_element_get_factory(e) : nullptr; + const gchar* fname = f ? gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(f)) : nullptr; + + if (fname && (!strcmp(fname, "udpsrc") || !strcmp(fname, "ts-udpsrc"))) { + if (g_object_class_find_property(G_OBJECT_GET_CLASS(e), "retrieve-sender-address")) { + g_object_set(G_OBJECT(e), "retrieve-sender-address", TRUE, NULL); + } + + GstPad* src_pad = gst_element_get_static_pad(e, "src"); + if (src_pad) { + gst_pad_add_probe(src_pad, GST_PAD_PROBE_TYPE_BUFFER, udp_last_hop_probe, nullptr, nullptr); + gst_object_unref(src_pad); + spdlog::info("[NET] last-hop probe attached to {}", fname); + } + } + + g_value_unset(&v); + } + gst_iterator_free(it); + } + + static void maybe_request_idr_rate_limited(const char* reason, const char* context) { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + + if (!g_stream_up.load(std::memory_order_relaxed)) { + return; + } + + const uint64_t now = now_ms(); + const uint64_t last = g_last_integrity_idr_ms.load(std::memory_order_relaxed); + if (last && (now - last) < kIntegrityCooldownMs) { + return; + } + + g_last_integrity_idr_ms.store(now, std::memory_order_relaxed); + if (context && context[0]) { + spdlog::info("[IDR] {} -> request IDR", context); + } else { + spdlog::info("[IDR] Decoder issue -> request IDR"); + } + + request_idr_bursts(reason ? reason : "decoder-issue", 1, false); + } +} + static void initGstreamerOrThrow() { GError* error = nullptr; if (!gst_init_check(nullptr, nullptr, &error)) { @@ -114,7 +718,9 @@ GstRtpReceiver::GstRtpReceiver(const char *s, const VideoCodec& codec) { } GstRtpReceiver::~GstRtpReceiver(){ - close(sock); + if (sock >= 0) { + close(sock); + } } static std::shared_ptr> gst_copy_buffer(GstBuffer* buffer){ @@ -141,11 +747,13 @@ static void loop_pull_appsink_samples(bool& keep_looping,GstElement *app_sink_el //gst_debug_sample(sample); GstBuffer* buffer = gst_sample_get_buffer(sample); if (buffer) { + on_incoming_stream_buffer(buffer, "appsink"); auto buff_copy=gst_copy_buffer(buffer); out_cb(buff_copy); } gst_sample_unref(sample); } + tick_stream_presence(); } } @@ -175,6 +783,9 @@ void GstRtpReceiver::loop_pull_samples() void GstRtpReceiver::on_new_sample(std::shared_ptr > sample) { + if (sample && !sample->empty()) { + maybe_mark_idr_received(sample->data(), sample->size(), m_video_codec); + } if(m_cb){ //debug_sample(sample); m_cb(sample); @@ -283,6 +894,7 @@ void GstRtpReceiver::stop_receiving() { gst_object_unref(m_gst_pipeline); m_gst_pipeline = nullptr; } + reset_stream_tracking(); spdlog::info("GstRtpReceiver::stop_receiving end"); } @@ -345,6 +957,8 @@ void GstRtpReceiver::switch_to_stream() { return; } + attach_last_hop_probes(m_gst_pipeline); + // If using Unix socket, setup appsrc with buffer pool if (unix_socket) { GstElement* appsrc = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "appsrc"); @@ -556,4 +1170,28 @@ void GstRtpReceiver::skip_duration(int64_t skip_ms) { if (!gst_element_send_event(m_gst_pipeline, seek_event)) { spdlog::warn("Failed to send seek event for skipping."); } -} \ No newline at end of file +} + +void idr_set_enabled(bool enabled) { + g_idr_enabled.store(enabled, std::memory_order_relaxed); +} + +bool idr_get_enabled() { + return g_idr_enabled.load(std::memory_order_relaxed); +} + +void idr_request_record_start() { + request_idr_bursts("record-start", kIdrRecordRepeatCount, true); +} + +void idr_request_decoder_issue(const char* reason) { + const char* ctx = reason ? reason : "decoder-issue"; + maybe_request_idr_rate_limited(reason, ctx); +} + +void idr_notify_decoded_frame() { + if (!g_idr_enabled.load(std::memory_order_relaxed)) { + return; + } + g_last_decoded_ms.store(now_ms(), std::memory_order_relaxed); +} diff --git a/src/gstrtpreceiver.h b/src/gstrtpreceiver.h index 17dc9d8..b5b0a0f 100644 --- a/src/gstrtpreceiver.h +++ b/src/gstrtpreceiver.h @@ -8,6 +8,8 @@ #include #include +#include +#ifdef __cplusplus #include #include #include @@ -74,7 +76,7 @@ class GstRtpReceiver { std::unique_ptr m_pull_samples_thread=nullptr; // appsrc const char* unix_socket = nullptr; - int sock; + int sock = -1; bool m_read_socket_run = false; std::unique_ptr m_read_socket_thread; @@ -84,6 +86,19 @@ class GstRtpReceiver { bool m_is_paused = false; double m_pre_pause_rate = 1.0; }; +#endif +#ifdef __cplusplus +extern "C" { +#endif +void idr_set_enabled(bool enabled); +bool idr_get_enabled(); +void idr_request_record_start(); +void idr_request_decoder_issue(const char* reason); +void idr_notify_decoded_frame(); +#ifdef __cplusplus +} +#endif + #endif //FPVUE_GSTRTPRECEIVER_H diff --git a/src/main.cpp b/src/main.cpp index d4a5936..3b990a0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -100,6 +100,7 @@ int video_zpos = 1; bool mavlink_dvr_on_arm = false; bool osd_custom_message = false; bool disable_vsync = false; +bool disable_gregidr = false; uint32_t refresh_frequency_ms = 1000; VideoCodec codec = VideoCodec::H265; @@ -262,6 +263,20 @@ void *__FRAME_THREAD__(void *param) init_buffer(frame); } else { // regular frame received + idr_notify_decoded_frame(); + const RK_U32 errinfo = mpp_frame_get_errinfo(frame); + const RK_U32 discard = mpp_frame_get_discard(frame); + if (errinfo || discard) { + const char* reason = "decoder-issue"; + if (errinfo && discard) { + reason = "decoder-errinfo+discard"; + } else if (errinfo) { + reason = "decoder-errinfo"; + } else if (discard) { + reason = "decoder-discard"; + } + idr_request_decoder_issue(reason); + } if (!mpi.first_frame_ts.tv_sec) { ts = ats; mpi.first_frame_ts = ats; @@ -571,6 +586,8 @@ void read_gstreamerpipe_stream(MppPacket *packet, int gst_udp_port, const char * auto cb=[&packet,/*&decoder_stalled_count,*/ &bytes_received, &period_start](std::shared_ptr> frame){ // Let the gst pull thread run at quite high priority static bool first= false; + static int stall_count = 0; + static uint64_t last_stall_idr_ms = 0; if(first){ SchedulingHelper::set_thread_params_max_realtime("DisplayThread",SchedulingHelper::PRIORITY_REALTIME_LOW); first= false; @@ -578,7 +595,17 @@ void read_gstreamerpipe_stream(MppPacket *packet, int gst_udp_port, const char * bytes_received += frame->size(); uint64_t now = get_time_ms(); osd_publish_uint_fact("gstreamer.received_bytes", NULL, 0, frame->size()); - feed_packet_to_decoder(packet,frame->data(),frame->size()); + const bool fed_ok = feed_packet_to_decoder(packet,frame->data(),frame->size()); + if (!fed_ok) { + stall_count++; + if (stall_count >= 3 && (now - last_stall_idr_ms) > 500) { + last_stall_idr_ms = now; + stall_count = 0; + idr_request_decoder_issue("decoder-feed-stall"); + } + } else { + stall_count = 0; + } if (dvr_enabled && dvr != NULL) { dvr->frame(frame); } @@ -695,6 +722,8 @@ void printHelp() { "\n" " --disable-vsync - Disable VSYNC commits\n" "\n" + " --disable-gregidr - Disable last-hop probing and IDR requests\n" + "\n" " --screen-mode-list - Print the list of supported screen modes and exit.\n" "\n" " --wfb-api-port - Port of wfb-server for cli statistics. (Default: 8003)\n" @@ -868,6 +897,11 @@ int main(int argc, char **argv) continue; } + __OnArgument("--disable-gregidr") { + disable_gregidr = true; + continue; + } + __OnArgument("--screen-mode-list") { print_modelist = 1; continue; @@ -909,6 +943,7 @@ int main(int argc, char **argv) __EndParseConsoleArguments__ spdlog::set_level(log_level); + idr_set_enabled(!disable_gregidr); if (dvr_template != NULL && video_framerate < 0 ) { printf("--dvr-framerate must be provided when dvr is enabled.\n");