diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java index 0df00bdfc4..ed49b3c723 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java @@ -80,7 +80,8 @@ public static void createAndRegister(OcAgentTraceExporterConfiguration configura configuration.getSslContext(), configuration.getRetryInterval(), configuration.getEnableConfig(), - configuration.getDeadline()); + configuration.getDeadline(), + configuration.getHeaders()); registerInternal(newHandler); } } diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java index 3e2ed86379..3d712f4114 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import io.netty.handler.ssl.SslContext; import io.opencensus.common.Duration; import javax.annotation.Nullable; @@ -112,6 +113,9 @@ public abstract class OcAgentTraceExporterConfiguration { */ public abstract Duration getDeadline(); + /** Returns custom headers to attach as gRPC metadata. */ + public abstract ImmutableMap getHeaders(); + /** * Returns a new {@link Builder}. * @@ -210,6 +214,13 @@ public abstract static class Builder { abstract Duration getDeadline(); + abstract ImmutableMap.Builder headersBuilder(); + + public final Builder addHeader(String key, String value) { + headersBuilder().put(key, value); + return this; + } + /** * Builds a {@link OcAgentTraceExporterConfiguration}. * diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java index edc5e47fe6..c7ea267131 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java @@ -18,8 +18,10 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.MetadataUtils; import io.netty.handler.ssl.SslContext; import io.opencensus.common.Duration; import io.opencensus.exporter.trace.util.TimeLimitedHandler; @@ -28,6 +30,7 @@ import io.opencensus.proto.agent.trace.v1.TraceServiceGrpc; import io.opencensus.trace.export.SpanData; import java.util.Collection; +import java.util.Map; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -42,6 +45,7 @@ final class OcAgentTraceExporterHandler extends TimeLimitedHandler { private final Node node; private final Boolean useInsecure; @Nullable private final SslContext sslContext; + private final Map headers; @javax.annotation.Nullable private OcAgentTraceServiceExportRpcHandler exportRpcHandler; // Thread-safe @@ -53,12 +57,14 @@ final class OcAgentTraceExporterHandler extends TimeLimitedHandler { @Nullable SslContext sslContext, Duration retryInterval, boolean enableConfig, - Duration deadline) { + Duration deadline, + Map headers) { super(deadline, EXPORT_SPAN_NAME); this.endPoint = endPoint; this.node = OcAgentNodeUtils.getNodeInfo(serviceName); this.useInsecure = useInsecure; this.sslContext = sslContext; + this.headers = headers; } @Override @@ -67,7 +73,7 @@ public void timeLimitedExport(Collection spanDataList) { // If not connected, try to initiate a new connection when a new batch of spans arrive. // Export RPC doesn't respect the retry interval. TraceServiceGrpc.TraceServiceStub stub = - getTraceServiceStub(endPoint, useInsecure, sslContext); + getTraceServiceStub(endPoint, useInsecure, sslContext, headers); exportRpcHandler = createExportRpcHandlerAndConnect(stub, node); } @@ -104,7 +110,7 @@ private static OcAgentTraceServiceExportRpcHandler createExportRpcHandlerAndConn // Creates a TraceServiceStub with the given parameters. // One stub can be used for both Export RPC and Config RPC. private static TraceServiceGrpc.TraceServiceStub getTraceServiceStub( - String endPoint, Boolean useInsecure, SslContext sslContext) { + String endPoint, Boolean useInsecure, SslContext sslContext, Map headers) { ManagedChannelBuilder channelBuilder; if (useInsecure) { channelBuilder = ManagedChannelBuilder.forTarget(endPoint).usePlaintext(); @@ -115,6 +121,11 @@ private static TraceServiceGrpc.TraceServiceStub getTraceServiceStub( .sslContext(sslContext); } ManagedChannel channel = channelBuilder.build(); - return TraceServiceGrpc.newStub(channel); + Metadata metadata = new Metadata(); + for (Map.Entry e : headers.entrySet()) { + metadata.put(Metadata.Key.of(e.getKey(), Metadata.ASCII_STRING_MARSHALLER), e.getValue()); + } + return TraceServiceGrpc.newStub(channel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); } } diff --git a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java index daca516d82..e986af7845 100644 --- a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java +++ b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java @@ -60,6 +60,7 @@ public void setAndGet() throws SSLException { .setRetryInterval(oneMinute) .setEnableConfig(false) .setDeadline(oneMinute) + .addHeader("foo", "bar") .build(); assertThat(configuration.getEndPoint()).isEqualTo("192.168.0.1:50051"); assertThat(configuration.getServiceName()).isEqualTo("service"); @@ -68,5 +69,6 @@ public void setAndGet() throws SSLException { assertThat(configuration.getRetryInterval()).isEqualTo(oneMinute); assertThat(configuration.getEnableConfig()).isFalse(); assertThat(configuration.getDeadline()).isEqualTo(oneMinute); + assertThat(configuration.getHeaders()).containsExactly("foo", "bar"); } } diff --git a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java index 6f7e29c5f3..83ffc4645d 100644 --- a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java +++ b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java @@ -20,9 +20,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; -import io.grpc.BindableService; -import io.grpc.Server; -import io.grpc.ServerBuilder; +import io.grpc.*; import io.grpc.netty.NettyServerBuilder; import io.opencensus.common.Scope; import io.opencensus.proto.agent.common.v1.Node; @@ -37,13 +35,9 @@ import io.opencensus.trace.samplers.Samplers; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Executor; +import javax.annotation.concurrent.GuardedBy; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,15 +50,22 @@ public class OcAgentTraceExporterIntegrationTest { private Server agent; private FakeOcAgentTraceServiceGrpcImpl fakeOcAgentTraceServiceGrpc; + private HeaderInterceptor headerInterceptor; private final Tracer tracer = Tracing.getTracer(); private static final String SERVICE_NAME = "integration-test"; + private static final String TEST_METADATA_HEADER = "test-header"; + private static final String TEST_METADATA_VALUE = "test-value"; @Before public void setUp() throws IOException { fakeOcAgentTraceServiceGrpc = new FakeOcAgentTraceServiceGrpcImpl(); + headerInterceptor = new HeaderInterceptor(); agent = - getServer(OcAgentTraceExporterConfiguration.DEFAULT_END_POINT, fakeOcAgentTraceServiceGrpc); + getServer( + OcAgentTraceExporterConfiguration.DEFAULT_END_POINT, + fakeOcAgentTraceServiceGrpc, + headerInterceptor); } @After @@ -92,6 +93,7 @@ public void testExportSpans() throws InterruptedException, IOException { .setServiceName(SERVICE_NAME) .setUseInsecure(true) .setEnableConfig(false) + .addHeader(TEST_METADATA_HEADER, TEST_METADATA_VALUE) .build()); // Create one root span and 5 children. @@ -164,6 +166,13 @@ public void testExportSpans() throws InterruptedException, IOException { for (int i = 0; i < 8; i++) { assertThat(exportedSpanNames).contains("second-iteration-child-" + i); } + + for (Metadata metadata : headerInterceptor.getReceivedMetadata()) { + Metadata.Key key = + Metadata.Key.of(TEST_METADATA_HEADER, Metadata.ASCII_STRING_MARSHALLER); + assertThat(metadata.containsKey(key)).isTrue(); + assertThat(metadata.get(key)).isEqualTo(TEST_METADATA_VALUE); + } } @Test @@ -195,11 +204,13 @@ private void doWork(String spanName, int i) { } } - private static Server getServer(String endPoint, BindableService service) throws IOException { + private static Server getServer( + String endPoint, BindableService service, HeaderInterceptor headerInterceptor) + throws IOException { ServerBuilder builder = NettyServerBuilder.forAddress(parseEndpoint(endPoint)); Executor executor = MoreExecutors.directExecutor(); builder.executor(executor); - return builder.addService(service).build(); + return builder.addService(service).intercept(headerInterceptor).build(); } private static InetSocketAddress parseEndpoint(String endPoint) { @@ -212,4 +223,24 @@ private static InetSocketAddress parseEndpoint(String endPoint) { return new InetSocketAddress("localhost", 55678); } } + + private static class HeaderInterceptor implements ServerInterceptor { + @GuardedBy("this") + private final List receivedMetadata = new ArrayList<>(); + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + addReceivedMetadata(headers); + return next.startCall(call, headers); + } + + private synchronized void addReceivedMetadata(Metadata metadata) { + receivedMetadata.add(metadata); + } + + synchronized List getReceivedMetadata() { + return Collections.unmodifiableList(receivedMetadata); + } + } }