Skip to content

Commit f4521f3

Browse files
committed
Try to cancel running requests when shutdown timeout
1 parent ad722f4 commit f4521f3

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

exporters/otlp/src/otlp_grpc_client.cc

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ struct OtlpGrpcClientAsyncData
9090
// too many threads in the process. So we use one completion queue and shared context.
9191
std::shared_ptr<grpc::Channel> channel;
9292
#ifdef ENABLE_ASYNC_EXPORT
93-
grpc::CompletionQueue cq;
94-
93+
std::mutex running_calls_lock;
94+
std::unordered_set<std::shared_ptr<OtlpGrpcAsyncCallDataBase>> running_calls;
9595
// Running requests, this is used to limit the number of concurrent requests.
9696
std::atomic<std::size_t> running_requests{0};
9797
// Request counter is used to record ForceFlush.
@@ -206,6 +206,11 @@ static sdk::common::ExportResult InternalDelegateAsyncExport(
206206

207207
++async_data->start_request_counter;
208208
++async_data->running_requests;
209+
{
210+
std::lock_guard<std::mutex> lock{async_data->running_calls_lock};
211+
async_data->running_calls.insert(
212+
std::static_pointer_cast<OtlpGrpcAsyncCallDataBase>(call_data));
213+
}
209214
// Some old toolchains can only use gRPC 1.33 and it's experimental.
210215
# if defined(GRPC_CPP_VERSION_MAJOR) && \
211216
(GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
@@ -215,6 +220,11 @@ static sdk::common::ExportResult InternalDelegateAsyncExport(
215220
# endif
216221
->Export(call_data->grpc_context.get(), call_data->request, call_data->response,
217222
[call_data, async_data](::grpc::Status grpc_status) {
223+
{
224+
std::lock_guard<std::mutex> lock{async_data->running_calls_lock};
225+
async_data->running_calls.erase(
226+
std::static_pointer_cast<OtlpGrpcAsyncCallDataBase>(call_data));
227+
}
218228
--async_data->running_requests;
219229
++async_data->finished_request_counter;
220230

@@ -259,11 +269,22 @@ OtlpGrpcClient::OtlpGrpcClient(const OtlpGrpcClientOptions &options) : is_shutdo
259269

260270
OtlpGrpcClient::~OtlpGrpcClient()
261271
{
262-
263272
std::shared_ptr<OtlpGrpcClientAsyncData> async_data;
264273
async_data.swap(async_data_);
265274

266275
#ifdef ENABLE_ASYNC_EXPORT
276+
if (async_data)
277+
{
278+
std::lock_guard<std::mutex> lock(async_data->running_calls_lock);
279+
for (auto &call_data : async_data->running_calls)
280+
{
281+
if (call_data && call_data->grpc_context)
282+
{
283+
call_data->grpc_context->TryCancel();
284+
}
285+
}
286+
}
287+
267288
while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0)
268289
{
269290
std::unique_lock<std::mutex> lock{async_data->session_waker_lock};
@@ -642,7 +663,14 @@ bool OtlpGrpcClient::Shutdown(OtlpGrpcClientReferenceGuard &guard,
642663
{
643664
force_flush_result = ForceFlush(timeout);
644665

645-
async_data_->cq.Shutdown();
666+
std::lock_guard<std::mutex> lock(async_data_->running_calls_lock);
667+
for (auto &call_data : async_data_->running_calls)
668+
{
669+
if (call_data && call_data->grpc_context)
670+
{
671+
call_data->grpc_context->TryCancel();
672+
}
673+
}
646674
}
647675
else
648676
{

0 commit comments

Comments
 (0)