Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,16 @@
<!-- OpenTelemetry components, imported as a BOM -->
<!-- opentelemetry-instrumentation-bom-alpha includes the main and alpha-bom inside.-->
<!-- Also includes semantic conventions-->


<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom-alpha</artifactId>
<version>1.56.0-alpha-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-bom-alpha</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.common.InternalTelemetryVersion;
Expand Down Expand Up @@ -166,7 +163,7 @@ private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig export

OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces();

return new VertxGrpcSpanExporter(new GrpcExporter<TraceRequestMarshaler>(
return new VertxGrpcSpanExporter(new GrpcExporter(
new VertxGrpcSender(
baseUri,
VertxGrpcSender.GRPC_TRACE_SERVICE_NAME,
Expand All @@ -178,7 +175,7 @@ private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig export
InternalTelemetryVersion.LATEST,
ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), // use the same as OTel does
MeterProvider::noop,
baseUri.toASCIIString()));
baseUri));
}

private SpanExporter createHttpSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, Vertx vertx,
Expand All @@ -189,7 +186,7 @@ private SpanExporter createHttpSpanExporter(OtlpExporterRuntimeConfig exporterRu

boolean exportAsJson = false; //TODO: this will be enhanced in the future

return new VertxHttpSpanExporter(new HttpExporter<TraceRequestMarshaler>(
return new VertxHttpSpanExporter(new HttpExporter(
ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_HTTP_SPAN_EXPORTER),
new VertxHttpSender(
baseUri,
Expand All @@ -202,7 +199,8 @@ private SpanExporter createHttpSpanExporter(OtlpExporterRuntimeConfig exporterRu
vertx),
MeterProvider::noop,
InternalTelemetryVersion.LATEST,
baseUri.toASCIIString()));
baseUri,
false));
}
};
}
Expand Down Expand Up @@ -233,7 +231,7 @@ public MetricExporter apply(SyntheticCreationalContext<MetricExporter> context)
String protocol = metricsConfig.protocol().get();
if (GRPC.equals(protocol)) {
metricExporter = new VertxGrpcMetricExporter(
new GrpcExporter<MetricsRequestMarshaler>(
new GrpcExporter(
new VertxGrpcSender(
baseUri,
VertxGrpcSender.GRPC_METRIC_SERVICE_NAME,
Expand All @@ -245,13 +243,13 @@ public MetricExporter apply(SyntheticCreationalContext<MetricExporter> context)
InternalTelemetryVersion.LATEST,
ComponentId.generateLazy(OTLP_GRPC_METRIC_EXPORTER), // use the same as OTel does
MeterProvider::noop,
baseUri.toASCIIString()),
baseUri),
aggregationTemporalityResolver(metricsConfig),
aggregationResolver(metricsConfig));
} else if (HTTP_PROTOBUF.equals(protocol)) {
boolean exportAsJson = false; //TODO: this will be enhanced in the future
metricExporter = new VertxHttpMetricsExporter(
new HttpExporter<MetricsRequestMarshaler>(
new HttpExporter(
ComponentId.generateLazy(
StandardComponentId.ExporterType.OTLP_HTTP_METRIC_EXPORTER),
new VertxHttpSender(
Expand All @@ -265,7 +263,8 @@ public MetricExporter apply(SyntheticCreationalContext<MetricExporter> context)
vertx.get()),
MeterProvider::noop,
InternalTelemetryVersion.LATEST,
baseUri.toASCIIString()),
baseUri,
false),
aggregationTemporalityResolver(metricsConfig),
aggregationResolver(metricsConfig));
} else {
Expand Down Expand Up @@ -307,7 +306,7 @@ public LogRecordExporter apply(SyntheticCreationalContext<LogRecordExporter> con
String protocol = logsConfig.protocol().get();
if (GRPC.equals(protocol)) {
logRecordExporter = new VertxGrpcLogRecordExporter(
new GrpcExporter<LogsRequestMarshaler>(
new GrpcExporter(
new VertxGrpcSender(
baseUri,
VertxGrpcSender.GRPC_LOG_SERVICE_NAME,
Expand All @@ -320,11 +319,11 @@ public LogRecordExporter apply(SyntheticCreationalContext<LogRecordExporter> con
ComponentId.generateLazy(
StandardComponentId.ExporterType.OTLP_GRPC_LOG_EXPORTER), // use the same as OTel does
MeterProvider::noop,
baseUri.toASCIIString()));
baseUri));
} else if (HTTP_PROTOBUF.equals(protocol)) {
boolean exportAsJson = false; //TODO: this will be enhanced in the future
logRecordExporter = new VertxHttpLogRecordExporter(
new HttpExporter<LogsRequestMarshaler>(
new HttpExporter(
ComponentId.generateLazy(
StandardComponentId.ExporterType.OTLP_HTTP_LOG_EXPORTER),
new VertxHttpSender(
Expand All @@ -338,7 +337,8 @@ public LogRecordExporter apply(SyntheticCreationalContext<LogRecordExporter> con
vertx.get()),
MeterProvider::noop,
InternalTelemetryVersion.LATEST,
baseUri.toASCIIString()));
baseUri,
false));
} else {
throw new IllegalArgumentException(String.format("Unsupported OTLP protocol %s specified. " +
"Please check `quarkus.otel.exporter.otlp.logs.protocol` property", protocol));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import io.opentelemetry.sdk.logs.export.LogRecordExporter;

public class VertxGrpcLogRecordExporter implements LogRecordExporter {
private final GrpcExporter<LogsRequestMarshaler> delegate;
private final GrpcExporter delegate;

public VertxGrpcLogRecordExporter(GrpcExporter<LogsRequestMarshaler> delegate) {
public VertxGrpcLogRecordExporter(GrpcExporter delegate) {
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import io.opentelemetry.sdk.logs.export.LogRecordExporter;

public class VertxHttpLogRecordExporter implements LogRecordExporter {
private final HttpExporter<LogsRequestMarshaler> delegate;
private final HttpExporter delegate;

public VertxHttpLogRecordExporter(HttpExporter<LogsRequestMarshaler> delegate) {
public VertxHttpLogRecordExporter(HttpExporter delegate) {
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

public class VertxGrpcMetricExporter implements MetricExporter {

private final GrpcExporter<MetricsRequestMarshaler> delegate;
private final GrpcExporter delegate;
private final AggregationTemporalitySelector aggregationTemporalitySelector;
private final DefaultAggregationSelector defaultAggregationSelector;

public VertxGrpcMetricExporter(GrpcExporter<MetricsRequestMarshaler> grpcExporter,
public VertxGrpcMetricExporter(GrpcExporter grpcExporter,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector) {
this.delegate = grpcExporter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

public class VertxHttpMetricsExporter implements MetricExporter {

private final HttpExporter<MetricsRequestMarshaler> delegate;
private final HttpExporter delegate;
private final AggregationTemporalitySelector aggregationTemporalitySelector;
private final DefaultAggregationSelector defaultAggregationSelector;

public VertxHttpMetricsExporter(HttpExporter<MetricsRequestMarshaler> delegate,
public VertxHttpMetricsExporter(HttpExporter delegate,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector) {
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -17,16 +18,19 @@
import java.util.logging.Logger;

import io.netty.handler.codec.http.QueryStringDecoder;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.grpc.GrpcMessageWriter;
import io.opentelemetry.exporter.grpc.GrpcResponse;
import io.opentelemetry.exporter.grpc.GrpcSender;
import io.opentelemetry.exporter.grpc.GrpcStatusCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.quarkus.opentelemetry.runtime.exporter.otlp.OTelExporterUtil;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
import io.smallrye.common.annotation.SuppressForbidden;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
Expand Down Expand Up @@ -88,15 +92,17 @@ public VertxGrpcSender(
}

@Override
public void send(Marshaler request, Consumer onSuccess, Consumer onError) {
public void send(GrpcMessageWriter messageWriter,
Consumer<GrpcResponse> onResponse,
Consumer<Throwable> onError) {
if (isShutdown.get()) {
return;
}

final String marshalerType = request.getClass().getSimpleName();
final String marshalerType = messageWriter.getClass().getSimpleName();
var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled,
request,
loggedUnimplemented, logger, marshalerType, onSuccess, onError, 1, grpcEndpointPath,
messageWriter,
loggedUnimplemented, logger, marshalerType, onResponse, onError, 1, grpcEndpointPath,
isShutdown::get, exportTimeout);

initiateSend(client, server, MAX_ATTEMPTS, onSuccessHandler, exportTimeout, new Consumer<>() {
Expand Down Expand Up @@ -195,11 +201,11 @@ private static final class ClientRequestOnSuccessHandler implements Handler<Grpc
private final Map<String, String> headers;
private final boolean compressionEnabled;

private final Marshaler marshaler;
private final GrpcMessageWriter messageWriter;
private final AtomicBoolean loggedUnimplemented;
private final ThrottlingLogger logger;
private final String type;
private final Consumer<GrpcResponse> onSuccess;
private final Consumer<GrpcResponse> onResponse;
private final Consumer<Throwable> onError;
private final String grpcEndpointPath;

Expand All @@ -211,11 +217,11 @@ public ClientRequestOnSuccessHandler(GrpcClient client,
SocketAddress server,
Map<String, String> headers,
boolean compressionEnabled,
Marshaler marshaler,
GrpcMessageWriter messageWriter,
AtomicBoolean loggedUnimplemented,
ThrottlingLogger logger,
String type,
Consumer<GrpcResponse> onSuccess,
Consumer<GrpcResponse> onResponse,
Consumer<Throwable> onError,
int attemptNumber,
String grpcEndpointPath,
Expand All @@ -226,11 +232,11 @@ public ClientRequestOnSuccessHandler(GrpcClient client,
this.grpcEndpointPath = grpcEndpointPath;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.marshaler = marshaler;
this.messageWriter = messageWriter;
this.loggedUnimplemented = loggedUnimplemented;
this.logger = logger;
this.type = type;
this.onSuccess = onSuccess;
this.onResponse = onResponse;
this.onError = onError;
this.attemptNumber = attemptNumber;
this.isShutdown = isShutdown;
Expand All @@ -255,10 +261,10 @@ public void handle(GrpcClientRequest<Buffer, Buffer> request) {
}

try {
int messageSize = marshaler.getBinarySerializedSize();
int messageSize = messageWriter.contentLength();
Buffer buffer = Buffer.buffer(messageSize);
var os = new BufferOutputStream(buffer);
marshaler.writeBinaryTo(os);
messageWriter.writeMessage(os);
request.send(buffer).onSuccess(new Handler<>() {
@Override
public void handle(GrpcClientResponse<Buffer, Buffer> response) {
Expand Down Expand Up @@ -291,7 +297,44 @@ public void handle(GrpcError error) {
public void handle(Void ignored) {
GrpcStatus status = getStatus(response);
if (status == GrpcStatus.OK) {
onSuccess.accept(GrpcResponse.create(status.code, status.toString()));
// onResponse.accept(GrpcResponse.create(status.code, status.toString()));
onResponse.accept(new GrpcResponse() {
@Override
public GrpcStatusCode getStatusCode() {
return GrpcStatusCode.fromValue(status.code);
}

@Override
public String getStatusDescription() {
return status.name();
}

@Override
public byte[] getResponseMessage() {
if (response == null) {
return null;
}
Promise<String> promise = Promise.promise();
StringBuilder sb = new StringBuilder();
response.handler(msg -> {
sb.append(msg.toString());
});
response.endHandler(v -> {
// Done reading stream
promise.complete(sb.toString());
});
response.exceptionHandler(promise::fail);
String result = promise.future()
.timeout(exportTimeout.toMillis(), MILLISECONDS)
.recover(throwable -> Future.succeededFuture(
"Response error: " + throwable.getMessage()))
.result();
if (result == null || result.isEmpty()) {
return null;
}
return result.getBytes(StandardCharsets.UTF_8);
}
});
} else {
handleError(status, response);
}
Expand Down Expand Up @@ -447,8 +490,8 @@ private void failOnClientRequest(Throwable t, Consumer<Throwable> onError, int a
}

public ClientRequestOnSuccessHandler newAttempt() {
return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, marshaler,
loggedUnimplemented, logger, type, onSuccess, onError, attemptNumber + 1,
return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, messageWriter,
loggedUnimplemented, logger, type, onResponse, onError, attemptNumber + 1,
grpcEndpointPath, isShutdown, exportTimeout);
}
}
Expand Down
Loading
Loading