Skip to content

Commit 2270c9b

Browse files
Adding open telemetry support. (#757)
* Adding open telemetry support
1 parent bcca6ea commit 2270c9b

File tree

3 files changed

+61
-75
lines changed

3 files changed

+61
-75
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ dependencies {
7676
implementation 'io.grpc:grpc-protobuf:1.28.0'
7777
implementation 'io.grpc:grpc-stub:1.28.0'
7878

79+
compile(platform("io.opentelemetry:opentelemetry-bom:1.19.0"))
80+
compile("io.opentelemetry:opentelemetry-api")
81+
7982
testCompile group: 'junit', name: 'junit', version: '4.12'
8083
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
8184
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@
4343
import io.grpc.Metadata;
4444
import io.grpc.MethodDescriptor;
4545
import io.grpc.stub.MetadataUtils;
46+
import io.opentelemetry.api.GlobalOpenTelemetry;
47+
import io.opentelemetry.context.Context;
48+
import io.opentelemetry.context.propagation.TextMapPropagator;
49+
import io.opentelemetry.context.propagation.TextMapSetter;
4650
import java.util.concurrent.TimeUnit;
4751
import java.util.concurrent.atomic.AtomicBoolean;
4852
import org.slf4j.Logger;
@@ -103,7 +107,10 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
103107
headers.put(RPC_ENCODING_HEADER_KEY, "proto");
104108
Channel interceptedChannel =
105109
ClientInterceptors.intercept(
106-
channel, deadlineInterceptor, MetadataUtils.newAttachHeadersInterceptor(headers));
110+
channel,
111+
deadlineInterceptor,
112+
MetadataUtils.newAttachHeadersInterceptor(headers),
113+
newOpenTelemetryInterceptor());
107114
if (log.isTraceEnabled()) {
108115
interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
109116
}
@@ -119,6 +126,36 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
119126
this.metaFutureStub = MetaAPIGrpc.newFutureStub(interceptedChannel);
120127
}
121128

