Skip to content

Commit 96ee5e7

Browse files
authored
gRPC interface and segment reporter refactor (#60)
1 parent 57f5ea5 commit 96ee5e7

File tree

12 files changed

+312
-259
lines changed

12 files changed

+312
-259
lines changed

cpp2sky/internal/BUILD

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,9 @@ cc_library(
2222
],
2323
visibility = ["//visibility:public"],
2424
)
25+
26+
cc_library(
27+
name = "stream_builder_interface",
28+
hdrs = ["stream_builder.h"],
29+
visibility = ["//visibility:public"],
30+
)

cpp2sky/internal/async_client.h

Lines changed: 61 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,15 @@
1515
#pragma once
1616

1717
#include <google/protobuf/message.h>
18+
#include <grpcpp/generic/generic_stub.h>
1819
#include <grpcpp/grpcpp.h>
1920

20-
#include <condition_variable>
2121
#include <memory>
2222

2323
using google::protobuf::Message;
2424

2525
namespace cpp2sky {
2626

27-
template <class RequestType, class ResponseType>
28-
class TracerStub {
29-
public:
30-
virtual ~TracerStub() = default;
31-
32-
/**
33-
* Initialize request writer.
34-
*/
35-
virtual std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> createWriter(
36-
grpc::ClientContext* ctx, ResponseType* response,
37-
grpc::CompletionQueue* cq, void* tag) = 0;
38-
};
39-
40-
template <class RequestType, class ResponseType>
41-
using TracerStubPtr = std::unique_ptr<TracerStub<RequestType, ResponseType>>;
42-
4327
template <class RequestType, class ResponseType>
4428
class AsyncClient {
4529
public:
@@ -51,91 +35,105 @@ class AsyncClient {
5135
virtual void sendMessage(RequestType message) = 0;
5236

5337
/**
54-
* Get writer.
38+
* Drain pending message.
5539
*/
56-
virtual std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> createWriter(
57-
grpc::ClientContext* ctx, ResponseType* response, void* tag) = 0;
40+
virtual void drainPendingMessage(RequestType message) = 0;
5841

5942
/**
60-
* Peer address of current gRPC client..
43+
* Start stream if there is no living stream.
6144
*/
62-
virtual std::string peerAddress() = 0;
45+
virtual void startStream() = 0;
6346

6447
/**
65-
* Drain pending message.
48+
* Completion queue.
6649
*/
67-
virtual void drainPendingMessage(RequestType message) = 0;
50+
virtual grpc::CompletionQueue& completionQueue() = 0;
6851

6952
/**
70-
* Reset stream if it is living.
53+
* gRPC Stub
7154
*/
72-
virtual void resetStream() = 0;
55+
virtual grpc::TemplatedGenericStub<RequestType, ResponseType>& stub() = 0;
56+
};
7357

74-
/**
75-
* Start stream if there is no living stream.
76-
*/
77-
virtual void startStream() = 0;
58+
template <class RequestType, class ResponseType>
59+
using AsyncClientPtr = std::unique_ptr<AsyncClient<RequestType, ResponseType>>;
60+
61+
template <class RequestType, class ResponseType>
62+
class AsyncStream {
63+
public:
64+
virtual ~AsyncStream() = default;
7865

7966
/**
80-
* The number of drained pending messages.
67+
* Send message. It will move the state from Init to Write.
8168
*/
82-
virtual size_t numOfMessages() = 0;
69+
virtual void sendMessage(RequestType message) = 0;
8370
};
8471

85-
enum class Operation : uint8_t {
72+
enum class StreamState : uint8_t {
8673
Initialized = 0,
87-
Connected = 1,
74+
Ready = 1,
8875
Idle = 2,
8976
WriteDone = 3,
77+
ReadDone = 4,
9078
};
9179

92-
template <class RequestType, class ResponseType>
93-
using AsyncClientPtr = std::unique_ptr<AsyncClient<RequestType, ResponseType>>;
94-
95-
template <class RequestType>
96-
class AsyncStream {
80+
class AsyncStreamCallback {
9781
public:
98-
virtual ~AsyncStream() = default;
82+
/**
83+
* Callback when stream ready event occured.
84+
*/
85+
virtual void onReady() = 0;
9986

10087
/**
101-
* Start stream. It will move the state of stream to Init.
88+
* Callback when idle event occured.
10289
*/
103-
virtual bool startStream() = 0;
90+
virtual void onIdle() = 0;
10491

10592
/**
106-
* Send message. It will move the state from Init to Write.
93+
* Callback when write done event occured.
10794
*/
108-
virtual void sendMessage(RequestType message) = 0;
95+
virtual void onWriteDone() = 0;
10996

11097
/**
111-
* Restore drained message.
98+
* Callback when read done event occured.
11299
*/
113-
virtual void undrainMessage(RequestType message) = 0;
100+
virtual void onReadDone() = 0;
114101

115102
/**
116-
* Handle incoming event related to this stream.
103+
* Callback when stream had finished with arbitrary error.
117104
*/
118-
virtual void handleOperation(Operation incoming_op) = 0;
105+
virtual void onStreamFinish() = 0;
119106
};
120107

121-
template <class RequestType>
122-
using AsyncStreamPtr = std::shared_ptr<AsyncStream<RequestType>>;
123-
124-
template <class RequestType, class ResponseType>
125-
class AsyncStreamFactory {
108+
struct StreamCallbackTag {
126109
public:
127-
virtual ~AsyncStreamFactory() = default;
128-
129-
/**
130-
* Create async stream entity
131-
*/
132-
virtual AsyncStreamPtr<RequestType> create(
133-
AsyncClient<RequestType, ResponseType>* client,
134-
std::condition_variable& cv) = 0;
110+
void callback(bool stream_finished) {
111+
if (stream_finished) {
112+
callback_->onStreamFinish();
113+
return;
114+
}
115+
116+
switch (state_) {
117+
case StreamState::Ready:
118+
callback_->onReady();
119+
break;
120+
case StreamState::WriteDone:
121+
callback_->onWriteDone();
122+
break;
123+
case StreamState::Idle:
124+
callback_->onIdle();
125+
break;
126+
case StreamState::ReadDone:
127+
callback_->onReadDone();
128+
break;
129+
}
130+
}
131+
132+
StreamState state_;
133+
AsyncStreamCallback* callback_;
135134
};
136135

137136
template <class RequestType, class ResponseType>
138-
using AsyncStreamFactoryPtr =
139-
std::unique_ptr<AsyncStreamFactory<RequestType, ResponseType>>;
137+
using AsyncStreamPtr = std::shared_ptr<AsyncStream<RequestType, ResponseType>>;
140138

141139
} // namespace cpp2sky

cpp2sky/internal/stream_builder.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2021 SkyAPM
2+
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <grpcpp/grpcpp.h>
18+
19+
#include <condition_variable>
20+
21+
namespace cpp2sky {
22+
23+
template <class RequestType, class ResponseType>
24+
class ClientStreamingStreamBuilder {
25+
public:
26+
virtual ~ClientStreamingStreamBuilder() = default;
27+
28+
/**
29+
* Create async stream entity
30+
*/
31+
virtual AsyncStreamPtr<RequestType, ResponseType> create(
32+
AsyncClient<RequestType, ResponseType>& client,
33+
std::condition_variable& cv) = 0;
34+
};
35+
36+
template <class RequestType, class ResponseType>
37+
using ClientStreamingStreamBuilderPtr =
38+
std::unique_ptr<ClientStreamingStreamBuilder<RequestType, ResponseType>>;
39+
40+
template <class RequestType, class ResponseType>
41+
class UnaryStreamBuilder {
42+
public:
43+
virtual ~UnaryStreamBuilder() = default;
44+
45+
/**
46+
* Create async stream entity
47+
*/
48+
virtual AsyncStreamPtr<RequestType, ResponseType> create(
49+
AsyncClient<RequestType, ResponseType>& client, RequestType request) = 0;
50+
};
51+
52+
template <class RequestType, class ResponseType>
53+
using UnaryStreamBuilderPtr =
54+
std::unique_ptr<UnaryStreamBuilder<RequestType, ResponseType>>;
55+
56+
} // namespace cpp2sky

example/sample.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ TracerConfig config;
2727

2828
void init() {
2929
config.set_instance_name("node_0");
30-
config.set_service_name("");
30+
config.set_service_name("mesh");
3131
config.set_address("0.0.0.0:11800");
3232
}
3333

@@ -42,6 +42,7 @@ int main() {
4242
std::string context = req.get_header_value(kPropagationHeader.data());
4343

4444
TracingContextPtr tracing_context;
45+
4546
if (!context.empty()) {
4647
// 2. Create tracing context with propagated information.
4748
tracing_context = tracer->newContext(createSpanContext(context));

source/BUILD

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,36 @@ cc_library(
1919
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc",
2020
"@com_github_grpc_grpc//:grpc++",
2121
"//cpp2sky/internal:async_client_interface",
22+
"//cpp2sky/internal:stream_builder_interface",
2223
"//cpp2sky:cpp2sky_interface",
2324
"//source/utils:util_lib",
2425
],
2526
visibility = ["//visibility:public"],
2627
)
2728

29+
# TODO(shikugawa): This is a terrible hack to disable unexpected behavior for
30+
# test that has introduced GRPC_TEST.
31+
cc_library(
32+
name = "async_client_impl_test",
33+
hdrs = [
34+
"grpc_async_client_impl.h",
35+
],
36+
srcs = [
37+
"grpc_async_client_impl.cc",
38+
],
39+
deps = [
40+
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_proto",
41+
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc",
42+
"@com_github_grpc_grpc//:grpc++",
43+
"//cpp2sky/internal:async_client_interface",
44+
"//cpp2sky/internal:stream_builder_interface",
45+
"//cpp2sky:cpp2sky_interface",
46+
"//source/utils:util_lib",
47+
],
48+
copts = ["-DGRPC_TEST"],
49+
visibility = ["//visibility:public"],
50+
)
51+
2852
cc_library(
2953
name = "cpp2sky_data_lib",
3054
hdrs =[
@@ -42,4 +66,4 @@ cc_library(
4266
"//source/utils:util_lib",
4367
],
4468
visibility = ["//visibility:public"],
45-
)
69+
)

0 commit comments

Comments
 (0)