Skip to content

Commit 5afc662

Browse files
authored
grpc: improve stream more reliable (#32)
1 parent 754841e commit 5afc662

File tree

10 files changed

+344
-58
lines changed

10 files changed

+344
-58
lines changed

cpp2sky/internal/async_client.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
#include <google/protobuf/message.h>
1818
#include <grpcpp/grpcpp.h>
1919

20+
#include <condition_variable>
2021
#include <memory>
21-
#include <queue>
2222

2323
using google::protobuf::Message;
2424

@@ -62,10 +62,9 @@ class AsyncClient {
6262
virtual std::string peerAddress() = 0;
6363

6464
/**
65-
* Drain pending messages
65+
* Drain pending message.
6666
*/
67-
virtual void drainPendingMessages(
68-
std::queue<RequestType>& pending_messages) = 0;
67+
virtual void drainPendingMessage(RequestType message) = 0;
6968

7069
/**
7170
* Reset the stream if it is alive.
@@ -109,6 +108,11 @@ class AsyncStream {
109108
*/
110109
virtual void sendMessage(RequestType message) = 0;
111110

111+
/**
112+
* Restore drained message.
113+
*/
114+
virtual void undrainMessage(RequestType message) = 0;
115+
112116
/**
113117
* Handle incoming event related to this stream.
114118
*/
@@ -128,7 +132,7 @@ class AsyncStreamFactory {
128132
*/
129133
virtual AsyncStreamPtr<RequestType> create(
130134
AsyncClient<RequestType, ResponseType>* client,
131-
std::queue<RequestType>& drained_messages) = 0;
135+
std::condition_variable& cv) = 0;
132136
};
133137

134138
template <class RequestType, class ResponseType>

example/sample_client.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,5 @@ int main() {
5757

5858
// 5. Send span data
5959
tracer->sendSegment(std::move(current_segment));
60-
6160
return 0;
6261
}

source/grpc_async_client_impl.cc

Lines changed: 80 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,16 @@
1414

1515
#include "grpc_async_client_impl.h"
1616

17+
#include <chrono>
1718
#include <string_view>
19+
#include <thread>
1820

1921
#include "source/utils/exception.h"
2022
#include "utils/grpc_status.h"
2123

24+
#define DEFAULT_CONNECTION_ACTIVE_RETRY_TIMES 5
25+
#define DEFAULT_CONNECTION_ACTIVE_RETRY_SLEEP_SEC 3
26+
2227
namespace cpp2sky {
2328

2429
namespace {
@@ -51,9 +56,45 @@ GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient(
5156
startStream();
5257
}
5358

59+
GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() {
60+
// If the connection is inactive, all drained messages are disposed of, even
61+
// if there are a large number of them.
62+
uint8_t retry_times = DEFAULT_CONNECTION_ACTIVE_RETRY_TIMES;
63+
while (channel_->GetState(false) !=
64+
grpc_connectivity_state::GRPC_CHANNEL_READY) {
65+
if (retry_times <= 0) {
66+
gpr_log(GPR_INFO,
67+
"All %ld pending messages have disposed because of no active "
68+
"connection",
69+
drained_messages_.size());
70+
resetStream();
71+
return;
72+
}
73+
retry_times--;
74+
std::this_thread::sleep_for(
75+
std::chrono::seconds(DEFAULT_CONNECTION_ACTIVE_RETRY_SLEEP_SEC));
76+
}
77+
78+
// Wait until there are no drained messages left.
79+
// There is no timeout option for this wait, so if you would like to stop
80+
// it, you should send a signal such as SIGTERM.
81+
// If the server stops unexpectedly, the event loop detects that it failed
82+
// to send a message and closes the stream, then recreates a new stream and
83+
// retries. This process continues forever unless an explicit signal is
84+
// sent.
85+
{
86+
std::unique_lock<std::mutex> lck(mux_);
87+
while (!drained_messages_.empty()) {
88+
cv_.wait(lck);
89+
}
90+
}
91+
92+
resetStream();
93+
}
94+
5495
void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) {
5596
if (!stream_) {
56-
drained_messages_.emplace(message);
97+
drained_messages_.push(message);
5798
gpr_log(GPR_INFO,
5899
"No active stream, inserted message into draining message queue. "
59100
"pending message size: %ld",
@@ -76,42 +117,39 @@ GrpcAsyncSegmentReporterClient::createWriter(grpc::ClientContext* ctx,
76117
void GrpcAsyncSegmentReporterClient::startStream() {
77118
resetStream();
78119

79-
// Try to establish connection.
80-
channel_->GetState(true);
81-
stream_ = factory_.create(this, drained_messages_);
82-
stream_->startStream();
83-
}
120+
stream_ = factory_.create(this, cv_);
84121

85-
void GrpcAsyncSegmentReporterClient::drainPendingMessages(
86-
std::queue<TracerRequestType>& pending_messages) {
87-
const auto pending_messages_size = pending_messages.size();
88-
while (!pending_messages.empty()) {
89-
auto msg = pending_messages.front();
90-
pending_messages.pop();
91-
drained_messages_.emplace(msg);
122+
const auto drained_messages_size = drained_messages_.size();
123+
while (!drained_messages_.empty()) {
124+
auto msg = drained_messages_.front();
125+
drained_messages_.pop();
126+
if (msg.has_value()) {
127+
stream_->undrainMessage(msg.value());
128+
}
92129
}
93-
gpr_log(GPR_INFO, "%ld pending messages drained.", pending_messages_size);
130+
gpr_log(GPR_INFO, "%ld drained messages inserted into pending messages.",
131+
drained_messages_size);
132+
133+
stream_->startStream();
134+
gpr_log(GPR_INFO, "Stream %p had created.", stream_.get());
94135
}
95136

96137
GrpcAsyncSegmentReporterStream::GrpcAsyncSegmentReporterStream(
97138
AsyncClient<TracerRequestType, TracerResponseType>* client,
98-
std::queue<TracerRequestType>& drained_messages)
99-
: client_(client) {
100-
const auto drained_messages_size = drained_messages.size();
101-
while (!drained_messages.empty()) {
102-
auto msg = drained_messages.front();
103-
pending_messages_.emplace(msg);
104-
drained_messages.pop();
105-
}
106-
gpr_log(GPR_INFO, "%ld drained messages inserted into pending messages.",
107-
drained_messages_size);
108-
}
139+
std::condition_variable& cv)
140+
: client_(client), cv_(cv) {}
109141

110142
GrpcAsyncSegmentReporterStream::~GrpcAsyncSegmentReporterStream() {
111-
{
112-
std::unique_lock<std::mutex> lck_(mux_);
113-
cond_.wait(lck_, [this] { return pending_messages_.empty(); });
143+
const auto pending_messages_size = pending_messages_.size();
144+
while (!pending_messages_.empty()) {
145+
auto msg = pending_messages_.front();
146+
pending_messages_.pop();
147+
if (msg.has_value()) {
148+
client_->drainPendingMessage(msg.value());
149+
}
114150
}
151+
gpr_log(GPR_INFO, "%ld pending messages drained.", pending_messages_size);
152+
115153
ctx_.TryCancel();
116154
request_writer_->Finish(&status_, toTag(&finish_));
117155
}
@@ -130,17 +168,19 @@ bool GrpcAsyncSegmentReporterStream::startStream() {
130168
}
131169

132170
void GrpcAsyncSegmentReporterStream::sendMessage(TracerRequestType message) {
133-
pending_messages_.emplace(message);
171+
pending_messages_.push(message);
134172
clearPendingMessages();
135173
}
136174

137175
bool GrpcAsyncSegmentReporterStream::clearPendingMessages() {
138176
if (state_ != Operation::Idle || pending_messages_.empty()) {
139177
return false;
140178
}
141-
auto message = pending_messages_.back();
142-
pending_messages_.pop();
143-
request_writer_->Write(message, toTag(&write_done_));
179+
auto message = pending_messages_.front();
180+
if (!message.has_value()) {
181+
return false;
182+
}
183+
request_writer_->Write(message.value(), toTag(&write_done_));
144184
return true;
145185
}
146186

@@ -152,6 +192,11 @@ bool GrpcAsyncSegmentReporterStream::handleOperation(Operation incoming_op) {
152192
state_ = Operation::Idle;
153193
} else if (state_ == Operation::WriteDone) {
154194
gpr_log(GPR_INFO, "Write finished");
195+
// Dequeue the message only after sending it has finished.
196+
// This way, messages that failed to send are never lost even if the
197+
// connection was closed, because the pending messages, including those that
198+
// failed to send, will be drained and resent on another stream.
199+
pending_messages_.pop();
155200
state_ = Operation::Idle;
156201
}
157202

@@ -161,16 +206,13 @@ bool GrpcAsyncSegmentReporterStream::handleOperation(Operation incoming_op) {
161206
// to write.
162207
clearPendingMessages();
163208

164-
// Release if lock with condition variable has been acquired.
165-
// It will blocked if stream has notified to be closed.
166209
if (pending_messages_.empty()) {
167-
cond_.notify_all();
210+
cv_.notify_all();
168211
}
169212
return true;
170213
} else if (state_ == Operation::Finished) {
171214
gpr_log(GPR_INFO, "Stream closed with http status: %d",
172215
grpcStatusToGenericHttpStatus(status_.error_code()));
173-
client_->drainPendingMessages(pending_messages_);
174216
if (!status_.ok()) {
175217
gpr_log(GPR_ERROR, "%s", status_.error_message().c_str());
176218
}
@@ -181,12 +223,11 @@ bool GrpcAsyncSegmentReporterStream::handleOperation(Operation incoming_op) {
181223

182224
AsyncStreamPtr<TracerRequestType> GrpcAsyncSegmentReporterStreamFactory::create(
183225
AsyncClient<TracerRequestType, TracerResponseType>* client,
184-
std::queue<TracerRequestType>& drained_messages) {
226+
std::condition_variable& cv) {
185227
if (client == nullptr) {
186228
return nullptr;
187229
}
188-
return std::make_shared<GrpcAsyncSegmentReporterStream>(client,
189-
drained_messages);
230+
return std::make_shared<GrpcAsyncSegmentReporterStream>(client, cv);
190231
}
191232

192233
} // namespace cpp2sky

source/grpc_async_client_impl.h

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
#include "cpp2sky/internal/async_client.h"
2525
#include "language-agent/Tracing.grpc.pb.h"
2626
#include "language-agent/Tracing.pb.h"
27+
#include "source/utils/circular_buffer.h"
28+
29+
#define DRAIN_BUFFER_SIZE 1024
30+
#define PENDING_MESSAGE_BUFFER_SIZE 1024
2731

2832
namespace cpp2sky {
2933

@@ -54,19 +58,21 @@ class GrpcAsyncSegmentReporterClient final
5458
AsyncStreamFactory<TracerRequestType, TracerResponseType>& factory,
5559
std::shared_ptr<grpc::ChannelCredentials> cred, std::string address,
5660
std::string token);
61+
~GrpcAsyncSegmentReporterClient();
5762

5863
// AsyncClient
5964
void sendMessage(TracerRequestType message) override;
6065
std::string peerAddress() override { return address_; }
6166
std::unique_ptr<grpc::ClientAsyncWriter<TracerRequestType>> createWriter(
6267
grpc::ClientContext* ctx, TracerResponseType* response,
6368
void* tag) override;
64-
void drainPendingMessages(
65-
std::queue<TracerRequestType>& pending_messages) override;
69+
void drainPendingMessage(TracerRequestType pending_message) override {
70+
drained_messages_.push(pending_message);
71+
}
6672
void resetStream() override {
6773
if (stream_) {
74+
gpr_log(GPR_INFO, "Stream %p had destroyed.", stream_.get());
6875
stream_.reset();
69-
stream_ = nullptr;
7076
}
7177
}
7278
void startStream() override;
@@ -80,7 +86,10 @@ class GrpcAsyncSegmentReporterClient final
8086
grpc::CompletionQueue* cq_;
8187
std::shared_ptr<grpc::Channel> channel_;
8288
AsyncStreamPtr<TracerRequestType> stream_;
83-
std::queue<TracerRequestType> drained_messages_;
89+
CircularBuffer<TracerRequestType> drained_messages_{DRAIN_BUFFER_SIZE};
90+
91+
std::mutex mux_;
92+
std::condition_variable cv_;
8493
};
8594

8695
struct TaggedStream {
@@ -96,13 +105,16 @@ class GrpcAsyncSegmentReporterStream final
96105
public:
97106
GrpcAsyncSegmentReporterStream(
98107
AsyncClient<TracerRequestType, TracerResponseType>* client,
99-
std::queue<TracerRequestType>& drained_messages);
108+
std::condition_variable& cv);
100109
~GrpcAsyncSegmentReporterStream() override;
101110

102111
// AsyncStream
103112
bool startStream() override;
104113
void sendMessage(TracerRequestType message) override;
105114
bool handleOperation(Operation incoming_op) override;
115+
void undrainMessage(TracerRequestType message) override {
116+
pending_messages_.push(message);
117+
}
106118

107119
private:
108120
bool clearPendingMessages();
@@ -112,15 +124,15 @@ class GrpcAsyncSegmentReporterStream final
112124
grpc::Status status_;
113125
grpc::ClientContext ctx_;
114126
std::unique_ptr<grpc::ClientAsyncWriter<TracerRequestType>> request_writer_;
115-
std::queue<TracerRequestType> pending_messages_;
127+
CircularBuffer<TracerRequestType> pending_messages_{
128+
PENDING_MESSAGE_BUFFER_SIZE};
116129
Operation state_{Operation::Initialized};
117130

118131
TaggedStream connected_{Operation::Connected, this};
119132
TaggedStream write_done_{Operation::WriteDone, this};
120133
TaggedStream finish_{Operation::Finished, this};
121134

122-
std::mutex mux_;
123-
std::condition_variable cond_;
135+
std::condition_variable& cv_;
124136
};
125137

126138
class GrpcAsyncSegmentReporterStreamFactory final
@@ -129,7 +141,7 @@ class GrpcAsyncSegmentReporterStreamFactory final
129141
// AsyncStreamFactory
130142
AsyncStreamPtr<TracerRequestType> create(
131143
AsyncClient<TracerRequestType, TracerResponseType>* client,
132-
std::queue<TracerRequestType>& drained_messages) override;
144+
std::condition_variable& cv) override;
133145
};
134146

135147
} // namespace cpp2sky

source/utils/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ cc_library(
77
"exception.h",
88
"random_generator.h",
99
"grpc_status.h",
10+
"circular_buffer.h",
1011
],
1112
deps = [
1213
"//cpp2sky:random_interface",

0 commit comments

Comments
 (0)