33
44package com .azure .monitor .opentelemetry .exporter ;
55
6+ import com .azure .core .credential .TokenCredential ;
7+ import com .azure .core .http .HttpPipelineCallContext ;
8+ import com .azure .core .http .HttpPipelineNextPolicy ;
9+ import com .azure .core .http .HttpPipelineNextSyncPolicy ;
10+ import com .azure .core .http .HttpResponse ;
611import com .azure .core .http .policy .HttpPipelinePolicy ;
712import com .azure .core .test .annotation .LiveOnly ;
13+ import com .azure .core .tracing .opentelemetry .OpenTelemetryTracingOptions ;
14+ import com .azure .core .util .ClientOptions ;
815import com .azure .core .util .FluxUtil ;
16+ import com .azure .core .util .logging .ClientLogger ;
17+ import com .azure .core .util .Configuration ;
918import com .azure .messaging .eventhubs .EventData ;
1019import com .azure .messaging .eventhubs .EventHubClientBuilder ;
1120import com .azure .messaging .eventhubs .EventHubProducerAsyncClient ;
1423import com .azure .messaging .eventhubs .LoadBalancingStrategy ;
1524import com .azure .messaging .eventhubs .checkpointstore .blob .BlobCheckpointStore ;
1625import com .azure .messaging .eventhubs .models .CreateBatchOptions ;
26+ import com .azure .monitor .opentelemetry .exporter .implementation .localstorage .LocalStorageTelemetryPipelineListener ;
27+ import com .azure .monitor .opentelemetry .exporter .implementation .models .MonitorDomain ;
28+ import com .azure .monitor .opentelemetry .exporter .implementation .models .RemoteDependencyData ;
29+ import com .azure .monitor .opentelemetry .exporter .implementation .models .TelemetryItem ;
1730import com .azure .monitor .opentelemetry .exporter .implementation .utils .TestUtils ;
1831import com .azure .storage .blob .BlobContainerAsyncClient ;
1932import com .azure .storage .blob .BlobContainerClientBuilder ;
33+ import io .opentelemetry .api .OpenTelemetry ;
2034import io .opentelemetry .api .trace .Span ;
2135import io .opentelemetry .api .trace .Tracer ;
2236import io .opentelemetry .context .Scope ;
2337import org .junit .jupiter .api .Disabled ;
2438import org .junit .jupiter .api .Test ;
2539import reactor .core .publisher .Mono ;
40+ import reactor .test .StepVerifier ;
2641
2742import java .nio .charset .StandardCharsets ;
43+ import java .time .Duration ;
44+ import java .util .List ;
2845import java .util .concurrent .CountDownLatch ;
2946import java .util .concurrent .TimeUnit ;
3047
@@ -37,38 +54,80 @@ public class EventHubsExporterIntegrationTest extends MonitorExporterClientTestB
3754 private static final String STORAGE_CONNECTION_STRING = System .getenv ("STORAGE_CONNECTION_STRING" );
3855 private static final String CONTAINER_NAME = System .getenv ("STORAGE_CONTAINER_NAME" );
3956
57+ private static final ClientLogger LOGGER = new ClientLogger (EventHubsExporterIntegrationTest .class );
58+
59+ private TokenCredential credential ;
60+
61+ @ Override
62+ public void beforeTest () {
63+ super .beforeTest ();
64+ credential = TokenCredentialUtil .getTestTokenCredential (interceptorManager );
65+ }
66+
4067 @ Test
68+ @ SuppressWarnings ("try" )
4169 public void producerTest () throws InterruptedException {
70+ String ehNamespace = Configuration .getGlobalConfiguration ().get ("AZURE_EVENTHUBS_FULLY_QUALIFIED_DOMAIN_NAME" );
71+ String ehName = Configuration .getGlobalConfiguration ().get ("AZURE_EVENTHUBS_EVENT_HUB_NAME" );
72+
4273 CountDownLatch exporterCountDown = new CountDownLatch (2 );
4374 String spanName = "event-hubs-producer-testing" ;
44- HttpPipelinePolicy validationPolicy = (context , next ) -> {
45- Mono <String > asyncString = FluxUtil .collectBytesInByteBufferStream (context .getHttpRequest ().getBody ())
46- .map (bytes -> new String (bytes , StandardCharsets .UTF_8 ));
47- asyncString .subscribe (value -> {
48- if (value .contains (spanName )) {
49- exporterCountDown .countDown ();
50- }
51- if (value .contains ("EventHubs.send" )) {
52- exporterCountDown .countDown ();
75+ HttpPipelinePolicy validationPolicy = new HttpPipelinePolicy () {
76+ @ Override
77+ public Mono <HttpResponse > process (HttpPipelineCallContext context , HttpPipelineNextPolicy next ) {
78+ checkTelemetry (context );
79+ return next .process ();
80+ }
81+
82+ @ Override
83+ public HttpResponse processSync (HttpPipelineCallContext context , HttpPipelineNextSyncPolicy next ) {
84+ checkTelemetry (context );
85+ return next .processSync ();
86+ }
87+
88+ private void checkTelemetry (HttpPipelineCallContext context ) {
89+ byte [] asyncBytes = LocalStorageTelemetryPipelineListener
90+ .ungzip (context .getHttpRequest ().getBodyAsBinaryData ().toBytes ());
91+ List <TelemetryItem > telemetryItems = TestUtils .deserialize (asyncBytes );
92+
93+ for (TelemetryItem telemetryItem : telemetryItems ) {
94+ MonitorDomain monitorDomain = telemetryItem .getData ().getBaseData ();
95+ RemoteDependencyData remoteDependencyData = TestUtils .toRemoteDependencyData (monitorDomain );
96+ String remoteDependencyName = remoteDependencyData .getName ();
97+ if (remoteDependencyName .contains (spanName )) {
98+ exporterCountDown .countDown ();
99+ LOGGER .info ("Count down " + spanName );
100+ } else if (("send " + ehName ).equals (remoteDependencyName )) {
101+ exporterCountDown .countDown ();
102+ LOGGER .info ("Count down eventHubs send" );
103+ } else {
104+ LOGGER .info ("remoteDependencyName = " + remoteDependencyName );
105+ }
53106 }
54- });
55- return next .process ();
107+ }
56108 };
57- Tracer tracer = TestUtils .createOpenTelemetrySdk (getHttpPipeline (validationPolicy )).getTracer ("Sample" );
58- EventHubProducerAsyncClient producer
59- = new EventHubClientBuilder ().connectionString (CONNECTION_STRING ).buildAsyncProducerClient ();
60- Span span = tracer .spanBuilder (spanName ).startSpan ();
61- Scope scope = span .makeCurrent ();
62- try {
63- producer .createBatch ().flatMap (batch -> {
64- batch .tryAdd (new EventData ("test event" ));
65- return producer .send (batch );
66- }).subscribe ();
67- } finally {
68- span .end ();
69- scope .close ();
109+
110+ OpenTelemetry otel = TestUtils .createOpenTelemetrySdk (getHttpPipeline (validationPolicy ));
111+ Tracer tracer = otel .getTracer ("Sample" );
112+
113+ try (EventHubProducerAsyncClient producer = new EventHubClientBuilder ().credential (credential )
114+ .fullyQualifiedNamespace (ehNamespace )
115+ .eventHubName (ehName )
116+ .clientOptions (
117+ new ClientOptions ().setTracingOptions (new OpenTelemetryTracingOptions ().setOpenTelemetry (otel )))
118+ .buildAsyncProducerClient ()) {
119+
120+ Span span = tracer .spanBuilder (spanName ).startSpan ();
121+ try (Scope scope = span .makeCurrent ()) {
122+ StepVerifier .create (producer .createBatch ().flatMap (batch -> {
123+ batch .tryAdd (new EventData ("test event" ));
124+ return producer .send (batch );
125+ })).expectComplete ().verify (Duration .ofSeconds (60 ));
126+ } finally {
127+ span .end ();
128+ }
70129 }
71- assertTrue (exporterCountDown .await (5 , TimeUnit .SECONDS ));
130+ assertTrue (exporterCountDown .await (20 , TimeUnit .SECONDS ));
72131 }
73132
74133 @ Disabled ("Processor integration tests require separate consumer group to not have partition contention in CI - https://github.com/Azure/azure-sdk-for-java/issues/23567" )
0 commit comments