Skip to content

Commit ae7233d

Browse files
committed
detect loop
1 parent e91954c commit ae7233d

File tree

5 files changed

+106
-45
lines changed

5 files changed

+106
-45
lines changed

exporters/otlp/src/otlp_grpc_forward_proxy.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace otlp
2828

2929
// Detect loops, by sending a random identifier for each forward proxy instance.
3030
// All identifiers received are collected, and send as individual http headers.
31-
static std::string kFwProxyRidHeader{ "otel-fw-proxy-rid" };
31+
static std::string kFwProxyRidHeader{ "otel-grpc-fw-proxy-uid" };
3232

3333
using namespace opentelemetry;
3434

@@ -112,14 +112,23 @@ struct OtlpGrpcForwardProxy::Impl
112112
bool CheckForLoop( grpc::ClientContext* context, const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata ) const
113113
{
114114
assert(context != nullptr);
115-
const auto matching_keys{ client_metadata.equal_range( grpc::string_ref( kFwProxyRidHeader ) ) };
116-
for( auto i = matching_keys.first; i != matching_keys.second; ++i )
117-
if( ascii_strieq( i->second.data(), fw_proxy_id.c_str() ) )
118-
// Loop detected.
115+
const auto matching_keys{ client_metadata.equal_range( kFwProxyRidHeader ) };
116+
auto index{ 0 };
117+
for( auto it = matching_keys.first; it != matching_keys.second; ++it )
118+
{
119+
std::string value(it->second.cbegin(), it->second.cend());
120+
OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] proxy=" << fw_proxy_id << " got=" << value << " index=" << ++index);
121+
if( ascii_strieq( value.c_str(), fw_proxy_id.c_str() ) )
122+
{
123+
OTEL_INTERNAL_LOG_ERROR("[otlp_grpc_forward_proxy] Loop detected!");
119124
return true;
125+
}
120126
else
127+
{
121128
// Add what we've got so far
122-
context->AddMetadata(kFwProxyRidHeader, i->second.data() );
129+
context->AddMetadata(kFwProxyRidHeader, value );
130+
}
131+
}
123132
// Add ourselves too.
124133
context->AddMetadata(kFwProxyRidHeader, fw_proxy_id);
125134
return false;

x/.bazelrc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ build --@otel_sdk//:with_dll=true
33
try-import %workspace%/../../top.bazelrc
44
test --test_output=streamed
55

6-
common --run_under='"C:/Program Files/Microsoft Visual Studio/2022/Enterprise/Common7/IDE/devenv.exe"'
6+
common:devenv --run_under='"C:/Program Files/Microsoft Visual Studio/2022/Enterprise/Common7/IDE/devenv.exe" /debugexe '
7+
test:devenv --run_under='"C:/Program Files/Microsoft Visual Studio/2022/Enterprise/Common7/IDE/devenv.exe" /debugexe '

x/BUILD.bazel

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,39 @@ cc_test(
1010
deps = ["@otel_sdk//:dll"],
1111
local = True,
1212
tags = ["manual", "external"],
13+
14+
env_inherit =
15+
[
16+
"ALLUSERSPROFILE",
17+
"APPDATA",
18+
"CommonProgramFiles",
19+
"CommonProgramFiles(x86)",
20+
"CommonProgramW6432",
21+
"COMPUTERNAME",
22+
"ComSpec",
23+
"LOCALAPPDATA",
24+
"NUMBER_OF_PROCESSORS",
25+
"OS",
26+
"Path",
27+
"PATHEXT",
28+
"PROCESSOR_ARCHITECTURE",
29+
"PROCESSOR_IDENTIFIER",
30+
"PROCESSOR_LEVEL",
31+
"PROCESSOR_REVISION",
32+
"ProgramData",
33+
"ProgramFiles",
34+
"ProgramFiles(x86)",
35+
"ProgramW6432",
36+
"SystemDrive",
37+
"SystemRoot",
38+
"TEMP",
39+
"TMP",
40+
"USERNAME",
41+
"USERPROFILE",
42+
"VS140COMNTOOLS",
43+
"windir",
44+
"_NT_SYMBOL_PATH",
45+
],
1346
)
1447

