Skip to content

Commit f89fc05

Browse files
authored
Add HttpSender abstraction (#5505)
1 parent ac0b4e4 commit f89fc05

File tree

23 files changed

+687
-573
lines changed

23 files changed

+687
-573
lines changed

exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.exporter.internal.auth;
77

88
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
9-
import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder;
9+
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
1010
import java.lang.reflect.Field;
1111
import java.util.Map;
1212

@@ -27,7 +27,7 @@ public interface Authenticator {
2727
Map<String, String> getHeaders();
2828

2929
/**
30-
* Reflectively access a {@link GrpcExporterBuilder}, or {@link OkHttpExporterBuilder} instance in
30+
* Reflectively access a {@link GrpcExporterBuilder}, or {@link HttpExporterBuilder} instance in
3131
* field called "delegate" of the instance, and set the {@link Authenticator}.
3232
*
3333
* @param builder export builder to modify
@@ -42,8 +42,8 @@ static void setAuthenticatorOnDelegate(Object builder, Authenticator authenticat
4242
Object value = field.get(builder);
4343
if (value instanceof GrpcExporterBuilder) {
4444
throw new IllegalArgumentException("GrpcExporterBuilder not supported yet.");
45-
} else if (value instanceof OkHttpExporterBuilder) {
46-
((OkHttpExporterBuilder<?>) value).setAuthenticator(authenticator);
45+
} else if (value instanceof HttpExporterBuilder) {
46+
((HttpExporterBuilder<?>) value).setAuthenticator(authenticator);
4747
} else {
4848
throw new IllegalArgumentException(
4949
"Delegate field is not type DefaultGrpcExporterBuilder or OkHttpGrpcExporterBuilder.");
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.exporter.internal.http;
7+
8+
import io.opentelemetry.api.metrics.MeterProvider;
9+
import io.opentelemetry.exporter.internal.ExporterMetrics;
10+
import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil;
11+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
12+
import io.opentelemetry.sdk.common.CompletableResultCode;
13+
import io.opentelemetry.sdk.internal.ThrottlingLogger;
14+
import java.io.IOException;
15+
import java.io.OutputStream;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
import java.util.function.Consumer;
18+
import java.util.function.Supplier;
19+
import java.util.logging.Level;
20+
import java.util.logging.Logger;
21+
import javax.annotation.Nullable;
22+
23+
/**
24+
* An exporter for http/protobuf or http/json using a signal-specific Marshaler.
25+
*
26+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
27+
* at any time.
28+
*/
29+
@SuppressWarnings("checkstyle:JavadocMethod")
30+
public final class HttpExporter<T extends Marshaler> {
31+
32+
private static final Logger internalLogger = Logger.getLogger(HttpExporter.class.getName());
33+
34+
private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);
35+
private final AtomicBoolean isShutdown = new AtomicBoolean();
36+
37+
private final String type;
38+
private final HttpSender httpSender;
39+
private final ExporterMetrics exporterMetrics;
40+
private final boolean exportAsJson;
41+
42+
public HttpExporter(
43+
String exporterName,
44+
String type,
45+
HttpSender httpSender,
46+
Supplier<MeterProvider> meterProviderSupplier,
47+
boolean exportAsJson) {
48+
this.type = type;
49+
this.httpSender = httpSender;
50+
this.exporterMetrics =
51+
exportAsJson
52+
? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier)
53+
: ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier);
54+
this.exportAsJson = exportAsJson;
55+
}
56+
57+
public CompletableResultCode export(T exportRequest, int numItems) {
58+
if (isShutdown.get()) {
59+
return CompletableResultCode.ofFailure();
60+
}
61+
62+
exporterMetrics.addSeen(numItems);
63+
64+
CompletableResultCode result = new CompletableResultCode();
65+
66+
Consumer<OutputStream> marshaler =
67+
os -> {
68+
try {
69+
if (exportAsJson) {
70+
exportRequest.writeJsonTo(os);
71+
} else {
72+
exportRequest.writeBinaryTo(os);
73+
}
74+
} catch (IOException e) {
75+
throw new IllegalStateException(e);
76+
}
77+
};
78+
79+
httpSender.send(
80+
marshaler,
81+
exportRequest.getBinarySerializedSize(),
82+
httpResponse -> {
83+
int statusCode = httpResponse.statusCode();
84+
85+
if (statusCode >= 200 && statusCode < 300) {
86+
exporterMetrics.addSuccess(numItems);
87+
result.succeed();
88+
return;
89+
}
90+
91+
exporterMetrics.addFailed(numItems);
92+
93+
byte[] body;
94+
try {
95+
body = httpResponse.responseBody();
96+
} catch (IOException ex) {
97+
throw new IllegalStateException(ex);
98+
}
99+
100+
String status = extractErrorStatus(httpResponse.statusMessage(), body);
101+
102+
logger.log(
103+
Level.WARNING,
104+
"Failed to export "
105+
+ type
106+
+ "s. Server responded with HTTP status code "
107+
+ statusCode
108+
+ ". Error message: "
109+
+ status);
110+
result.fail();
111+
},
112+
e -> {
113+
exporterMetrics.addFailed(numItems);
114+
logger.log(
115+
Level.SEVERE,
116+
"Failed to export "
117+
+ type
118+
+ "s. The request could not be executed. Full error message: "
119+
+ e.getMessage(),
120+
e);
121+
result.fail();
122+
});
123+
124+
return result;
125+
}
126+
127+
public CompletableResultCode shutdown() {
128+
if (!isShutdown.compareAndSet(false, true)) {
129+
logger.log(Level.INFO, "Calling shutdown() multiple times.");
130+
return CompletableResultCode.ofSuccess();
131+
}
132+
return httpSender.shutdown();
133+
}
134+
135+
private static String extractErrorStatus(String statusMessage, @Nullable byte[] responseBody) {
136+
if (responseBody == null) {
137+
return "Response body missing, HTTP status message: " + statusMessage;
138+
}
139+
try {
140+
return GrpcStatusUtil.getStatusMessage(responseBody);
141+
} catch (IOException e) {
142+
return "Unable to parse response body, HTTP status message: " + statusMessage;
143+
}
144+
}
145+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.exporter.internal.http;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.api.metrics.MeterProvider;
10+
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
11+
import io.opentelemetry.exporter.internal.TlsConfigHelper;
12+
import io.opentelemetry.exporter.internal.auth.Authenticator;
13+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
14+
import io.opentelemetry.exporter.internal.okhttp.OkHttpHttpSender;
15+
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
16+
import java.net.URI;
17+
import java.time.Duration;
18+
import java.util.Collections;
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.function.Supplier;
23+
import javax.annotation.Nullable;
24+
import javax.net.ssl.SSLContext;
25+
import javax.net.ssl.X509TrustManager;
26+
27+
/**
28+
* A builder for {@link HttpExporter}.
29+
*
30+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
31+
* at any time.
32+
*/
33+
@SuppressWarnings("checkstyle:JavadocMethod")
34+
public final class HttpExporterBuilder<T extends Marshaler> {
35+
public static final long DEFAULT_TIMEOUT_SECS = 10;
36+
37+
private final String exporterName;
38+
private final String type;
39+
40+
private String endpoint;
41+
42+
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
43+
private boolean compressionEnabled = false;
44+
private boolean exportAsJson = false;
45+
@Nullable private Map<String, String> headers;
46+
47+
private final TlsConfigHelper tlsConfigHelper = new TlsConfigHelper();
48+
@Nullable private RetryPolicy retryPolicy;
49+
private Supplier<MeterProvider> meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider;
50+
@Nullable private Authenticator authenticator;
51+
52+
public HttpExporterBuilder(String exporterName, String type, String defaultEndpoint) {
53+
this.exporterName = exporterName;
54+
this.type = type;
55+
56+
endpoint = defaultEndpoint;
57+
}
58+
59+
public HttpExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
60+
timeoutNanos = unit.toNanos(timeout);
61+
return this;
62+
}
63+
64+
public HttpExporterBuilder<T> setTimeout(Duration timeout) {
65+
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
66+
}
67+
68+
public HttpExporterBuilder<T> setEndpoint(String endpoint) {
69+
URI uri = ExporterBuilderUtil.validateEndpoint(endpoint);
70+
this.endpoint = uri.toString();
71+
return this;
72+
}
73+
74+
public HttpExporterBuilder<T> setCompression(String compressionMethod) {
75+
this.compressionEnabled = compressionMethod.equals("gzip");
76+
return this;
77+
}
78+
79+
public HttpExporterBuilder<T> addHeader(String key, String value) {
80+
if (headers == null) {
81+
headers = new HashMap<>();
82+
}
83+
headers.put(key, value);
84+
return this;
85+
}
86+
87+
public HttpExporterBuilder<T> setAuthenticator(Authenticator authenticator) {
88+
this.authenticator = authenticator;
89+
return this;
90+
}
91+
92+
public HttpExporterBuilder<T> setTrustManagerFromCerts(byte[] trustedCertificatesPem) {
93+
tlsConfigHelper.setTrustManagerFromCerts(trustedCertificatesPem);
94+
return this;
95+
}
96+
97+
public HttpExporterBuilder<T> setKeyManagerFromCerts(
98+
byte[] privateKeyPem, byte[] certificatePem) {
99+
tlsConfigHelper.setKeyManagerFromCerts(privateKeyPem, certificatePem);
100+
return this;
101+
}
102+
103+
public HttpExporterBuilder<T> setSslContext(
104+
SSLContext sslContext, X509TrustManager trustManager) {
105+
tlsConfigHelper.setSslContext(sslContext, trustManager);
106+
return this;
107+
}
108+
109+
public HttpExporterBuilder<T> setMeterProvider(MeterProvider meterProvider) {
110+
this.meterProviderSupplier = () -> meterProvider;
111+
return this;
112+
}
113+
114+
public HttpExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
115+
this.retryPolicy = retryPolicy;
116+
return this;
117+
}
118+
119+
public HttpExporterBuilder<T> exportAsJson() {
120+
this.exportAsJson = true;
121+
return this;
122+
}
123+
124+
public HttpExporter<T> build() {
125+
Map<String, String> headers = this.headers == null ? Collections.emptyMap() : this.headers;
126+
Supplier<Map<String, String>> headerSupplier = () -> headers;
127+
128+
HttpSender httpSender =
129+
new OkHttpHttpSender(
130+
endpoint,
131+
compressionEnabled,
132+
exportAsJson ? "application/json" : "application/x-protobuf",
133+
timeoutNanos,
134+
headerSupplier,
135+
authenticator,
136+
retryPolicy,
137+
tlsConfigHelper.getSslContext(),
138+
tlsConfigHelper.getTrustManager());
139+
140+
return new HttpExporter<>(exporterName, type, httpSender, meterProviderSupplier, exportAsJson);
141+
}
142+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.exporter.internal.http;
7+
8+
import io.opentelemetry.sdk.common.CompletableResultCode;
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.util.function.Consumer;
12+
13+
/**
14+
* An abstraction for sending HTTP requests and handling responses.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*
19+
* @see HttpExporter
20+
* @see HttpExporterBuilder
21+
*/
22+
public interface HttpSender {
23+
24+
/**
25+
* Send an HTTP request, including any retry attempts. {@code onResponse} is called with the HTTP
26+
* response, either a success response or a error response after retries. {@code onError} is
27+
* called when the request could not be executed due to cancellation, connectivity problems, or
28+
* timeout.
29+
*
30+
* @param marshaler the request body marshaler
31+
* @param contentLength the request body content length
32+
* @param onResponse the callback to invoke with the HTTP response
33+
* @param onError the callback to invoke when the HTTP request could not be executed
34+
*/
35+
void send(
36+
Consumer<OutputStream> marshaler,
37+
int contentLength,
38+
Consumer<Response> onResponse,
39+
Consumer<Throwable> onError);
40+
41+
/** Shutdown the sender. */
42+
CompletableResultCode shutdown();
43+
44+
/** The HTTP response. */
45+
interface Response {
46+
47+
/** The HTTP status code. */
48+
int statusCode();
49+
50+
/** The HTTP status message. */
51+
String statusMessage();
52+
53+
/** The HTTP response body. */
54+
byte[] responseBody() throws IOException;
55+
}
56+
}

0 commit comments

Comments
 (0)