Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions runtime-light/stdlib/exit/exit-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "runtime-light/state/instance-state.h"
#include "runtime-light/stdlib/fork/fork-functions.h"
#include "runtime-light/stdlib/output/output-state.h"
#include "runtime-light/stdlib/rpc/rpc-client-state.h"

inline kphp::coro::task<> f$exit(mixed v = 0) noexcept { // TODO: make it synchronous
auto& instance_st{InstanceState::get()};
Expand All @@ -26,6 +27,20 @@ inline kphp::coro::task<> f$exit(mixed v = 0) noexcept { // TODO: make it synchr
exit_code = 1;
}
co_await kphp::forks::id_managed(instance_st.run_instance_epilogue());

// Sending a request can be canceled if the instance closes the stream before it will be sent by RPC component.
// Wait for all results to guarantee that the requests are sent.
auto& rpc_client_instance_st{RpcClientInstanceState::get()};
while (!rpc_client_instance_st.response_awaiter_tasks.empty()) {
const auto it_response_awaiter{rpc_client_instance_st.response_awaiter_tasks.begin()};
const auto& [query_id, awaiter_task]{*it_response_awaiter};

rpc_client_instance_st.response_awaiter_tasks.erase(it_response_awaiter);
rpc_client_instance_st.response_fetcher_instances.erase(query_id);

co_await kphp::forks::id_managed(awaiter_task);
}

k2::exit(static_cast<int32_t>(exit_code));
}

Expand Down
3 changes: 2 additions & 1 deletion runtime-light/stdlib/rpc/rpc-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,11 @@ kphp::coro::task<kphp::rpc::query_info> send_request(std::string_view actor, std
auto awaiter_task{awaiter_coroutine(query_id, std::move(stream), timeout, collect_responses_extra_info)};
kphp::log::assertion(kphp::coro::io_scheduler::get().start(awaiter_task));

rpc_client_instance_st.response_awaiter_tasks.emplace(query_id, std::move(awaiter_task));

if (ignore_answer) {
co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
}
rpc_client_instance_st.response_awaiter_tasks.emplace(query_id, std::move(awaiter_task));
co_return kphp::rpc::query_info{.id = query_id, .request_size = request_size, .timestamp = timestamp};
}

Expand Down
Loading