129+
private ClientInterceptor newOpenTelemetryInterceptor() {
130+
return new ClientInterceptor() {
131+
@Override
132+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
133+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
134+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
135+
next.newCall(method, callOptions)) {
136+
137+
@Override
138+
public void start(Listener<RespT> responseListener, Metadata headers) {
139+
TextMapPropagator propagator =
140+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
141+
142+
final TextMapSetter<Metadata> setter =
143+
(carrier, key, value) -> {
144+
if (carrier != null) {
145+
carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
146+
}
147+
};
148+
if (propagator != null) {
149+
propagator.inject(Context.current(), headers, setter);
150+
}
151+
152+
super.start(responseListener, headers);
153+
}
154+
};
155+
}
156+
};
157+
}
158+
122159
private ClientInterceptor newTracingInterceptor() {
123160
return new ClientInterceptor() {
124161

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 20 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -23,79 +23,7 @@
2323
import com.google.common.collect.ImmutableMap;
2424
import com.google.gson.Gson;
2525
import com.google.gson.GsonBuilder;
26-
import com.uber.cadence.BadRequestError;
27-
import com.uber.cadence.ClientVersionNotSupportedError;
28-
import com.uber.cadence.ClusterInfo;
29-
import com.uber.cadence.CountWorkflowExecutionsRequest;
30-
import com.uber.cadence.CountWorkflowExecutionsResponse;
31-
import com.uber.cadence.DeprecateDomainRequest;
32-
import com.uber.cadence.DescribeDomainRequest;
33-
import com.uber.cadence.DescribeDomainResponse;
34-
import com.uber.cadence.DescribeTaskListRequest;
35-
import com.uber.cadence.DescribeTaskListResponse;
36-
import com.uber.cadence.DescribeWorkflowExecutionRequest;
37-
import com.uber.cadence.DescribeWorkflowExecutionResponse;
38-
import com.uber.cadence.DomainAlreadyExistsError;
39-
import com.uber.cadence.DomainNotActiveError;
40-
import com.uber.cadence.EntityNotExistsError;
41-
import com.uber.cadence.GetSearchAttributesResponse;
42-
import com.uber.cadence.GetTaskListsByDomainRequest;
43-
import com.uber.cadence.GetTaskListsByDomainResponse;
44-
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
45-
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
46-
import com.uber.cadence.History;
47-
import com.uber.cadence.InternalServiceError;
48-
import com.uber.cadence.LimitExceededError;
49-
import com.uber.cadence.ListArchivedWorkflowExecutionsRequest;
50-
import com.uber.cadence.ListArchivedWorkflowExecutionsResponse;
51-
import com.uber.cadence.ListClosedWorkflowExecutionsRequest;
52-
import com.uber.cadence.ListClosedWorkflowExecutionsResponse;
53-
import com.uber.cadence.ListDomainsRequest;
54-
import com.uber.cadence.ListDomainsResponse;
55-
import com.uber.cadence.ListOpenWorkflowExecutionsRequest;
56-
import com.uber.cadence.ListOpenWorkflowExecutionsResponse;
57-
import com.uber.cadence.ListTaskListPartitionsRequest;
58-
import com.uber.cadence.ListTaskListPartitionsResponse;
59-
import com.uber.cadence.ListWorkflowExecutionsRequest;
60-
import com.uber.cadence.ListWorkflowExecutionsResponse;
61-
import com.uber.cadence.PollForActivityTaskRequest;
62-
import com.uber.cadence.PollForActivityTaskResponse;
63-
import com.uber.cadence.PollForDecisionTaskRequest;
64-
import com.uber.cadence.PollForDecisionTaskResponse;
65-
import com.uber.cadence.QueryFailedError;
66-
import com.uber.cadence.QueryWorkflowRequest;
67-
import com.uber.cadence.QueryWorkflowResponse;
68-
import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest;
69-
import com.uber.cadence.RecordActivityTaskHeartbeatRequest;
70-
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
71-
import com.uber.cadence.RefreshWorkflowTasksRequest;
72-
import com.uber.cadence.RegisterDomainRequest;
73-
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
74-
import com.uber.cadence.ResetStickyTaskListRequest;
75-
import com.uber.cadence.ResetStickyTaskListResponse;
76-
import com.uber.cadence.ResetWorkflowExecutionRequest;
77-
import com.uber.cadence.ResetWorkflowExecutionResponse;
78-
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
79-
import com.uber.cadence.RespondActivityTaskCanceledRequest;
80-
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
81-
import com.uber.cadence.RespondActivityTaskCompletedRequest;
82-
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
83-
import com.uber.cadence.RespondActivityTaskFailedRequest;
84-
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
85-
import com.uber.cadence.RespondDecisionTaskCompletedResponse;
86-
import com.uber.cadence.RespondDecisionTaskFailedRequest;
87-
import com.uber.cadence.RespondQueryTaskCompletedRequest;
88-
import com.uber.cadence.ServiceBusyError;
89-
import com.uber.cadence.SignalWithStartWorkflowExecutionRequest;
90-
import com.uber.cadence.SignalWorkflowExecutionRequest;
91-
import com.uber.cadence.StartWorkflowExecutionRequest;
92-
import com.uber.cadence.StartWorkflowExecutionResponse;
93-
import com.uber.cadence.TerminateWorkflowExecutionRequest;
94-
import com.uber.cadence.UpdateDomainRequest;
95-
import com.uber.cadence.UpdateDomainResponse;
96-
import com.uber.cadence.WorkflowExecutionAlreadyCompletedError;
97-
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
98-
import com.uber.cadence.WorkflowService;
26+
import com.uber.cadence.*;
9927
import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result;
10028
import com.uber.cadence.internal.Version;
10129
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
@@ -114,6 +42,10 @@
11442
import com.uber.tchannel.messages.ThriftRequest;
11543
import com.uber.tchannel.messages.ThriftResponse;
11644
import com.uber.tchannel.messages.generated.Meta;
45+
import io.opentelemetry.api.GlobalOpenTelemetry;
46+
import io.opentelemetry.context.Context;
47+
import io.opentelemetry.context.propagation.TextMapPropagator;
48+
import io.opentelemetry.context.propagation.TextMapSetter;
11749
import java.net.InetAddress;
11850
import java.net.InetSocketAddress;
11951
import java.net.UnknownHostException;
@@ -275,13 +207,27 @@ public CompletableFuture<Boolean> isHealthy() {
275207
return result;
276208
}
277209

278-
private <T> ThriftRequest<T> buildThriftRequest(String apiName, T body, Long rpcTimeoutOverride) {
210+
protected <T> ThriftRequest<T> buildThriftRequest(
211+
String apiName, T body, Long rpcTimeoutOverride) {
279212
String endpoint = getEndpoint(INTERFACE_NAME, apiName);
280213
ThriftRequest.Builder<T> builder =
281214
new ThriftRequest.Builder<>(options.getServiceName(), endpoint);
282215
// Create a mutable hashmap for headers, as tchannel.tracing.PrefixedHeadersCarrier assumes
283216
// that it can call put directly to add new stuffs (e.g. traces).
284217
final HashMap<String, String> headers = new HashMap<>(thriftHeaders);
218+
TextMapPropagator textMapPropagator =
219+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
220+
221+
String tracingHeadersPrefix = "$tracing$";
222+
TextMapSetter<Map<String, String>> setter =
223+
(carrier, key, value) -> {
224+
if (carrier != null) {
225+
carrier.put(tracingHeadersPrefix + key, value);
226+
}
227+
};
228+
229+
textMapPropagator.inject(Context.current(), headers, setter);
230+
285231
if (this.options.getAuthProvider() != null) {
286232
headers.put(
287233
"cadence-authorization",

0 commit comments

Comments
 (0)