Skip to content

Commit 064dc34

Browse files
authored
multiplexing gRPC callback and cds events (#85)
1 parent 8a40d1d commit 064dc34

File tree

4 files changed

+70
-26
lines changed

4 files changed

+70
-26
lines changed

source/tracer_impl.cc

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace cpp2sky {
2828
TracerImpl::TracerImpl(TracerConfig& config,
2929
std::shared_ptr<grpc::ChannelCredentials> cred)
3030
: config_(config),
31-
grpc_callback_thread_([this] { this->run(); }),
31+
evloop_thread_([this] { this->run(); }),
3232
segment_factory_(config) {
3333
init(config, cred);
3434
}
@@ -38,7 +38,7 @@ TracerImpl::TracerImpl(
3838
AsyncClientPtr<TracerRequestType, TracerResponseType> reporter_client)
3939
: config_(config),
4040
reporter_client_(std::move(reporter_client)),
41-
grpc_callback_thread_([this] { this->run(); }),
41+
evloop_thread_([this] { this->run(); }),
4242
segment_factory_(config) {
4343
init(config, nullptr);
4444
}
@@ -47,11 +47,7 @@ TracerImpl::~TracerImpl() {
4747
reporter_client_.reset();
4848
cds_client_.reset();
4949
cq_.Shutdown();
50-
grpc_callback_thread_.join();
51-
52-
if (cds_thread_.joinable()) {
53-
cds_thread_.join();
54-
}
50+
evloop_thread_.join();
5551
}
5652

5753
TracingContextPtr TracerImpl::newContext() { return segment_factory_.create(); }
@@ -80,23 +76,28 @@ void TracerImpl::run() {
8076
void* got_tag;
8177
bool ok = false;
8278
while (true) {
83-
grpc::CompletionQueue::NextStatus status =
84-
cq_.AsyncNext(&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
85-
if (status == grpc::CompletionQueue::SHUTDOWN) {
86-
return;
79+
// TODO(shikugawa): cleanup evloop handler.
80+
if (cds_timer_ != nullptr && cds_timer_->check()) {
81+
cdsRequest();
82+
}
83+
84+
grpc::CompletionQueue::NextStatus status = cq_.AsyncNext(
85+
&got_tag, &ok, gpr_time_from_nanos(0, GPR_CLOCK_REALTIME));
86+
switch (status) {
87+
case grpc::CompletionQueue::TIMEOUT:
88+
continue;
89+
case grpc::CompletionQueue::SHUTDOWN:
90+
return;
8791
}
8892
static_cast<StreamCallbackTag*>(got_tag)->callback(!ok);
8993
}
9094
}
9195

92-
void TracerImpl::startCds(std::chrono::seconds seconds) {
93-
while (true) {
94-
skywalking::v3::ConfigurationSyncRequest request;
95-
request.set_service(config_.tracerConfig().service_name());
96-
request.set_uuid(config_.uuid());
97-
cds_client_->sendMessage(request);
98-
std::this_thread::sleep_for(seconds);
99-
}
96+
void TracerImpl::cdsRequest() {
97+
skywalking::v3::ConfigurationSyncRequest request;
98+
request.set_service(config_.tracerConfig().service_name());
99+
request.set_uuid(config_.uuid());
100+
cds_client_->sendMessage(request);
100101
}
101102

102103
void TracerImpl::init(TracerConfig& config,
@@ -124,10 +125,8 @@ void TracerImpl::init(TracerConfig& config,
124125
config.address(), cq_,
125126
std::make_unique<GrpcAsyncConfigDiscoveryServiceStreamBuilder>(config_),
126127
cred);
127-
cds_thread_ = std::thread([this] {
128-
this->startCds(
129-
std::chrono::seconds(config_.tracerConfig().cds_request_interval()));
130-
});
128+
cds_timer_ =
129+
std::make_unique<Timer>(config_.tracerConfig().cds_request_interval());
131130
}
132131
}
133132

source/tracer_impl.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "source/cds_impl.h"
2424
#include "source/grpc_async_client_impl.h"
2525
#include "source/tracing_context_impl.h"
26+
#include "source/utils/timer.h"
2627

2728
namespace cpp2sky {
2829

@@ -50,14 +51,14 @@ class TracerImpl : public Tracer {
5051
void init(TracerConfig& config,
5152
std::shared_ptr<grpc::ChannelCredentials> cred);
5253
void run();
53-
void startCds(std::chrono::seconds seconds);
54+
void cdsRequest();
5455

56+
std::unique_ptr<Timer> cds_timer_;
5557
DynamicConfig config_;
5658
AsyncClientPtr<TracerRequestType, TracerResponseType> reporter_client_;
5759
AsyncClientPtr<CdsRequest, CdsResponse> cds_client_;
5860
grpc::CompletionQueue cq_;
59-
std::thread grpc_callback_thread_;
60-
std::thread cds_thread_;
61+
std::thread evloop_thread_;
6162
TracingContextFactory segment_factory_;
6263
std::list<MatcherPtr> op_name_matchers_;
6364
};

source/utils/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ cc_library(
66
"base64.h",
77
"random_generator.h",
88
"circular_buffer.h",
9+
"timer.h",
910
],
1011
srcs = [
1112
"random_generator.cc",

source/utils/timer.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2021 SkyAPM
2+
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
7+
// http://apache.org/licenses/LICENSE-2.0
8+
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <chrono>
18+
19+
#include "cpp2sky/time.h"
20+
21+
namespace cpp2sky {
22+
class Timer {
23+
public:
24+
Timer(int64_t interval_sec) {
25+
interval_ = interval_sec * 1000;
26+
prev_time_ = TimePoint<SteadyTime>().fetch();
27+
}
28+
29+
bool check() {
30+
const auto current_time = TimePoint<SteadyTime>().fetch();
31+
bool result = current_time - prev_time_ > interval_;
32+
if (result) {
33+
prev_time_ = current_time;
34+
}
35+
36+
return result;
37+
}
38+
39+
private:
40+
int64_t prev_time_;
41+
int64_t interval_;
42+
};
43+
} // namespace cpp2sky

0 commit comments

Comments
 (0)