Skip to content

Commit 0d409d4

Browse files
committed
Share grpc::Channel
1 parent c9cb2df commit 0d409d4

14 files changed

+120
-156
lines changed

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ namespace otlp
3434

3535
class OtlpGrpcClient;
3636
struct OtlpGrpcClientOptions;
37-
38-
#ifdef ENABLE_ASYNC_EXPORT
3937
struct OtlpGrpcClientAsyncData;
4038

4139
class OtlpGrpcClientReferenceGuard
@@ -53,15 +51,14 @@ class OtlpGrpcClientReferenceGuard
5351
friend class OtlpGrpcClient;
5452
std::atomic<bool> has_value_;
5553
};
56-
#endif
5754

5855
/**
5956
* The OTLP gRPC client contains utility functions of gRPC.
6057
*/
6158
class OtlpGrpcClient
6259
{
6360
public:
64-
OtlpGrpcClient();
61+
OtlpGrpcClient(const OtlpGrpcClientOptions &options);
6562

6663
~OtlpGrpcClient();
6764

@@ -79,20 +76,18 @@ class OtlpGrpcClient
7976
/**
8077
* Create trace service stub to communicate with the OpenTelemetry Collector.
8178
*/
82-
static std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface>
83-
MakeTraceServiceStub(const OtlpGrpcClientOptions &options);
79+
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> MakeTraceServiceStub();
8480

8581
/**
8682
* Create metrics service stub to communicate with the OpenTelemetry Collector.
8783
*/
88-
static std::unique_ptr<proto::collector::metrics::v1::MetricsService::StubInterface>
89-
MakeMetricsServiceStub(const OtlpGrpcClientOptions &options);
84+
std::unique_ptr<proto::collector::metrics::v1::MetricsService::StubInterface>
85+
MakeMetricsServiceStub();
9086

9187
/**
9288
* Create logs service stub to communicate with the OpenTelemetry Collector.
9389
*/
94-
static std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface>
95-
MakeLogsServiceStub(const OtlpGrpcClientOptions &options);
90+
std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> MakeLogsServiceStub();
9691

9792
static grpc::Status DelegateExport(
9893
proto::collector::trace::v1::TraceService::StubInterface *stub,
@@ -115,8 +110,6 @@ class OtlpGrpcClient
115110
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
116111
proto::collector::logs::v1::ExportLogsServiceResponse *response);
117112

118-
#ifdef ENABLE_ASYNC_EXPORT
119-
120113
void AddReference(OtlpGrpcClientReferenceGuard &guard,
121114
const OtlpGrpcClientOptions &options) noexcept;
122115

@@ -128,6 +121,7 @@ class OtlpGrpcClient
128121
*/
129122
bool RemoveReference(OtlpGrpcClientReferenceGuard &guard) noexcept;
130123

124+
#ifdef ENABLE_ASYNC_EXPORT
131125
/**
132126
* Async export
133127
* @param options Options used to message to create gRPC context and stub(if necessary)
@@ -187,6 +181,7 @@ class OtlpGrpcClient
187181
const proto::collector::logs::v1::ExportLogsServiceRequest &,
188182
proto::collector::logs::v1::ExportLogsServiceResponse *)>
189183
&&result_callback) noexcept;
184+
#endif
190185

191186
/**
192187
* Force flush the gRPC client.
@@ -212,7 +207,6 @@ class OtlpGrpcClient
212207

213208
// Stores shared data between threads of this gRPC client
214209
std::shared_ptr<OtlpGrpcClientAsyncData> async_data_;
215-
#endif
216210
};
217211
} // namespace otlp
218212
} // namespace exporter

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_factory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <memory>
77

8+
#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"
89
#include "opentelemetry/nostd/shared_ptr.h"
910

1011
OPENTELEMETRY_BEGIN_NAMESPACE
@@ -27,7 +28,7 @@ class OPENTELEMETRY_EXPORT OtlpGrpcClientFactory
2728
/**
2829
* Create an OtlpGrpcClient using all default options.
2930
*/
30-
static nostd::shared_ptr<OtlpGrpcClient> Create();
31+
static nostd::shared_ptr<OtlpGrpcClient> Create(const OtlpGrpcClientOptions &options);
3132

3233
#ifdef ENABLE_ASYNC_EXPORT
3334
static nostd::shared_ptr<OtlpGrpcClientReferenceGuard> CreateReferenceGuard();

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,23 +92,19 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
9292
bool Shutdown(
9393
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
9494

95-
#ifdef ENABLE_ASYNC_EXPORT
9695
/**
9796
* Get the Client object
9897
*
9998
* @return return binded gRPC client
10099
*/
101100
const nostd::shared_ptr<OtlpGrpcClient> &GetClient() const noexcept;
102-
#endif
103101

104102
private:
105103
// The configuration options associated with this exporter.
106104
const OtlpGrpcExporterOptions options_;
107105

108-
#ifdef ENABLE_ASYNC_EXPORT
109106
nostd::shared_ptr<OtlpGrpcClient> client_;
110107
nostd::shared_ptr<OtlpGrpcClientReferenceGuard> client_reference_guard_;
111-
#endif
112108

113109
// For testing
114110
friend class OtlpGrpcExporterTestPeer;

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,23 +91,19 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo
9191
bool Shutdown(
9292
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
9393

94-
#ifdef ENABLE_ASYNC_EXPORT
9594
/**
9695
* Get the Client object
9796
*
9897
* @return return binded gRPC client
9998
*/
10099
const nostd::shared_ptr<OtlpGrpcClient> &GetClient() const noexcept;
101-
#endif
102100

103101
private:
104102
// Configuration options for the exporter
105103
const OtlpGrpcLogRecordExporterOptions options_;
106104

107-
#ifdef ENABLE_ASYNC_EXPORT
108105
nostd::shared_ptr<OtlpGrpcClient> client_;
109106
nostd::shared_ptr<OtlpGrpcClientReferenceGuard> client_reference_guard_;
110-
#endif
111107

112108
// For testing
113109
friend class OtlpGrpcLogRecordExporterTestPeer;

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,19 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp
7777
bool Shutdown(
7878
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
7979

80-
#ifdef ENABLE_ASYNC_EXPORT
8180
/**
8281
* Get the Client object
8382
*
8483
* @return return binded gRPC client
8584
*/
8685
const nostd::shared_ptr<OtlpGrpcClient> &GetClient() const noexcept;
87-
#endif
8886

8987
private:
9088
// The configuration options associated with this exporter.
9189
const OtlpGrpcMetricExporterOptions options_;
9290

93-
#ifdef ENABLE_ASYNC_EXPORT
9491
nostd::shared_ptr<OtlpGrpcClient> client_;
9592
nostd::shared_ptr<OtlpGrpcClientReferenceGuard> client_reference_guard_;
96-
#endif
9793

9894
// Aggregation Temporality selector
9995
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

exporters/otlp/src/otlp_grpc_client.cc

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,18 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCal
7878
virtual ~OtlpGrpcAsyncCallData() {}
7979
};
8080
} // namespace
81+
#endif
8182

8283
struct OtlpGrpcClientAsyncData
8384
{
85+
8486
std::chrono::system_clock::duration export_timeout = std::chrono::seconds{10};
8587

8688
// The best performance trade-off of gRPC is having numcpu's threads and one completion queue
8789
// per thread, but this exporter should not cost a lot resource and we don't want to create
88-
// too many threads in the process. So we use one completion queue.
90+
// too many threads in the process. So we use one completion queue and shared context.
91+
std::shared_ptr<grpc::Channel> channel;
92+
#ifdef ENABLE_ASYNC_EXPORT
8993
grpc::CompletionQueue cq;
9094

9195
// Running requests, this is used to limit the number of concurrent requests.
@@ -98,6 +102,7 @@ struct OtlpGrpcClientAsyncData
98102
// Condition variable and mutex to control the concurrency count of running requests.
99103
std::mutex session_waker_lock;
100104
std::condition_variable session_waker;
105+
#endif
101106

102107
// Reference count of OtlpGrpcClient
103108
std::atomic<int64_t> reference_count{0};
@@ -107,7 +112,6 @@ struct OtlpGrpcClientAsyncData
107112
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
108113
OtlpGrpcClientAsyncData() {}
109114
};
110-
#endif
111115

112116
namespace
113117
{
@@ -243,24 +247,23 @@ static sdk::common::ExportResult InternalDelegateAsyncExport(
243247
#endif
244248
} // namespace
245249

246-
#ifdef ENABLE_ASYNC_EXPORT
247250
OtlpGrpcClientReferenceGuard::OtlpGrpcClientReferenceGuard() noexcept : has_value_{false} {}
248251

249252
OtlpGrpcClientReferenceGuard::~OtlpGrpcClientReferenceGuard() noexcept {}
250-
#endif
251253

252-
OtlpGrpcClient::OtlpGrpcClient()
253-
#ifdef ENABLE_ASYNC_EXPORT
254-
: is_shutdown_(false)
255-
#endif
256-
{}
254+
OtlpGrpcClient::OtlpGrpcClient(const OtlpGrpcClientOptions &options) : is_shutdown_(false)
255+
{
256+
std::shared_ptr<OtlpGrpcClientAsyncData> async_data = MutableAsyncData(options);
257+
async_data->channel = MakeChannel(options);
258+
}
257259

258260
OtlpGrpcClient::~OtlpGrpcClient()
259261
{
260-
#ifdef ENABLE_ASYNC_EXPORT
262+
261263
std::shared_ptr<OtlpGrpcClientAsyncData> async_data;
262264
async_data.swap(async_data_);
263265

266+
#ifdef ENABLE_ASYNC_EXPORT
264267
while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0)
265268
{
266269
std::unique_lock<std::mutex> lock{async_data->session_waker_lock};
@@ -359,21 +362,33 @@ std::unique_ptr<grpc::ClientContext> OtlpGrpcClient::MakeClientContext(
359362
}
360363

361364
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface>
362-
OtlpGrpcClient::MakeTraceServiceStub(const OtlpGrpcClientOptions &options)
365+
OtlpGrpcClient::MakeTraceServiceStub()
363366
{
364-
return proto::collector::trace::v1::TraceService::NewStub(MakeChannel(options));
367+
if (!async_data_ || !async_data_->channel)
368+
{
369+
return nullptr;
370+
}
371+
return proto::collector::trace::v1::TraceService::NewStub(async_data_->channel);
365372
}
366373

367374
std::unique_ptr<proto::collector::metrics::v1::MetricsService::StubInterface>
368-
OtlpGrpcClient::MakeMetricsServiceStub(const OtlpGrpcClientOptions &options)
375+
OtlpGrpcClient::MakeMetricsServiceStub()
369376
{
370-
return proto::collector::metrics::v1::MetricsService::NewStub(MakeChannel(options));
377+
if (!async_data_ || !async_data_->channel)
378+
{
379+
return nullptr;
380+
}
381+
return proto::collector::metrics::v1::MetricsService::NewStub(async_data_->channel);
371382
}
372383

373384
std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface>
374-
OtlpGrpcClient::MakeLogsServiceStub(const OtlpGrpcClientOptions &options)
385+
OtlpGrpcClient::MakeLogsServiceStub()
375386
{
376-
return proto::collector::logs::v1::LogsService::NewStub(MakeChannel(options));
387+
if (!async_data_ || !async_data_->channel)
388+
{
389+
return nullptr;
390+
}
391+
return proto::collector::logs::v1::LogsService::NewStub(async_data_->channel);
377392
}
378393

379394
grpc::Status OtlpGrpcClient::DelegateExport(
@@ -406,8 +421,6 @@ grpc::Status OtlpGrpcClient::DelegateExport(
406421
return stub->Export(context.get(), request, response);
407422
}
408423

409-
#ifdef ENABLE_ASYNC_EXPORT
410-
411424
void OtlpGrpcClient::AddReference(OtlpGrpcClientReferenceGuard &guard,
412425
const OtlpGrpcClientOptions &options) noexcept
413426
{
@@ -437,6 +450,8 @@ bool OtlpGrpcClient::RemoveReference(OtlpGrpcClientReferenceGuard &guard) noexce
437450
return true;
438451
}
439452

453+
#ifdef ENABLE_ASYNC_EXPORT
454+
440455
/**
441456
* Async export
442457
* @param options Options used to message to create gRPC context and stub(if necessary)
@@ -538,6 +553,8 @@ sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport(
538553
"log(s)");
539554
}
540555

556+
#endif
557+
541558
std::shared_ptr<OtlpGrpcClientAsyncData> OtlpGrpcClient::MutableAsyncData(
542559
const OtlpGrpcClientOptions &options)
543560
{
@@ -556,13 +573,15 @@ bool OtlpGrpcClient::IsShutdown() const noexcept
556573
return is_shutdown_.load(std::memory_order_acquire);
557574
}
558575

559-
bool OtlpGrpcClient::ForceFlush(std::chrono::microseconds timeout) noexcept
576+
bool OtlpGrpcClient::ForceFlush(
577+
OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept
560578
{
561579
if (!async_data_)
562580
{
563581
return true;
564582
}
565583

584+
#ifdef ENABLE_ASYNC_EXPORT
566585
std::size_t request_counter = async_data_->start_request_counter.load(std::memory_order_acquire);
567586
if (request_counter <= async_data_->finished_request_counter.load(std::memory_order_acquire))
568587
{
@@ -601,16 +620,20 @@ bool OtlpGrpcClient::ForceFlush(std::chrono::microseconds timeout) noexcept
601620
}
602621

603622
return timeout_steady > std::chrono::steady_clock::duration::zero();
623+
#else
624+
return true;
625+
#endif
604626
}
605627

606628
bool OtlpGrpcClient::Shutdown(OtlpGrpcClientReferenceGuard &guard,
607-
std::chrono::microseconds timeout) noexcept
629+
OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept
608630
{
609631
if (!async_data_)
610632
{
611633
return true;
612634
}
613635

636+
#ifdef ENABLE_ASYNC_EXPORT
614637
bool last_reference_removed = RemoveReference(guard);
615638
bool force_flush_result;
616639
if (last_reference_removed && false == is_shutdown_.exchange(true, std::memory_order_acq_rel))
@@ -625,9 +648,11 @@ bool OtlpGrpcClient::Shutdown(OtlpGrpcClientReferenceGuard &guard,
625648
}
626649

627650
return force_flush_result;
628-
}
629-
651+
#else
652+
RemoveReference(guard);
653+
return true;
630654
#endif
655+
}
631656

632657
} // namespace otlp
633658
} // namespace exporter

exporters/otlp/src/otlp_grpc_client_factory.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ namespace exporter
1212
namespace otlp
1313
{
1414

15-
nostd::shared_ptr<OtlpGrpcClient> OtlpGrpcClientFactory::Create()
15+
nostd::shared_ptr<OtlpGrpcClient> OtlpGrpcClientFactory::Create(
16+
const OtlpGrpcClientOptions &options)
1617
{
17-
return nostd::shared_ptr<OtlpGrpcClient>(new OtlpGrpcClient());
18+
return nostd::shared_ptr<OtlpGrpcClient>(new OtlpGrpcClient(options));
1819
}
1920

2021
#ifdef ENABLE_ASYNC_EXPORT

0 commit comments

Comments
 (0)