Skip to content

Commit 779da66

Browse files
committed
[OTLP] Add custom OTLP SigV4 OkHttp exporter
1 parent 2740c56 commit 779da66

File tree

6 files changed

+650
-0
lines changed

6 files changed

+650
-0
lines changed

awsagentprovider/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ dependencies {
4444
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp")
4545
// For Udp emitter
4646
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp-common")
47+
// For HTTP SigV4 emitter
48+
compileOnly("com.squareup.okhttp3:okhttp")
49+
implementation("software.amazon.awssdk:auth:2.30.14")
50+
implementation("software.amazon.awssdk:http-auth-aws:2.30.14")
4751

4852
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
4953
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder(
221221
return tracerProviderBuilder;
222222
}
223223

224+
// TODO: RETURN HERE IF OUR USE CASE IS HIT
225+
224226
// Construct meterProvider
225227
MetricExporter metricsExporter =
226228
ApplicationSignalsExporterProvider.INSTANCE.createExporter(configProps);
@@ -286,6 +288,12 @@ private SpanExporter customizeSpanExporter(
286288
.build();
287289
}
288290
}
291+
// When running OTLP endpoint for X-Ray backend, use custom exporter for SigV4 authentication
292+
// TODO: Figure out if `isOtlpSpanExporter(spanExporter)` is needed in the condition
293+
else if (OtlpSigV4HttpSpanExporterBuilder.CLOUDWATCH_OTLP_TRACES_ENDPOINT.equals(
294+
System.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT_CONFIG))) {
295+
spanExporter = new OtlpSigV4HttpSpanExporterBuilder().build();
296+
}
289297

