Skip to content

Commit 48c3fbb

Browse files
authored
SkyWalking CDS implementation (#61)
* update skywalking protocol v8.4.0 * update skywalking protocol v8.4.0 * wip * update mock collector * wip * wip * update skywalking-python * wip * wip * refactor * refactor * refactor * implement skywalking cds * fix agent test tool * tes * test * update * wip * wip * fix * unit test * README and set interval * fix * spdlog fix * update loglevel * fix * fix test
1 parent 96ee5e7 commit 48c3fbb

26 files changed

+685
-78
lines changed

README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ After you constructed config, then setup tracer. Tracer supports gRPC reporter o
6464
TLS adoption and REST tracer will be supported in the future.
6565

6666
```cpp
67-
TracerConfig tracer_config(oap_addr, token);
67+
TracerConfig tracer_config;
68+
69+
// Setup
70+
6871
TracerPtr tracer = createInsecureGrpcTracer(tracer_config);
6972
```
7073

@@ -122,6 +125,23 @@ tracing_span->endSpan();
122125
tracer->report(std::move(tracing_context));
123126
```
124127
128+
#### Skywalking CDS
129+
130+
C++ agent implements Skywalking CDS feature it allows to change bootstrap config dynamically from the response of sync request, invoked from this periodically.
131+
132+
```cpp
133+
TracerConfig config;
134+
// If you set this value as zero, CDS request won't occur.
135+
config.set_cds_request_interval(5); // CDS request interval should be 5sec
136+
```
137+
138+
Currently, Configurable values dynamically are like below.
139+
140+
| Config Key | Value Description |
141+
|:----:|:----:|
142+
|instance_name| Instance name of this agent |
143+
144+
125145
## Security
126146

127147
If you've found any security issues, please read [Security Reporting Process](https://github.com/SkyAPM/cpp2sky/blob/main/SECURITY.md) and take described steps.

bazel/fmtlib.BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
load("@rules_cc//cc:defs.bzl", "cc_library")
2+
3+
licenses(["notice"]) # Apache 2
4+
5+
cc_library(
6+
name = "fmtlib",
7+
hdrs = glob([
8+
"include/fmt/*.h",
9+
]),
10+
defines = ["FMT_HEADER_ONLY"],
11+
includes = ["include"],
12+
visibility = ["//visibility:public"],
13+
)

bazel/repositories.bzl

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ def cpp2sky_dependencies():
77
com_google_googletest()
88
com_google_protobuf()
99
com_github_httplib()
10+
com_github_fmtlib_fmt()
11+
com_github_gabime_spdlog()
1012

1113
def skywalking_data_collect_protocol():
1214
http_archive(
1315
name = "skywalking_data_collect_protocol",
14-
sha256 = "edfa970394511213eacc8055b4c13e4e9773e9196122a49e0db68f6162f67dff",
15-
urls = ["https://github.com/apache/skywalking-data-collect-protocol/archive/v8.4.0.tar.gz"],
16-
strip_prefix = "skywalking-data-collect-protocol-8.4.0",
16+
sha256 = "015d152be6efc653cf0f9f9c2e7edff1b38b826273f03fffedd307026ef56e28",
17+
urls = [
18+
"https://github.com/Shikugawa/skywalking-data-collect-protocol/archive/fbce318842c40c81cf909d50a7f6dd1556d2e4b2.tar.gz"
19+
],
20+
strip_prefix = "skywalking-data-collect-protocol-fbce318842c40c81cf909d50a7f6dd1556d2e4b2",
1721
)
1822

1923
def com_github_grpc_grpc():
@@ -59,3 +63,21 @@ def com_github_httplib():
5963
build_file = "//bazel:httplib.BUILD",
6064
urls = ["https://github.com/yhirose/cpp-httplib/archive/v0.7.15.tar.gz"]
6165
)
66+
67+
def com_github_fmtlib_fmt():
68+
http_archive(
69+
name = "com_github_fmtlib_fmt",
70+
sha256 = "decfdf9ad274070fa85f26407b816f5a4d82205ae86bac1990be658d0795ea4d",
71+
strip_prefix = "fmt-7.0.3",
72+
build_file = "//bazel:fmtlib.BUILD",
73+
urls = ["https://github.com/fmtlib/fmt/releases/download/7.0.3/fmt-7.0.3.zip"],
74+
)
75+
76+
def com_github_gabime_spdlog():
77+
http_archive(
78+
name = "com_github_gabime_spdlog",
79+
sha256 = "f0114a4d3c88be9e696762f37a7c379619443ce9d668546c61b21d41affe5b62",
80+
strip_prefix = "spdlog-1.7.0",
81+
build_file = "//bazel:spdlog.BUILD",
82+
urls = ["https://github.com/gabime/spdlog/archive/v1.7.0.tar.gz"]
83+
)

bazel/spdlog.BUILD

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
load("@rules_cc//cc:defs.bzl", "cc_library")
2+
3+
licenses(["notice"]) # Apache 2
4+
5+
cc_library(
6+
name = "spdlog",
7+
hdrs = glob([
8+
"include/**/*.h",
9+
]),
10+
defines = ["SPDLOG_FMT_EXTERNAL"],
11+
includes = ["include"],
12+
visibility = ["//visibility:public"],
13+
deps = ["@com_github_fmtlib_fmt//:fmtlib"],
14+
)

cpp2sky/config.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@ message TracerConfig {
2525

2626
// The size of buffer it stores pending messages.
2727
uint32 delayed_buffer_size = 6;
28+
29+
// CDS sync request interval. If this value is zero, CDS feature will be disabled.
30+
uint32 cds_request_interval = 7;
2831
}

cpp2sky/tracer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ class Tracer {
4040

4141
using TracerPtr = std::unique_ptr<Tracer>;
4242

43-
TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg);
43+
TracerPtr createInsecureGrpcTracer(TracerConfig& cfg);
4444

4545
} // namespace cpp2sky

docker-compose.dev.yml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ services:
2020
memlock:
2121
soft: -1
2222
hard: -1
23+
consul:
24+
image: docker.io/bitnami/consul:1-debian-10
25+
ports:
26+
- '8300:8300'
27+
- '8301:8301'
28+
- '8301:8301/udp'
29+
- '8500:8500'
30+
- '8600:8600'
31+
- '8600:8600/udp'
2332
oap:
2433
image: apache/skywalking-oap-server:8.4.0-es7
2534
depends_on:
@@ -33,15 +42,15 @@ services:
3342
environment:
3443
SW_STORAGE: elasticsearch7
3544
SW_STORAGE_ES_CLUSTER_NODES: elasticsearch:9200
36-
expose:
37-
- "11800"
38-
- "12800"
3945
healthcheck:
4046
test: ["CMD-SHELL", "/skywalking/bin/swctl"]
4147
interval: 30s
4248
timeout: 10s
4349
retries: 3
4450
start_period: 40s
51+
depends_on:
52+
- consul
53+
- elasticsearch
4554
ui:
4655
image: apache/skywalking-ui:8.4.0
4756
depends_on:
@@ -53,3 +62,5 @@ services:
5362
- 8080:8080
5463
environment:
5564
SW_OAP_ADDRESS: oap:12800
65+
depends_on:
66+
- oap

example/sample.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ void init() {
2929
config.set_instance_name("node_0");
3030
config.set_service_name("mesh");
3131
config.set_address("0.0.0.0:11800");
32+
config.set_cds_request_interval(5);
3233
}
3334

3435
int main() {

source/BUILD

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,24 @@ cc_library(
77
"tracing_context_impl.h",
88
"grpc_async_client_impl.h",
99
"tracer_impl.h",
10+
"cds_impl.h",
11+
"dynamic_config.h"
1012
],
1113
srcs = [
1214
"propagation_impl.cc",
1315
"tracing_context_impl.cc",
1416
"grpc_async_client_impl.cc",
1517
"tracer_impl.cc",
18+
"cds_impl.cc",
19+
"dynamic_config.cc"
1620
],
1721
deps = [
1822
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_proto",
1923
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc",
24+
"@skywalking_data_collect_protocol//language-agent:configuration_discovery_service_cc_proto",
25+
"@skywalking_data_collect_protocol//language-agent:configuration_discovery_service_cc_grpc",
2026
"@com_github_grpc_grpc//:grpc++",
27+
"@com_github_gabime_spdlog//:spdlog",
2128
"//cpp2sky/internal:async_client_interface",
2229
"//cpp2sky/internal:stream_builder_interface",
2330
"//cpp2sky:cpp2sky_interface",
@@ -40,6 +47,7 @@ cc_library(
4047
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_proto",
4148
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc",
4249
"@com_github_grpc_grpc//:grpc++",
50+
"@com_github_gabime_spdlog//:spdlog",
4351
"//cpp2sky/internal:async_client_interface",
4452
"//cpp2sky/internal:stream_builder_interface",
4553
"//cpp2sky:cpp2sky_interface",
@@ -54,12 +62,14 @@ cc_library(
5462
hdrs =[
5563
"propagation_impl.h",
5664
"tracing_context_impl.h",
65+
"dynamic_config.h",
5766
],
5867
srcs = [
5968
"propagation_impl.cc",
6069
"tracing_context_impl.cc",
6170
],
6271
deps = [
72+
"@skywalking_data_collect_protocol//language-agent:configuration_discovery_service_cc_proto",
6373
"@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_proto",
6474
"//cpp2sky:config_cc_proto",
6575
"//cpp2sky:cpp2sky_interface",

source/cds_impl.cc

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2021 SkyAPM
2+
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "cds_impl.h"
16+
17+
#include <condition_variable>
18+
19+
#include "spdlog/spdlog.h"
20+
21+
namespace cpp2sky {
22+
23+
using namespace spdlog;
24+
25+
GrpcAsyncConfigDiscoveryServiceClient::GrpcAsyncConfigDiscoveryServiceClient(
26+
const std::string& address, grpc::CompletionQueue& cq,
27+
UnaryStreamBuilderPtr<CdsRequest, CdsResponse> builder,
28+
std::shared_ptr<grpc::ChannelCredentials> cred)
29+
: builder_(std::move(builder)),
30+
cq_(cq),
31+
stub_(grpc::CreateChannel(address, cred)) {}
32+
33+
GrpcAsyncConfigDiscoveryServiceClient::
34+
~GrpcAsyncConfigDiscoveryServiceClient() {
35+
resetStream();
36+
}
37+
38+
void GrpcAsyncConfigDiscoveryServiceClient::sendMessage(CdsRequest request) {
39+
resetStream();
40+
stream_ = builder_->create(*this, request);
41+
info("[CDS] Stream {} had created", fmt::ptr(stream_.get()));
42+
}
43+
44+
void GrpcAsyncConfigDiscoveryServiceClient::resetStream() {
45+
if (stream_) {
46+
info("[CDS] Stream {} has destroyed", fmt::ptr(this));
47+
stream_.reset();
48+
}
49+
}
50+
51+
GrpcAsyncConfigDiscoveryServiceStream::GrpcAsyncConfigDiscoveryServiceStream(
52+
AsyncClient<CdsRequest, CdsResponse>& parent, CdsRequest request,
53+
DynamicConfig& config)
54+
: client_(parent), config_(config) {
55+
sendMessage(request);
56+
}
57+
58+
void GrpcAsyncConfigDiscoveryServiceStream::sendMessage(CdsRequest request) {
59+
response_reader_ = client_.stub().PrepareUnaryCall(
60+
&ctx_, "/skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations",
61+
request, &client_.completionQueue());
62+
response_reader_->StartCall();
63+
response_reader_->Finish(&commands_, &status_,
64+
reinterpret_cast<void*>(&read_done_));
65+
}
66+
67+
void GrpcAsyncConfigDiscoveryServiceStream::onReadDone() {
68+
info("[CDS] Stream {} read finished with gRPC status {}", fmt::ptr(this),
69+
static_cast<int>(status_.error_code()));
70+
71+
if (status_.ok()) {
72+
config_.onConfigChange(commands_);
73+
}
74+
75+
// Stream which finished to read done won't be destroyed here.
76+
// But it will be destroyed when new stream created.
77+
}
78+
79+
AsyncStreamPtr<CdsRequest, CdsResponse>
80+
GrpcAsyncConfigDiscoveryServiceStreamBuilder::create(
81+
AsyncClient<CdsRequest, CdsResponse>& client, CdsRequest request) {
82+
return std::make_shared<GrpcAsyncConfigDiscoveryServiceStream>(
83+
client, request, config_);
84+
}
85+
86+
} // namespace cpp2sky

0 commit comments

Comments
 (0)