Skip to content

Commit 3632343

Browse files
author
Kapil Rastogi
authored
Merge pull request #72 from ExpediaDotCom/haystack-opencensus-integration
Adding haystack opencensus integration
2 parents c2e37fe + 457ed8e commit 3632343

File tree

40 files changed

+1699
-309
lines changed

40 files changed

+1699
-309
lines changed

.travis.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
sudo: required
2+
13
dist: trusty
24

5+
services:
6+
- docker
7+
38
language: java
49

510
jdk:
@@ -9,6 +14,11 @@ cache:
914
directories:
1015
- $HOME/.m2
1116

17+
addons:
18+
hosts:
19+
- haystack-agent
20+
- kafkasvc
21+
1222
install:
1323
- ./mvnw --batch-mode install -B -V
1424

commons/pom.xml

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
2+
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<parent>
6+
<groupId>com.expedia.www</groupId>
7+
<artifactId>haystack-client-java-parent</artifactId>
8+
<version>0.2.1-SNAPSHOT</version>
9+
</parent>
10+
11+
<artifactId>haystack-client-commons</artifactId>
12+
<packaging>jar</packaging>
13+
<name>haystack-client-commons</name>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>com.expedia.www</groupId>
18+
<artifactId>haystack-commons</artifactId>
19+
</dependency>
20+
21+
<dependency>
22+
<groupId>io.grpc</groupId>
23+
<artifactId>grpc-netty-shaded</artifactId>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>io.grpc</groupId>
28+
<artifactId>grpc-protobuf</artifactId>
29+
</dependency>
30+
31+
<dependency>
32+
<groupId>io.grpc</groupId>
33+
<artifactId>grpc-stub</artifactId>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>org.apache.commons</groupId>
38+
<artifactId>commons-lang3</artifactId>
39+
</dependency>
40+
41+
<dependency>
42+
<groupId>org.slf4j</groupId>
43+
<artifactId>slf4j-api</artifactId>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.apache.httpcomponents</groupId>
48+
<artifactId>httpclient</artifactId>
49+
</dependency>
50+
51+
<!-- Test dependencies -->
52+
<dependency>
53+
<groupId>junit</groupId>
54+
<artifactId>junit</artifactId>
55+
<scope>test</scope>
56+
</dependency>
57+
58+
<dependency>
59+
<groupId>org.mockito</groupId>
60+
<artifactId>mockito-core</artifactId>
61+
<scope>test</scope>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>io.grpc</groupId>
66+
<artifactId>grpc-testing</artifactId>
67+
<scope>test</scope>
68+
</dependency>
69+
70+
<dependency>
71+
<groupId>org.awaitility</groupId>
72+
<artifactId>awaitility</artifactId>
73+
<scope>test</scope>
74+
</dependency>
75+
</dependencies>
76+
77+
</project>
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright 2018 Expedia, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.expedia.www.haystack.client.dispatchers.clients;
19+
20+
import com.expedia.open.tracing.agent.api.DispatchResult;
21+
import com.expedia.open.tracing.agent.api.SpanAgentGrpc;
22+
import com.expedia.www.haystack.client.metrics.*;
23+
import io.grpc.ManagedChannel;
24+
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
25+
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
26+
import io.grpc.stub.StreamObserver;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.Arrays;
31+
import java.util.concurrent.TimeUnit;
32+
33+
abstract public class BaseGrpcClient<R> implements Client<R> {
34+
private static final Logger LOGGER = LoggerFactory.getLogger(BaseGrpcClient.class);
35+
36+
protected final ManagedChannel channel;
37+
protected final SpanAgentGrpc.SpanAgentStub stub;
38+
protected final long shutdownTimeoutMS;
39+
protected final StreamObserver<DispatchResult> observer;
40+
41+
protected final Timer sendTimer;
42+
protected final Counter sendExceptionCounter;
43+
protected final Timer closeTimer;
44+
protected final Counter closeTimeoutCounter;
45+
protected final Counter closeInterruptedCounter;
46+
protected final Counter closeExceptionCounter;
47+
protected final Counter flushCounter;
48+
49+
public BaseGrpcClient(Metrics metrics,
50+
ManagedChannel channel,
51+
SpanAgentGrpc.SpanAgentStub stub, StreamObserver<DispatchResult> observer,
52+
long shutdownTimeoutMS) {
53+
this.channel = channel;
54+
this.stub = stub;
55+
this.shutdownTimeoutMS = shutdownTimeoutMS;
56+
this.observer = observer;
57+
58+
this.sendTimer = Timer.builder("send").register(metrics);
59+
this.sendExceptionCounter = Counter.builder("send").tag(new Tag("state", "exception")).register(metrics);
60+
this.closeTimer = Timer.builder("close").register(metrics);
61+
this.closeTimeoutCounter = Counter.builder("close").tag(new Tag("state", "timeout")).register(metrics);
62+
this.closeInterruptedCounter = Counter.builder("close").tag(new Tag("state", "interrupted")).register(metrics);
63+
this.closeExceptionCounter = Counter.builder("close").tag(new Tag("state", "exception")).register(metrics);
64+
this.flushCounter = Counter.builder("flush").register(metrics);
65+
}
66+
67+
public void close() {
68+
try (Timer.Sample timer = closeTimer.start()) {
69+
channel.shutdown();
70+
try {
71+
if (!channel.awaitTermination(shutdownTimeoutMS, TimeUnit.SECONDS)) {
72+
channel.shutdownNow();
73+
closeTimeoutCounter.increment();
74+
LOGGER.warn("Channel failed to terminate, forcibly closing it.");
75+
if (!channel.awaitTermination(shutdownTimeoutMS, TimeUnit.SECONDS)) {
76+
closeTimeoutCounter.increment();
77+
LOGGER.error("Channel failed to terminate.");
78+
}
79+
}
80+
} catch (InterruptedException e) {
81+
closeInterruptedCounter.increment();
82+
LOGGER.error("Unable to close the channel.", e);
83+
}
84+
} catch (Exception e) {
85+
closeExceptionCounter.increment();
86+
LOGGER.error("Unexpected exception caught on client shutdown.", e);
87+
throw e;
88+
}
89+
}
90+
91+
public void flush() {
92+
flushCounter.increment();
93+
}
94+
95+
public static class GRPCAgentClientStreamObserver implements StreamObserver<DispatchResult> {
96+
private Counter onCompletedCounter;
97+
private Counter onErrorCounter;
98+
private Counter ratelimitCounter;
99+
private Counter unknownCounter;
100+
private Counter badresultCounter;
101+
102+
public GRPCAgentClientStreamObserver(Metrics metrics) {
103+
this.onCompletedCounter = Counter.builder("observer").tag(new Tag("state", "completed")).register(metrics);
104+
this.onErrorCounter = Counter.builder("observer").tag(new Tag("state", "error")).register(metrics);
105+
this.ratelimitCounter = Counter.builder("observer").tag(new Tag("state", "ratelimited")).register(metrics);
106+
this.unknownCounter = Counter.builder("observer").tag(new Tag("state", "unknown")).register(metrics);
107+
this.badresultCounter = Counter.builder("observer").tag(new Tag("state", "badresult")).register(metrics);
108+
}
109+
110+
@Override
111+
public void onCompleted() {
112+
onCompletedCounter.increment();
113+
LOGGER.debug("Dispatching span completed");
114+
}
115+
116+
@Override
117+
public void onError(Throwable t) {
118+
onErrorCounter.increment();
119+
LOGGER.error("Dispatching span failed with error: {}", t);
120+
}
121+
122+
@Override
123+
public void onNext(DispatchResult value) {
124+
switch (value.getCode()) {
125+
case SUCCESS:
126+
// do nothing
127+
break;
128+
case RATE_LIMIT_ERROR:
129+
ratelimitCounter.increment();
130+
LOGGER.error("Rate limit error received from agent");
131+
break;
132+
case UNKNOWN_ERROR:
133+
unknownCounter.increment();
134+
LOGGER.error("Unknown error received from agent");
135+
break;
136+
default:
137+
badresultCounter.increment();
138+
LOGGER.error("Unknown result received from agent: {}", value.getCode());
139+
}
140+
}
141+
}
142+
143+
public static abstract class Builder {
144+
protected StreamObserver<DispatchResult> observer;
145+
146+
protected Metrics metrics;
147+
148+
// Options to build a channel
149+
protected String host;
150+
protected int port;
151+
protected long keepAliveTimeMS = TimeUnit.SECONDS.toMillis(30);
152+
protected long keepAliveTimeoutMS = TimeUnit.SECONDS.toMillis(30);
153+
protected boolean keepAliveWithoutCalls = true;
154+
protected NegotiationType negotiationType = NegotiationType.PLAINTEXT;
155+
156+
// either build a channel or provide one
157+
protected ManagedChannel channel;
158+
159+
protected long shutdownTimeoutMS = TimeUnit.SECONDS.toMillis(30);
160+
161+
private Builder(MetricsRegistry registry) {
162+
this(new Metrics(registry, Client.class.getName(), Arrays.asList(new Tag("type", "grpc"))));
163+
}
164+
165+
private Builder(Metrics metrics) {
166+
this.observer = new GRPCAgentClientStreamObserver(metrics);
167+
this.metrics = metrics;
168+
169+
}
170+
171+
public Builder(MetricsRegistry metrics, ManagedChannel channel) {
172+
this(metrics);
173+
this.channel = channel;
174+
}
175+
176+
public Builder(Metrics metrics, ManagedChannel channel) {
177+
this(metrics);
178+
this.channel = channel;
179+
}
180+
181+
public Builder(MetricsRegistry metrics, String host, int port) {
182+
this(metrics);
183+
this.host = host;
184+
this.port = port;
185+
}
186+
187+
public Builder(Metrics metrics, String host, int port) {
188+
this(metrics);
189+
this.host = host;
190+
this.port = port;
191+
}
192+
193+
public Builder withObserver(StreamObserver<DispatchResult> observer) {
194+
this.observer = observer;
195+
return this;
196+
}
197+
198+
public Builder withKeepAliveTimeMS(long keepAliveTimeMS) {
199+
this.keepAliveTimeMS = keepAliveTimeMS;
200+
return this;
201+
}
202+
203+
public Builder withKeepAliveTimeoutMS(long keepAliveTimeoutMS) {
204+
this.keepAliveTimeoutMS = keepAliveTimeoutMS;
205+
return this;
206+
}
207+
208+
public Builder withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls) {
209+
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
210+
return this;
211+
}
212+
213+
public Builder withNegotiationType(NegotiationType negotiationType) {
214+
this.negotiationType = negotiationType;
215+
return this;
216+
}
217+
218+
public Builder withShutdownTimeoutMS(long shutdownTimeoutMS) {
219+
this.shutdownTimeoutMS = shutdownTimeoutMS;
220+
return this;
221+
}
222+
223+
protected ManagedChannel buildManagedChannel() {
224+
return NettyChannelBuilder.forAddress(host, port)
225+
.keepAliveTime(keepAliveTimeMS, TimeUnit.MILLISECONDS)
226+
.keepAliveTimeout(keepAliveTimeoutMS, TimeUnit.MILLISECONDS)
227+
.keepAliveWithoutCalls(keepAliveWithoutCalls)
228+
.negotiationType(negotiationType)
229+
.build();
230+
}
231+
232+
public abstract BaseGrpcClient build();
233+
}
234+
}

0 commit comments

Comments
 (0)