1548
cc_binary(

x/MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local_path_override(
99
path = "..",
1010
)
1111

12+
bazel_dep(name = "abseil-cpp", version = "20240722.0.bcr.1")
1213
bazel_dep(name = "aspect_bazel_lib", version = "2.9.4")
1314
bazel_dep(name = "bazel_skylib", version = "1.7.1")
1415
# Below is needed as we have "build --@curl//:use_mbedtls=true" from ..\.bazelrc

x/x.cpp

Lines changed: 55 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,11 @@ void metrics_counter_example(const std::string &name)
139139
provider->GetMeter(name, "1.2.0");
140140
auto double_counter = meter->CreateDoubleCounter(counter_name);
141141

142-
for (uint32_t i = 0; i < 20; ++i)
142+
for (uint32_t i = 0; i < 2000; ++i)
143143
{
144144
double val = (rand() % 700) + 1.1;
145145
double_counter->Add(val);
146-
std::this_thread::sleep_for(std::chrono::milliseconds(500));
146+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
147147
}
148148
}
149149

@@ -155,9 +155,9 @@ void metrics_observable_counter_example(const std::string &name)
155155
provider->GetMeter(name, "1.2.0");
156156
double_observable_counter = meter->CreateDoubleObservableCounter(counter_name);
157157
double_observable_counter->AddCallback(MeasurementFetcher::Fetcher, nullptr);
158-
for (uint32_t i = 0; i < 20; ++i)
158+
for (uint32_t i = 0; i < 2000; ++i)
159159
{
160-
std::this_thread::sleep_for(std::chrono::milliseconds(500));
160+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
161161
}
162162
}
163163

@@ -169,13 +169,13 @@ void metrics_histogram_example(const std::string &name)
169169
provider->GetMeter(name, "1.2.0");
170170
auto histogram_counter = meter->CreateDoubleHistogram(histogram_name, "des", "unit");
171171
auto context = opentelemetry::context::Context{};
172-
for (uint32_t i = 0; i < 20; ++i)
172+
for (uint32_t i = 0; i < 2000; ++i)
173173
{
174174
double val = (rand() % 700) + 1.1;
175175
std::map<std::string, std::string> labels = get_random_attr();
176176
auto labelkv = opentelemetry::common::KeyValueIterableView<decltype(labels)>{labels};
177177
histogram_counter->Record(val, labelkv, context);
178-
std::this_thread::sleep_for(std::chrono::milliseconds(250));
178+
std::this_thread::sleep_for(std::chrono::milliseconds(2));
179179
}
180180
}
181181

@@ -296,8 +296,8 @@ void InitMetrics(const std::string &name)
296296

297297
// Initialize and set the global MeterProvider
298298
opentelemetry::sdk::metrics::PeriodicExportingMetricReaderOptions options;
299-
options.export_interval_millis = std::chrono::milliseconds(1000);
300-
options.export_timeout_millis = std::chrono::milliseconds(500);
299+
options.export_interval_millis = std::chrono::milliseconds(100);
300+
options.export_timeout_millis = std::chrono::milliseconds(50);
301301

