77
88import static io .opentelemetry .instrumentation .testing .junit .db .SemconvStabilityUtil .maybeStable ;
99import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .equalTo ;
10+ import static io .opentelemetry .semconv .HttpAttributes .HTTP_REQUEST_METHOD ;
11+ import static io .opentelemetry .semconv .HttpAttributes .HTTP_RESPONSE_STATUS_CODE ;
12+ import static io .opentelemetry .semconv .NetworkAttributes .NETWORK_PEER_ADDRESS ;
13+ import static io .opentelemetry .semconv .NetworkAttributes .NETWORK_PEER_PORT ;
14+ import static io .opentelemetry .semconv .NetworkAttributes .NETWORK_PROTOCOL_VERSION ;
15+ import static io .opentelemetry .semconv .ServerAttributes .SERVER_ADDRESS ;
16+ import static io .opentelemetry .semconv .ServerAttributes .SERVER_PORT ;
17+ import static io .opentelemetry .semconv .UrlAttributes .URL_FULL ;
1018import static io .opentelemetry .semconv .incubating .DbIncubatingAttributes .DB_OPERATION ;
1119import static io .opentelemetry .semconv .incubating .DbIncubatingAttributes .DB_STATEMENT ;
1220import static io .opentelemetry .semconv .incubating .DbIncubatingAttributes .DB_SYSTEM ;
1927import io .opentelemetry .testing .internal .armeria .common .HttpStatus ;
2028import io .opentelemetry .testing .internal .armeria .common .MediaType ;
2129import io .opentelemetry .testing .internal .armeria .testing .junit5 .server .mock .MockWebServerExtension ;
22- import java .io .IOException ;
2330import java .util .concurrent .CompletableFuture ;
31+ import java .util .concurrent .CountDownLatch ;
2432import java .util .concurrent .atomic .AtomicReference ;
2533import org .junit .jupiter .api .AfterAll ;
2634import org .junit .jupiter .api .BeforeAll ;
4250import software .amazon .awssdk .regions .Region ;
4351import software .amazon .awssdk .utils .AttributeMap ;
4452
45- @ SuppressWarnings ("deprecation" )
53+ @ SuppressWarnings ("deprecation" ) // using deprecated semconv
4654public class OpenSearchJavaAwsSdk2TransportTest extends AbstractOpenSearchJavaTest {
4755
4856 protected static final MockWebServerExtension server = new MockWebServerExtension ();
@@ -60,6 +68,7 @@ void setUp() throws Exception {
6068 server .start ();
6169 openSearchClient = buildOpenSearchClient ();
6270 openSearchAsyncClient = buildOpenSearchAsyncClient ();
71+ httpHost = server .httpsUri ();
6372 }
6473
6574 @ AfterAll
@@ -140,35 +149,35 @@ protected OpenSearchAsyncClient buildOpenSearchAsyncClient() throws Exception {
140149 return new OpenSearchAsyncClient (transport );
141150 }
142151
143- @ Test
144- @ Override
145- void shouldGetStatusWithTraces () throws IOException {
146- HealthResponse healthResponse = openSearchClient .cluster ().health ();
147- assertThat (healthResponse ).isNotNull ();
148-
149- getTesting ()
150- .waitAndAssertTraces (
151- trace ->
152- trace .hasSpansSatisfyingExactly (
153- span ->
154- span .hasName ("GET" )
155- .hasKind (SpanKind .CLIENT )
156- .hasAttributesSatisfyingExactly (
157- equalTo (maybeStable (DB_SYSTEM ), "opensearch" ),
158- equalTo (maybeStable (DB_OPERATION ), "GET" ),
159- equalTo (maybeStable (DB_STATEMENT ), "GET /_cluster/health" ))));
160- }
161-
162152 @ Test
163153 @ Override
164154 void shouldGetStatusAsyncWithTraces () throws Exception {
165155 AtomicReference <CompletableFuture <HealthResponse >> responseCompletableFuture =
166156 new AtomicReference <>();
157+ CountDownLatch countDownLatch = new CountDownLatch (1 );
167158
168159 getTesting ()
169160 .runWithSpan (
170161 "client" ,
171- () -> responseCompletableFuture .set (openSearchAsyncClient .cluster ().health ()));
162+ () -> {
163+ CompletableFuture <HealthResponse > future = openSearchAsyncClient .cluster ().health ();
164+ responseCompletableFuture .set (future );
165+
166+ future .whenComplete (
167+ (response , throwable ) -> {
168+ getTesting ()
169+ .runWithSpan (
170+ "callback" ,
171+ () -> {
172+ if (throwable != null ) {
173+ throw new RuntimeException (throwable );
174+ }
175+ countDownLatch .countDown ();
176+ });
177+ });
178+ });
179+
180+ countDownLatch .await ();
172181 HealthResponse healthResponse = responseCompletableFuture .get ().get ();
173182 assertThat (healthResponse ).isNotNull ();
174183
@@ -184,6 +193,29 @@ void shouldGetStatusAsyncWithTraces() throws Exception {
184193 .hasAttributesSatisfyingExactly (
185194 equalTo (maybeStable (DB_SYSTEM ), "opensearch" ),
186195 equalTo (maybeStable (DB_OPERATION ), "GET" ),
187- equalTo (maybeStable (DB_STATEMENT ), "GET /_cluster/health" ))));
196+ equalTo (maybeStable (DB_STATEMENT ), "GET /_cluster/health" )),
197+ span ->
198+ span .hasName ("GET" )
199+ .hasKind (SpanKind .CLIENT )
200+ .hasParent (trace .getSpan (1 ))
201+ .hasAttributesSatisfyingExactly (
202+ equalTo (NETWORK_PROTOCOL_VERSION , "1.1" ),
203+ equalTo (SERVER_ADDRESS , httpHost .getHost ()),
204+ equalTo (SERVER_PORT , httpHost .getPort ()),
205+ equalTo (HTTP_REQUEST_METHOD , "GET" ),
206+ equalTo (URL_FULL , httpHost + "/_cluster/health" ),
207+ equalTo (
208+ NETWORK_PEER_ADDRESS ,
209+ httpHost .getHost ()), // Netty 4.1 Instrumentation collects
210+ // NETWORK_PEER_ADDRESS
211+ equalTo (
212+ NETWORK_PEER_PORT ,
213+ httpHost .getPort ()), // Netty 4.1 Instrumentation collects
214+ // NETWORK_PEER_PORT
215+ equalTo (HTTP_RESPONSE_STATUS_CODE , 200L )),
216+ span ->
217+ span .hasName ("callback" )
218+ .hasKind (SpanKind .INTERNAL )
219+ .hasParent (trace .getSpan (1 ))));
188220 }
189221}
0 commit comments