Skip to content

Commit 147eaab

Browse files
authored
simplify gRPC message pending/drain queue (#72)
1 parent 66ea63b commit 147eaab

File tree

11 files changed

+43
-112
lines changed

11 files changed

+43
-112
lines changed

cpp2sky/internal/async_client.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
#include <memory>
2222

23+
#include "source/utils/circular_buffer.h"
24+
2325
using google::protobuf::Message;
2426

2527
namespace cpp2sky {
@@ -30,14 +32,14 @@ class AsyncClient {
3032
virtual ~AsyncClient() = default;
3133

3234
/**
33-
* Send the specified protobuf message
35+
* Send the specified protobuf message.
3436
*/
3537
virtual void sendMessage(RequestType message) = 0;
3638

3739
/**
38-
* Drain pending message.
40+
* Pending message queue reference.
3941
*/
40-
virtual void drainPendingMessage(RequestType message) = 0;
42+
virtual CircularBuffer<RequestType>& pendingMessages() = 0;
4143

4244
/**
4345
* Start stream if there is no living stream.

docker-compose.dev.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,18 @@ services:
3030
- '8600:8600'
3131
- '8600:8600/udp'
3232
oap:
33-
image: apache/skywalking-oap-server:8.4.0-es7
33+
image: apache/skywalking-oap-server:8.5.0-es7
3434
depends_on:
3535
- elasticsearch
3636
links:
3737
- elasticsearch
38-
restart: always
3938
ports:
4039
- 11800:11800
4140
- 12800:12800
4241
environment:
4342
SW_STORAGE: elasticsearch7
4443
SW_STORAGE_ES_CLUSTER_NODES: elasticsearch:9200
44+
restart: always
4545
healthcheck:
4646
test: ["CMD-SHELL", "/skywalking/bin/swctl"]
4747
interval: 30s
@@ -52,12 +52,11 @@ services:
5252
- consul
5353
- elasticsearch
5454
ui:
55-
image: apache/skywalking-ui:8.4.0
55+
image: apache/skywalking-ui:8.5.0
5656
depends_on:
5757
- oap
5858
links:
5959
- oap
60-
restart: always
6160
ports:
6261
- 8080:8080
6362
environment:

source/BUILD

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,33 +36,6 @@ cc_library(
3636
visibility = ["//visibility:public"],
3737
)
3838

39-
# TODO(shikugawa): This is a terrible hack to disable unexpected behavior for
40-
# test that has introduced GRPC_TEST.
41-
cc_library(
42-
name = "async_client_impl_test",
43-
hdrs = [
44-
"grpc_async_client_impl.h",
45-
],
46-
srcs = [
47-
"grpc_async_client_impl.cc",
48-
],
49-
deps = [
50-
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_proto",
51-
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc",
52-
"@com_github_grpc_grpc//:grpc++",
53-
"@com_github_gabime_spdlog//:spdlog",
54-
"@com_google_absl//absl/strings:strings",
55-
"//cpp2sky/internal:async_client_interface",
56-
"//cpp2sky/internal:stream_builder_interface",
57-
"//cpp2sky/internal:matcher_interface",
58-
"//cpp2sky:cpp2sky_interface",
59-
"//source/utils:util_lib",
60-
"//source/matchers:suffix_matcher_lib",
61-
],
62-
copts = ["-DGRPC_TEST"],
63-
visibility = ["//visibility:public"],
64-
)
65-
6639
cc_library(
6740
name = "cpp2sky_data_lib",
6841
hdrs =[

source/cds_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ class GrpcAsyncConfigDiscoveryServiceClient final
3737
~GrpcAsyncConfigDiscoveryServiceClient();
3838

3939
void sendMessage(CdsRequest request);
40-
void drainPendingMessage(CdsRequest pending_message) override {}
4140
void startStream() override {}
41+
CircularBuffer<CdsRequest>& pendingMessages() override { assert(false); }
4242
grpc::CompletionQueue& completionQueue() override { return cq_; }
4343
grpc::TemplatedGenericStub<CdsRequest, CdsResponse>& stub() override {
4444
return stub_;

source/dynamic_config.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,4 @@ class DynamicConfig {
4242
std::set<std::string> ignore_fields_;
4343
};
4444

45-
} // namespace cpp2sky
45+
} // namespace cpp2sky

source/grpc_async_client_impl.cc

Lines changed: 14 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -41,37 +41,30 @@ GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient(
4141
}
4242

4343
GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() {
44-
// It will wait until there is no drained messages.
45-
// There are no timeout option to handle this, so if you would like to stop
46-
// them, you should send signals like SIGTERM.
47-
// If server stopped with accidental issue, the event loop handle that it
48-
// failed to send message and close stream, then recreate new stream and try
49-
// to do it. This process will continue forever without sending explicit
50-
// signal.
51-
// TODO(shikugawa): Block to wait drained messages to be clear with createing
52-
// condition variable wrapper.
53-
#ifndef GRPC_TEST
44+
// It will wait until there is no drained messages with 5 second timeout.
5445
if (stream_) {
5546
std::unique_lock<std::mutex> lck(mux_);
56-
while (!drained_messages_.empty()) {
57-
cv_.wait(lck);
47+
while (!pending_messages_.empty()) {
48+
cv_.wait_for(lck, std::chrono::seconds(5));
49+
pending_messages_.clear();
5850
}
5951
}
60-
#endif
6152

6253
resetStream();
6354
}
6455

6556
void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) {
57+
pending_messages_.push(message);
58+
6659
if (!stream_) {
67-
drained_messages_.push(message);
6860
info(
69-
"[Reporter] No active stream, inserted message into draining message "
61+
"[Reporter] No active stream, inserted message into pending message "
7062
"queue. "
7163
"pending message size: {}",
72-
drained_messages_.size());
64+
pending_messages_.size());
7365
return;
7466
}
67+
7568
stream_->sendMessage(message);
7669
}
7770

@@ -80,19 +73,6 @@ void GrpcAsyncSegmentReporterClient::startStream() {
8073

8174
stream_ = factory_->create(*this, cv_);
8275
info("[Reporter] Stream {} had created.", fmt::ptr(stream_.get()));
83-
84-
const auto drained_messages_size = drained_messages_.size();
85-
86-
while (!drained_messages_.empty()) {
87-
auto msg = drained_messages_.front();
88-
drained_messages_.pop();
89-
if (msg.has_value()) {
90-
stream_->sendMessage(msg.value());
91-
}
92-
}
93-
94-
info("[Reporter] {} drained messages inserted into pending messages.",
95-
drained_messages_size);
9676
}
9777

9878
void GrpcAsyncSegmentReporterClient::resetStream() {
@@ -121,28 +101,15 @@ GrpcAsyncSegmentReporterStream::GrpcAsyncSegmentReporterStream(
121101
request_writer_->StartCall(reinterpret_cast<void*>(&ready_));
122102
}
123103

124-
GrpcAsyncSegmentReporterStream::~GrpcAsyncSegmentReporterStream() {
125-
const auto pending_messages_size = pending_messages_.size();
126-
while (!pending_messages_.empty()) {
127-
auto msg = pending_messages_.front();
128-
pending_messages_.pop();
129-
if (msg.has_value()) {
130-
client_.drainPendingMessage(msg.value());
131-
}
132-
}
133-
info("[Reporter] {} pending messages drained.", pending_messages_size);
134-
}
135-
136104
void GrpcAsyncSegmentReporterStream::sendMessage(TracerRequestType message) {
137-
pending_messages_.push(message);
138105
clearPendingMessage();
139106
}
140107

141108
bool GrpcAsyncSegmentReporterStream::clearPendingMessage() {
142-
if (state_ != StreamState::Idle || pending_messages_.empty()) {
109+
if (state_ != StreamState::Idle || client_.pendingMessages().empty()) {
143110
return false;
144111
}
145-
auto message = pending_messages_.front();
112+
auto message = client_.pendingMessages().front();
146113
if (!message.has_value()) {
147114
return false;
148115
}
@@ -164,21 +131,19 @@ void GrpcAsyncSegmentReporterStream::onIdle() {
164131

165132
// Release pending messages which are inserted when stream is not ready
166133
// to write.
167-
clearPendingMessage();
168-
169-
if (pending_messages_.empty()) {
134+
if (!clearPendingMessage()) {
170135
cv_.notify_all();
171136
}
172137
}
173138

174139
void GrpcAsyncSegmentReporterStream::onWriteDone() {
175140
info("[Reporter] Write finished");
176141

177-
// Enqueue message after sending message finished.
142+
// Dequeue message after sending message finished.
178143
// With this, messages which failed to sent never lost even if connection
179144
// was closed. because pending messages with messages which failed to send
180145
// will drained and resend another stream.
181-
pending_messages_.pop();
146+
client_.pendingMessages().pop();
182147
state_ = StreamState::Idle;
183148

184149
onIdle();

source/grpc_async_client_impl.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
#include "cpp2sky/internal/stream_builder.h"
2828
#include "language-agent/Tracing.grpc.pb.h"
2929
#include "language-agent/Tracing.pb.h"
30-
#include "source/utils/circular_buffer.h"
31-
32-
#define DRAIN_BUFFER_SIZE 1024
33-
#define PENDING_MESSAGE_BUFFER_SIZE 1024
3430

3531
namespace cpp2sky {
3632

33+
namespace {
34+
static constexpr size_t pending_message_buffer_size = 1024;
35+
}
36+
3737
using TracerRequestType = skywalking::v3::SegmentObject;
3838
using TracerResponseType = skywalking::v3::Commands;
3939

@@ -51,8 +51,8 @@ class GrpcAsyncSegmentReporterClient final
5151

5252
// AsyncClient
5353
void sendMessage(TracerRequestType message) override;
54-
void drainPendingMessage(TracerRequestType pending_message) override {
55-
drained_messages_.push(pending_message);
54+
CircularBuffer<TracerRequestType>& pendingMessages() override {
55+
return pending_messages_;
5656
}
5757
void startStream() override;
5858
grpc::TemplatedGenericStub<TracerRequestType, TracerResponseType>& stub()
@@ -61,7 +61,7 @@ class GrpcAsyncSegmentReporterClient final
6161
}
6262
grpc::CompletionQueue& completionQueue() override { return cq_; }
6363

64-
size_t numOfMessages() { return drained_messages_.size(); }
64+
size_t numOfMessages() { return pending_messages_.size(); }
6565

6666
private:
6767
void resetStream();
@@ -72,7 +72,8 @@ class GrpcAsyncSegmentReporterClient final
7272
grpc::CompletionQueue& cq_;
7373
grpc::TemplatedGenericStub<TracerRequestType, TracerResponseType> stub_;
7474
AsyncStreamPtr<TracerRequestType, TracerResponseType> stream_;
75-
CircularBuffer<TracerRequestType> drained_messages_{DRAIN_BUFFER_SIZE};
75+
CircularBuffer<TracerRequestType> pending_messages_{
76+
pending_message_buffer_size};
7677

7778
std::mutex mux_;
7879
std::condition_variable cv_;
@@ -85,7 +86,6 @@ class GrpcAsyncSegmentReporterStream final
8586
GrpcAsyncSegmentReporterStream(
8687
AsyncClient<TracerRequestType, TracerResponseType>& client,
8788
std::condition_variable& cv, const std::string& token);
88-
~GrpcAsyncSegmentReporterStream() override;
8989

9090
// AsyncStream
9191
void sendMessage(TracerRequestType message) override;
@@ -106,8 +106,6 @@ class GrpcAsyncSegmentReporterStream final
106106
std::unique_ptr<
107107
grpc::ClientAsyncReaderWriter<TracerRequestType, TracerResponseType>>
108108
request_writer_;
109-
CircularBuffer<TracerRequestType> pending_messages_{
110-
PENDING_MESSAGE_BUFFER_SIZE};
111109
StreamState state_{StreamState::Initialized};
112110

113111
StreamCallbackTag ready_{StreamState::Ready, this};

source/utils/circular_buffer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ class CircularBuffer {
8686
*/
8787
size_t size() const { return item_count_; }
8888

89+
/**
90+
* Clear buffer
91+
*/
92+
void clear() {
93+
buf_.clear();
94+
item_count_ = 0;
95+
}
96+
8997
// Used for test
9098
size_t frontIdx() { return front_; }
9199
size_t backIdx() { return back_; }

test/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ cc_test(
4747
],
4848
deps = [
4949
"@com_google_googletest//:gtest_main",
50-
"//source:async_client_impl_test",
50+
"//source:cpp2sky_lib",
5151
":mocks",
5252
],
5353
visibility = ["//visibility:public"],

test/grpc_async_client_test.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,4 @@ TEST_F(GrpcAsyncSegmentReporterClientTest, SendMessageTest) {
5757
client_->sendMessage(fake_message);
5858
}
5959

60-
TEST_F(GrpcAsyncSegmentReporterClientTest, MessageDrainTest) {
61-
std::queue<TracerRequestType> fake_pending_messages;
62-
for (int i = 0; i < 3; ++i) {
63-
fake_pending_messages.emplace(skywalking::v3::SegmentObject());
64-
}
65-
while (fake_pending_messages.size() != 0) {
66-
auto msg = fake_pending_messages.front();
67-
fake_pending_messages.pop();
68-
client_->drainPendingMessage(msg);
69-
}
70-
EXPECT_EQ(fake_pending_messages.size(), 0);
71-
EXPECT_EQ(client_->numOfMessages(), 3);
72-
}
73-
7460
} // namespace cpp2sky

0 commit comments

Comments
 (0)