Skip to content

Commit a8f1eb6

Browse files
committed
Various build fixes
1 parent 9ec80e6 commit a8f1eb6

File tree

5 files changed

+39
-23
lines changed

5 files changed

+39
-23
lines changed

demos/comprehensive/src/transport/benchmark.cpp

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,6 @@ int main()
567567
{
568568
for (size_t blob_size : blob_sizes)
569569
{
570-
fmt::print("blob_size {}", blob_size);
571570
auto result = run_spsc_benchmark(enc.enc, blob_size);
572571
if (result.error != rpc::error::OK())
573572
{
@@ -578,21 +577,21 @@ int main()
578577
}
579578
}
580579

581-
fmt::print("run_tcp_benchmark\n");
582-
uint16_t tcp_port = 18900;
583-
for (const auto& enc : encodings)
584-
{
585-
for (size_t blob_size : blob_sizes)
586-
{
587-
auto result = run_tcp_benchmark(enc.enc, blob_size, tcp_port++);
588-
if (result.error != rpc::error::OK())
589-
{
590-
fmt::print("tcp | {:>18} | {:>9} | error {}\n", enc.name, blob_size, result.error);
591-
continue;
592-
}
593-
print_stats("tcp", enc.name, blob_size, result.stats);
594-
}
595-
}
580+
// fmt::print("run_tcp_benchmark\n");
581+
// uint16_t tcp_port = 18900;
582+
// for (const auto& enc : encodings)
583+
// {
584+
// for (size_t blob_size : blob_sizes)
585+
// {
586+
// auto result = run_tcp_benchmark(enc.enc, blob_size, tcp_port++);
587+
// if (result.error != rpc::error::OK())
588+
// {
589+
// fmt::print("tcp | {:>18} | {:>9} | error {}\n", enc.name, blob_size, result.error);
590+
// continue;
591+
// }
592+
// print_stats("tcp", enc.name, blob_size, result.stats);
593+
// }
594+
// }
596595

597596
print_footer();
598597
return 0;

rpc/include/rpc/internal/remote_pointer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ namespace rpc
752752
std::allocator_traits<decltype(actual_cb_allocator)>::deallocate(actual_cb_allocator, this, 1);
753753
}
754754
void* get_deleter_ptr(const std::type_info&) noexcept override { return nullptr; }
755-
~control_block_make_shared() DEFAULT_DESTRUCTOR
755+
~control_block_make_shared() { }
756756
};
757757
} // namespace __shared_ptr_control_block
758758

telemetry/include/rpc/telemetry/multiplexing_telemetry_service.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ namespace rpc
7474
explicit multiplexing_telemetry_service(std::vector<std::shared_ptr<i_telemetry_service>>&& child_services);
7575

7676
virtual ~multiplexing_telemetry_service() DEFAULT_DESTRUCTOR;
77-
;
77+
7878
multiplexing_telemetry_service(const multiplexing_telemetry_service&) = delete;
7979
multiplexing_telemetry_service& operator=(const multiplexing_telemetry_service&) = delete;
8080

transports/spsc/include/transports/spsc/transport.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ namespace rpc::spsc
5555
stdex::member_ptr<spsc_transport> keep_alive_;
5656

5757
std::atomic<bool> peer_requested_disconnection_ = false;
58+
std::atomic<bool> pumps_started_ = false;
5859

5960
struct activity_tracker
6061
{
@@ -170,7 +171,7 @@ namespace rpc::spsc
170171
// Check if the operation was cancelled during shutdown
171172
if (res_payload.error_code != rpc::error::OK())
172173
{
173-
RPC_DEBUG("call_peer returning cancelled error for zone: {} sequence_number: {}",
174+
RPC_ERROR("call_peer returning cancelled error for zone: {} sequence_number: {}",
174175
get_service()->get_zone_id().get_val(),
175176
sequence_number);
176177
CO_RETURN res_payload.error_code;

transports/spsc/src/transport.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,14 @@ namespace rpc::spsc
359359
auto self = shared_from_this();
360360
RPC_DEBUG("pump_send_and_receive zone={}", get_zone_id().get_val());
361361

362+
// Guard against multiple calls
363+
bool expected = false;
364+
if (!pumps_started_.compare_exchange_strong(expected, true))
365+
{
366+
RPC_ERROR("pump_send_and_receive called MULTIPLE TIMES on zone {} - BUG!", get_zone_id().get_val());
367+
return;
368+
}
369+
362370
// Schedule both producer and consumer tasks
363371
auto svc = get_service();
364372
auto scheduler = svc->get_scheduler();
@@ -429,7 +437,7 @@ namespace rpc::spsc
429437
stop_loop = true;
430438
break;
431439
}
432-
// CO_AWAIT svc->get_scheduler()->yield_for(std::chrono::milliseconds(100));
440+
// CO_AWAIT svc->get_scheduler()->yield_for(std::chrono::milliseconds(1));
433441
CO_AWAIT svc->get_scheduler()->schedule();
434442
}
435443
break;
@@ -457,7 +465,7 @@ namespace rpc::spsc
457465
auto str_err = rpc::from_yas_binary(rpc::span(prefix_buf), prefix);
458466
if (!str_err.empty())
459467
{
460-
RPC_ERROR("failed invalid prefix");
468+
RPC_ERROR("Instance #{} deserialization FAILED: {}", my_instance, str_err);
461469
break;
462470
}
463471
assert(prefix.direction);
@@ -484,9 +492,14 @@ namespace rpc::spsc
484492
message_blob blob;
485493
if (!receive_spsc_queue_->pop(blob))
486494
{
495+
// if (get_status() == rpc::transport_status::DISCONNECTING)
496+
// {
497+
// stop_loop = true;
498+
// break;
499+
// }
487500
if (!received_any)
488501
{
489-
// CO_AWAIT svc->get_scheduler()->yield_for(std::chrono::milliseconds(100));
502+
// CO_AWAIT svc->get_scheduler()->yield_for(std::chrono::milliseconds(1));
490503
CO_AWAIT svc->get_scheduler()->schedule();
491504
}
492505
break;
@@ -506,6 +519,9 @@ namespace rpc::spsc
506519
}
507520
}
508521

522+
if (stop_loop)
523+
break;
524+
509525
if (receive_data.empty())
510526
{
511527
envelope_payload payload;
@@ -682,7 +698,7 @@ namespace rpc::spsc
682698
auto status = push_message(send_data);
683699
if (status == send_queue_status::SEND_QUEUE_EMPTY || status == send_queue_status::SPSC_QUEUE_FULL)
684700
{
685-
// CO_AWAIT svc->get_scheduler()->yield_for(std::chrono::milliseconds(100));
701+
// CO_AWAIT svc->get_scheduler()->yield_for(std::chrono::milliseconds(1));
686702
CO_AWAIT svc->get_scheduler()->schedule();
687703
}
688704
}

0 commit comments

Comments
 (0)