290298
if (isApplicationSignalsEnabled(configProps)) {
291299
return AwsMetricAttributesSpanExporterBuilder.create(
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.AUTHORIZATION;
19+
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.HOST;
20+
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_CONTENT_SHA256;
21+
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_DATE;
22+
23+
import io.opentelemetry.exporter.internal.http.HttpSender.Response;
24+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
25+
import io.opentelemetry.sdk.common.CompletableResultCode;
26+
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
27+
import java.io.IOException;
28+
import java.time.Duration;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.concurrent.SynchronousQueue;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.function.Consumer;
35+
import java.util.function.Supplier;
36+
import java.util.logging.Logger;
37+
import javax.annotation.Nullable;
38+
import okhttp3.Call;
39+
import okhttp3.Callback;
40+
import okhttp3.Dispatcher;
41+
import okhttp3.HttpUrl;
42+
import okhttp3.MediaType;
43+
import okhttp3.OkHttpClient;
44+
import okhttp3.Request;
45+
import okhttp3.RequestBody;
46+
import okhttp3.ResponseBody;
47+
import okio.BufferedSink;
48+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
49+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
50+
import software.amazon.awssdk.http.SdkHttpRequest;
51+
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
52+
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
53+
54+
/**
55+
* This class represents a HTTP sender that sends data to a specified endpoint. It is used to send
56+
* data to an endpoint, e.g. an OTLP endpoint.
57+
*/
58+
public class OtlpSigV4HttpSender {
59+
private static final Logger logger = Logger.getLogger(OtlpSigV4HttpSender.class.getName());
60+
61+
private final OkHttpClient client;
62+
private final HttpUrl url;
63+
// @Nullable private final Compressor compressor;
64+
private final boolean exportAsJson;
65+
private final Supplier<Map<String, List<String>>> headerSupplier;
66+
private final MediaType mediaType;
67+
68+
private final AwsV4HttpSigner signer;
69+
private final AwsCredentialsProvider awsCredentialsProvider;
70+
private final String regionName = "us-east-1"; // TODO: Fix
71+
private final String serviceName = "xray"; // TODO: Fix
72+
73+
public OtlpSigV4HttpSender(
74+
String endpoint,
75+
long timeoutNanos,
76+
long connectionTimeoutNanos,
77+
Supplier<Map<String, List<String>>> headerSupplier,
78+
boolean exportAsJson) {
79+
80+
int callTimeoutMillis =
81+
(int) Math.min(Duration.ofNanos(timeoutNanos).toMillis(), Integer.MAX_VALUE);
82+
int connectTimeoutMillis =
83+
(int) Math.min(Duration.ofNanos(connectionTimeoutNanos).toMillis(), Integer.MAX_VALUE);
84+
Dispatcher dispatcher =
85+
new Dispatcher(
86+
new ThreadPoolExecutor(
87+
0,
88+
Integer.MAX_VALUE,
89+
60,
90+
TimeUnit.SECONDS,
91+
new SynchronousQueue<>(),
92+
new DaemonThreadFactory("okhttp-dispatch", false) // TODO, remove hardcodes
93+
));
94+
OkHttpClient.Builder builder =
95+
new OkHttpClient.Builder()
96+
.dispatcher(dispatcher)
97+
.connectTimeout(Duration.ofMillis(connectTimeoutMillis))
98+
.callTimeout(Duration.ofMillis(callTimeoutMillis));
99+
100+
this.client = builder.build();
101+
this.url = HttpUrl.get(endpoint);
102+
// this.compressor = compressor;
103+
this.exportAsJson = exportAsJson;
104+
this.mediaType =
105+
MediaType.get("application/json"); // TODO: Fix, make dynamic relative to exportAsJson input
106+
this.headerSupplier = headerSupplier;
107+
108+
this.signer = AwsV4HttpSigner.create();
109+
this.awsCredentialsProvider = DefaultCredentialsProvider.create();
110+
}
111+
112+
public CompletableResultCode shutdown() {
113+
client.dispatcher().cancelAll();
114+
client.dispatcher().executorService().shutdownNow();
115+
client.connectionPool().evictAll();
116+
return CompletableResultCode.ofSuccess();
117+
}
118+
119+
public void send(
120+
Marshaler marshaler,
121+
int contentLength,
122+
Consumer<Response> onResponse,
123+
Consumer<Throwable> onError) {
124+
Request.Builder requestBuilder = new Request.Builder().url(url);
125+
126+
Map<String, List<String>> headers = headerSupplier.get();
127+
if (headers != null) {
128+
headers.forEach(
129+
(key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value)));
130+
}
131+
RequestBody body = new RawRequestBody(marshaler, exportAsJson, contentLength, mediaType);
132+
133+
// TODO: Add RequestBody compression logic
134+
requestBuilder.post(body);
135+
136+
Request req = requestBuilder.build();
137+
SdkHttpRequest sdkReq = SigV4Util.toSignableRequest(req);
138+
logger.info("SdkHttpRequest generated: " + sdkReq.toString());
139+
SignedRequest signedRequest =
140+
signer.sign(
141+
r ->
142+
r.identity(awsCredentialsProvider.resolveCredentials())
143+
.request(sdkReq)
144+
.payload(SigV4Util.toContentStream(req))
145+
.putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, this.serviceName)
146+
.putProperty(AwsV4HttpSigner.REGION_NAME, this.regionName));
147+
148+
logger.info(signedRequest.toString());
149+
150+
requestBuilder.removeHeader(HOST);
151+
requestBuilder.addHeader(X_AMZ_DATE, SigV4Util.getSingleHeaderValue(signedRequest, X_AMZ_DATE));
152+
requestBuilder.addHeader(HOST, SigV4Util.getSingleHeaderValue(signedRequest, HOST));
153+
requestBuilder.addHeader(
154+
AUTHORIZATION, SigV4Util.getSingleHeaderValue(signedRequest, AUTHORIZATION));
155+
requestBuilder.addHeader(
156+
X_AMZ_CONTENT_SHA256, SigV4Util.getSingleHeaderValue(signedRequest, X_AMZ_CONTENT_SHA256));
157+
158+
// Rebuild the original request using the signed request headers
159+
Request finalSignedReq = requestBuilder.build();
160+
logger.info("Signed request headers - X_AMZ_DATE: " + finalSignedReq.headers().get(X_AMZ_DATE));
161+
logger.info("Signed request headers - HOST: " + finalSignedReq.headers().get(HOST));
162+
logger.info(
163+
"Signed request headers - AUTHORIZATION: " + finalSignedReq.headers().get(AUTHORIZATION));
164+
logger.info(
165+
"Signed request headers - X_AMZ_CONTENT_SHA256: "
166+
+ finalSignedReq.headers().get(X_AMZ_CONTENT_SHA256));
167+
168+
client
169+
.newCall(finalSignedReq)
170+
.enqueue(
171+
new Callback() {
172+
@Override
173+
public void onFailure(Call call, IOException e) {
174+
onError.accept(e);
175+
}
176+
177+
@Override
178+
public void onResponse(Call call, okhttp3.Response response) {
179+
try (ResponseBody body = response.body()) {
180+
onResponse.accept(
181+
new Response() {
182+
@Nullable private byte[] bodyBytes;
183+
184+
@Override
185+
public int statusCode() {
186+
return response.code();
187+
}
188+
189+
@Override
190+
public String statusMessage() {
191+
return response.message();
192+
}
193+
194+
@Override
195+
public byte[] responseBody() throws IOException {
196+
if (bodyBytes == null) {
197+
bodyBytes = body.bytes();
198+
}
199+
return bodyBytes;
200+
}
201+
});
202+
}
203+
}
204+
});
205+
}
206+
207+
private static class RawRequestBody extends RequestBody {
208+
209+
private final Marshaler marshaler;
210+
private final boolean exportAsJson;
211+
private final int contentLength;
212+
private final MediaType mediaType;
213+
214+
private RawRequestBody(
215+
Marshaler marshaler, boolean exportAsJson, int contentLength, MediaType mediaType) {
216+
this.marshaler = marshaler;
217+
this.exportAsJson = exportAsJson;
218+
this.contentLength = contentLength;
219+
this.mediaType = mediaType;
220+
}
221+
222+
@Override
223+
public long contentLength() {
224+
return contentLength;
225+
}
226+
227+
@Override
228+
public MediaType contentType() {
229+
return mediaType;
230+
}
231+
232+
@Override
233+
public void writeTo(BufferedSink bufferedSink) throws IOException {
234+
if (exportAsJson) {
235+
marshaler.writeJsonTo(bufferedSink.outputStream());
236+
} else {
237+
marshaler.writeBinaryTo(bufferedSink.outputStream());
238+
}
239+
}
240+
}
241+
242+
// Visible for testing
243+
String getUrl() {
244+
return url.toString();
245+
}
246+
}

0 commit comments

Comments
 (0)