302302
auto reader = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create(
303303
std::move(exporter), options);
@@ -382,43 +382,47 @@ struct proxy_thread
382382
std::mutex mu;
383383
std::condition_variable cv;
384384
bool ready{ false };
385-
static inline std::unique_ptr<opentelemetry::exporter::otlp::OtlpGrpcForwardProxy> proxy;
386-
static void entry(proxy_thread* ctx)
385+
std::unique_ptr<opentelemetry::exporter::otlp::OtlpGrpcForwardProxy> proxy;
386+
static void thread_entry(proxy_thread* this_, const std::string& listenAddress, const std::string& sendAddress )
387+
{
388+
this_->entry(listenAddress, sendAddress);
389+
}
390+
void entry(const std::string& listenAddress, const std::string& sendAddress )
387391
{
388392
using namespace opentelemetry::exporter::otlp;
389393

390-
OtlpGrpcClientOptions clientOptions;
391-
clientOptions.endpoint = GetOtlpDefaultGrpcEndpoint();
392-
clientOptions.max_concurrent_requests = 10; //16384;
393-
clientOptions.max_threads = 10;
394+
OtlpGrpcClientOptions clientOptions{};
395+
clientOptions.endpoint = sendAddress; //GetOtlpDefaultGrpcEndpoint();
396+
clientOptions.max_concurrent_requests = 16384;
397+
clientOptions.max_threads = 32;
394398

395-
ctx->proxy = std::make_unique<OtlpGrpcForwardProxy>(clientOptions);
396-
ctx->proxy->SetActive(true);
399+
proxy = std::make_unique<OtlpGrpcForwardProxy>(clientOptions);
400+
proxy->SetActive(true);
397401

398-
ctx->proxy->AddListenAddress("localhost:4317");
402+
proxy->AddListenAddress(listenAddress);
399403
proxy->RegisterMetricExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
400404
proxy->RegisterTraceExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
401405
proxy->RegisterLogRecordExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
402406
printf("Start\n");
403-
ctx->proxy->Start();
407+
proxy->Start();
404408
{
405-
std::unique_lock<std::mutex> lock(ctx->mu);
406-
ctx->ready = true;
407-
ctx->cv.notify_one();
409+
std::unique_lock<std::mutex> lock(mu);
410+
ready = true;
411+
cv.notify_one();
408412
}
409413
printf("Wait\n");
410-
ctx->proxy->Wait();
414+
proxy->Wait();
411415
printf("Done Wait\n");
412416
}
413-
static void start()
417+
proxy_thread() = delete;
418+
explicit proxy_thread(const std::string& listenAddress, const std::string& sendAddress)
414419
{
415-
proxy_thread ctx;
416-
std::thread pt(proxy_thread::entry, &ctx);
420+
std::thread pt(proxy_thread::thread_entry, this, listenAddress, sendAddress);
417421
pt.detach();
418-
std::unique_lock<std::mutex> lock(ctx.mu);
419-
ctx.cv.wait(lock, [&ctx]{ return ctx.ready; });
422+
std::unique_lock<std::mutex> lock( mu );
423+
cv.wait(lock, [this]{ return ready; });
420424
}
421-
static void shutdown()
425+
~proxy_thread()
422426
{
423427
if( proxy )
424428
proxy->Shutdown();
@@ -434,10 +438,24 @@ int main(int argc, const char *argv[])
434438

435439
{
436440
using namespace opentelemetry::sdk::common::internal_log;
441+
GlobalLogHandler::SetLogLevel(LogLevel::None);
437442
GlobalLogHandler::SetLogLevel(LogLevel::Debug);
438443
}
439444

440-
proxy_thread::start();
445+
proxy_thread p0("127.0.0.1:4317", "unix://p/1");
446+
// proxy_thread p1("unix:q:/p/m/opentelemetry-cpp/1.sock", "unix:q:/p/m/opentelemetry-cpp/2.sock");
447+
// proxy_thread p2("unix:q:/p/m/opentelemetry-cpp/2.sock", "unix:q:/p/m/opentelemetry-cpp/3.sock");
448+
// proxy_thread p1("127.0.0.1:43170", "127.0.0.1:43171");
449+
// proxy_thread p2("127.0.0.1:43171", "127.0.0.1:43172");
450+
// proxy_thread p3("127.0.0.1:43172", "127.0.0.1:43173");
451+
// proxy_thread p4("127.0.0.1:43173", "127.0.0.1:43174");
452+
// proxy_thread p5("127.0.0.1:43174", "127.0.0.1:43175");
453+
// proxy_thread p6("127.0.0.1:43175", "127.0.0.1:43176");
454+
// proxy_thread p7("127.0.0.1:43176", "127.0.0.1:43177");
455+
// proxy_thread p8("127.0.0.1:43177", "127.0.0.1:43178");
456+
// proxy_thread p9("127.0.0.1:43178", "127.0.0.1:43179");
457+
// proxy_thread pA("127.0.0.1:43179", opentelemetry::exporter::otlp::GetOtlpDefaultGrpcEndpoint());
458+
//pA.proxy->SetActive( false );
441459

442460
{
443461
using namespace opentelemetry::sdk::common;
@@ -462,15 +480,14 @@ int main(int argc, const char *argv[])
462480
CleanupLogger();
463481
CleanupTracer();
464482

465-
printf("Press Ctrl+C to break\n");
466-
try {
467-
std::this_thread::sleep_for(std::chrono::seconds(500));
468-
}
469-
catch( ... )
470-
{
471-
printf("Caught something?\n");
472-
}
483+
// printf("Press Ctrl+C to break\n");
484+
// try {
485+
// std::this_thread::sleep_for(std::chrono::seconds(500));
486+
// }
487+
// catch( ... )
488+
// {
489+
// printf("Caught something?\n");
490+
// }
473491

474-
proxy_thread::shutdown();
475492
return 0;
476493
}

0 commit comments

Comments
 (0)