diff --git a/include/libcyphal/transport/can/can_transport_impl.hpp b/include/libcyphal/transport/can/can_transport_impl.hpp index 3f422b1f4..59d448d6a 100644 --- a/include/libcyphal/transport/can/can_transport_impl.hpp +++ b/include/libcyphal/transport/can/can_transport_impl.hpp @@ -683,6 +683,14 @@ class TransportImpl final : private TransportDelegate, public ICanTransport return (*frame_handler_ptr)(deadline, *frame); }); } + + if ((result == 0) && (media.canard_tx_queue().size == 0)) + { + // There was nothing successfully polled, + // AND won't be in the (near) future (b/c queue is empty), + // so we are done with this TX media - no more callbacks for now (until brand new TX transfer). + media.tx_callback().reset(); + } } /// @brief Tries to peek the first TX item from the media TX queue which is not expired. diff --git a/include/libcyphal/transport/udp/udp_transport_impl.hpp b/include/libcyphal/transport/udp/udp_transport_impl.hpp index 89862b678..5fadaf9ca 100644 --- a/include/libcyphal/transport/udp/udp_transport_impl.hpp +++ b/include/libcyphal/transport/udp/udp_transport_impl.hpp @@ -760,7 +760,9 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport } // for a valid tx item - // There is nothing to send anymore, so we are done with this media TX socket - no more callbacks for now. + // There was nothing successfully sent (otherwise we would have `return`-ed earlier), + // AND won't be in the (near) future (b/c queue is empty), + // so we are done with this TX media - no more callbacks for now (until brand new TX transfer). media.txSocketState().callback.reset(); } diff --git a/test/unittest/transport/can/test_can_transport.cpp b/test/unittest/transport/can/test_can_transport.cpp index 407f14e30..842428b82 100644 --- a/test/unittest/transport/can/test_can_transport.cpp +++ b/test/unittest/transport/can/test_can_transport.cpp @@ -529,6 +529,7 @@ TEST_F(TestCanTransport, sending_multiframe_payload_for_non_anonymous) constexpr auto timeout = 1s; + // MTU size makes sure that 2 frames are needed - hence "multi-frame". const auto payload = makeIotaArray(b('0')); TransferTxMetadata metadata{{0x13, Priority::Nominal}, {}}; @@ -537,6 +538,7 @@ TEST_F(TestCanTransport, sending_multiframe_payload_for_non_anonymous) scheduler_.scheduleAt(1s, [&](const auto&) { // + // Expect 1st frame pushing. EXPECT_CALL(media_mock_, push(_, _, _)).WillOnce([&](auto deadline, auto can_id, auto& pld) { EXPECT_THAT(now(), metadata.deadline - timeout); EXPECT_THAT(deadline, metadata.deadline); @@ -549,15 +551,19 @@ TEST_F(TestCanTransport, sending_multiframe_payload_for_non_anonymous) }); EXPECT_CALL(media_mock_, registerPushCallback(_)) // .WillOnce(Invoke([&](auto function) { // - return scheduler_.registerAndScheduleNamedCallback("", now() + 10us, std::move(function)); + return scheduler_.registerAndScheduleNamedCallback("tx", now() + 10us, std::move(function)); })); + // There was never any TX yet, so no callback should be registered. + EXPECT_FALSE(scheduler_.hasNamedCallback("tx")); + metadata.deadline = now() + timeout; auto failure = session->send(metadata, makeSpansFrom(payload)); EXPECT_THAT(failure, Eq(cetl::nullopt)); }); scheduler_.scheduleAt(1s + 10us, [&](const auto&) { // + // Expect 2nd frame pushing. EXPECT_CALL(media_mock_, push(_, _, _)).WillOnce([&](auto deadline, auto can_id, auto& pld) { EXPECT_THAT(now(), metadata.deadline - timeout + 10us); EXPECT_THAT(deadline, metadata.deadline); @@ -569,6 +575,19 @@ TEST_F(TestCanTransport, sending_multiframe_payload_for_non_anonymous) return IMedia::PushResult::Success{true /* is_accepted */}; }); }); + scheduler_.scheduleAt(1s + 20us, [&](const auto&) { + // + // Callback is still there b/c the 2nd frame was pushed. + ASSERT_TRUE(scheduler_.hasNamedCallback("tx")); + + // Emulate that media done with the 2nd frame - this should remove the callback b/c TX queue is empty. + scheduler_.scheduleNamedCallback("tx", now()); + }); + scheduler_.scheduleAt(1s + 20us + 1us, [&](const auto&) { + // + // TX pipeline encountered an empty queue - hence the callback should be dropped. + EXPECT_FALSE(scheduler_.hasNamedCallback("tx")); + }); scheduler_.spinFor(10s); } diff --git a/test/unittest/transport/udp/test_udp_transport.cpp b/test/unittest/transport/udp/test_udp_transport.cpp index 25ef2d527..efa22142b 100644 --- a/test/unittest/transport/udp/test_udp_transport.cpp +++ b/test/unittest/transport/udp/test_udp_transport.cpp @@ -563,11 +563,13 @@ TEST_F(TestUpdTransport, sending_multiframe_payload_for_non_anonymous) constexpr auto timeout = 1s; + // +1 makes sure that 2 frames are needed - hence "multi-frame". const auto payload = makeIotaArray(b('0')); TransferTxMetadata metadata{{0x13, Priority::Nominal}, {}}; scheduler_.scheduleAt(1s, [&](const auto&) { // + // Expect 1st frame sending. EXPECT_CALL(tx_socket_mock_, send(_, _, _, _)) .WillOnce([&](auto deadline, auto endpoint, auto, auto fragments) { EXPECT_THAT(now(), metadata.deadline - timeout); @@ -579,15 +581,22 @@ TEST_F(TestUpdTransport, sending_multiframe_payload_for_non_anonymous) }); EXPECT_CALL(tx_socket_mock_, registerCallback(_)) // .WillOnce(Invoke([&](auto function) { // - return scheduler_.registerAndScheduleNamedCallback("", now() + 10us, std::move(function)); + return scheduler_.registerAndScheduleNamedCallback("tx", now() + 10us, std::move(function)); })); + // There was never any TX yet, so no callback should be registered. + EXPECT_FALSE(scheduler_.hasNamedCallback("tx")); + metadata.deadline = now() + timeout; auto failure = session->send(metadata, makeSpansFrom(payload)); EXPECT_THAT(failure, Eq(cetl::nullopt)); + + // We just did TX - it should register callback (to run at +10us) for the 1st frame. + EXPECT_TRUE(scheduler_.hasNamedCallback("tx")); }); scheduler_.scheduleAt(1s + 10us, [&](const auto&) { // + // Expect 2nd frame sending. EXPECT_CALL(tx_socket_mock_, send(_, _, _, _)) .WillOnce([&](auto deadline, auto endpoint, auto, auto fragments) { EXPECT_THAT(now(), metadata.deadline - timeout + 10us); @@ -599,6 +608,19 @@ TEST_F(TestUpdTransport, sending_multiframe_payload_for_non_anonymous) return ITxSocket::SendResult::Success{true /* is_accepted */}; }); }); + scheduler_.scheduleAt(1s + 20us, [&](const auto&) { + // + // Callback is still there b/c the 2nd frame was sent. + ASSERT_TRUE(scheduler_.hasNamedCallback("tx")); + + // Emulate that media done with the 2nd frame - this should remove the callback b/c TX queue is empty. + scheduler_.scheduleNamedCallback("tx", now()); + }); + scheduler_.scheduleAt(1s + 20us + 1us, [&](const auto&) { + // + // TX pipeline encountered an empty queue - hence the callback should be dropped. + EXPECT_FALSE(scheduler_.hasNamedCallback("tx")); + }); scheduler_.scheduleAt(9s, [&](const auto&) { // session.reset();