Skip to content

Commit 9339c15

Browse files
heyamstrask
andauthored
Fix Cassandra target (#10357)
Co-authored-by: Trask Stalnaker <[email protected]>
1 parent f7312c3 commit 9339c15

File tree

8 files changed

+183
-12
lines changed

8 files changed

+183
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
7+
8+
import com.datastax.driver.core.ExecutionInfo;
9+
import io.opentelemetry.api.common.AttributesBuilder;
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
12+
import io.opentelemetry.semconv.SemanticAttributes;
13+
import javax.annotation.Nullable;
14+
15+
public class CassandraAttributesExtractor
16+
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {
17+
@Override
18+
public void onStart(AttributesBuilder attributes, Context context, CassandraRequest request) {}
19+
20+
@Override
21+
public void onEnd(
22+
AttributesBuilder attributes,
23+
Context context,
24+
CassandraRequest request,
25+
@Nullable ExecutionInfo executionInfo,
26+
@Nullable Throwable error) {
27+
if (executionInfo == null) {
28+
return;
29+
}
30+
attributes.put(
31+
SemanticAttributes.SERVER_ADDRESS,
32+
executionInfo.getQueriedHost().getSocketAddress().getHostString());
33+
attributes.put(
34+
SemanticAttributes.SERVER_PORT,
35+
executionInfo.getQueriedHost().getSocketAddress().getPort());
36+
}
37+
}

instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSingletons.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public final class CassandraSingletons {
3838
.build())
3939
.addAttributesExtractor(
4040
NetworkAttributesExtractor.create(new CassandraNetworkAttributesGetter()))
41+
.addAttributesExtractor(new CassandraAttributesExtractor())
4142
.buildInstrumenter(SpanKindExtractor.alwaysClient());
4243
}
4344

instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/CassandraClientTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ void syncTest(Parameter parameter) {
8686
.hasNoParent()
8787
.hasAttributesSatisfyingExactly(
8888
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
89+
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
90+
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
8991
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
9092
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
9193
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
@@ -99,6 +101,8 @@ void syncTest(Parameter parameter) {
99101
.hasNoParent()
100102
.hasAttributesSatisfyingExactly(
101103
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
104+
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
105+
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
102106
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
103107
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
104108
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
@@ -116,6 +120,8 @@ void syncTest(Parameter parameter) {
116120
.hasNoParent()
117121
.hasAttributesSatisfyingExactly(
118122
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
123+
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
124+
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
119125
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
120126
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
121127
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
@@ -153,6 +159,8 @@ void asyncTest(Parameter parameter) {
153159
.hasNoParent()
154160
.hasAttributesSatisfyingExactly(
155161
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
162+
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
163+
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
156164
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
157165
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
158166
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
@@ -167,6 +175,8 @@ void asyncTest(Parameter parameter) {
167175
.hasParent(trace.getSpan(0))
168176
.hasAttributesSatisfyingExactly(
169177
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
178+
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
179+
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
170180
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
171181
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
172182
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
@@ -189,6 +199,8 @@ void asyncTest(Parameter parameter) {
189199
.hasParent(trace.getSpan(0))
190200
.hasAttributesSatisfyingExactly(
191201
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
202+
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
203+
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
192204
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
193205
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
194206
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),

instrumentation/cassandra/cassandra-4-common/testing/src/main/java/io/opentelemetry/cassandra/v4/common/AbstractCassandraTest.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
import static io.opentelemetry.semconv.SemanticAttributes.DB_STATEMENT;
2020
import static io.opentelemetry.semconv.SemanticAttributes.DB_SYSTEM;
2121
import static io.opentelemetry.semconv.SemanticAttributes.NETWORK_TYPE;
22+
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_ADDRESS;
23+
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_PORT;
24+
import static org.assertj.core.api.Assertions.assertThat;
2225
import static org.junit.jupiter.api.Named.named;
2326

2427
import com.datastax.oss.driver.api.core.CqlSession;
28+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
2529
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2630
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
2731
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
@@ -90,8 +94,20 @@ void syncTest(Parameter parameter) {
9094
.hasKind(SpanKind.CLIENT)
9195
.hasNoParent()
9296
.hasAttributesSatisfyingExactly(
93-
equalTo(NETWORK_TYPE, "ipv4"),
94-
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
97+
satisfies(
98+
NETWORK_TYPE,
99+
val ->
100+
val.satisfiesAnyOf(
101+
v -> assertThat(v).isEqualTo("ipv4"),
102+
v -> assertThat(v).isEqualTo("ipv6"))),
103+
equalTo(SERVER_ADDRESS, "localhost"),
104+
equalTo(SERVER_PORT, cassandraPort),
105+
satisfies(
106+
NetworkAttributes.NETWORK_PEER_ADDRESS,
107+
val ->
108+
val.satisfiesAnyOf(
109+
v -> assertThat(v).isEqualTo("127.0.0.1"),
110+
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
95111
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
96112
equalTo(DB_SYSTEM, "cassandra"),
97113
equalTo(DB_NAME, parameter.keyspace),
@@ -137,8 +153,20 @@ void asyncTest(Parameter parameter) throws Exception {
137153
.hasKind(SpanKind.CLIENT)
138154
.hasParent(trace.getSpan(0))
139155
.hasAttributesSatisfyingExactly(
140-
equalTo(NETWORK_TYPE, "ipv4"),
141-
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
156+
satisfies(
157+
NETWORK_TYPE,
158+
val ->
159+
val.satisfiesAnyOf(
160+
v -> assertThat(v).isEqualTo("ipv4"),
161+
v -> assertThat(v).isEqualTo("ipv6"))),
162+
equalTo(SERVER_ADDRESS, "localhost"),
163+
equalTo(SERVER_PORT, cassandraPort),
164+
satisfies(
165+
NetworkAttributes.NETWORK_PEER_ADDRESS,
166+
val ->
167+
val.satisfiesAnyOf(
168+
v -> assertThat(v).isEqualTo("127.0.0.1"),
169+
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
142170
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
143171
equalTo(DB_SYSTEM, "cassandra"),
144172
equalTo(DB_NAME, parameter.keyspace),
@@ -302,11 +330,15 @@ protected CqlSession getSession(String keyspace) {
302330
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(10))
303331
.build();
304332
return wrap(
305-
CqlSession.builder()
306-
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
333+
addContactPoint(CqlSession.builder())
307334
.withConfigLoader(configLoader)
308335
.withLocalDatacenter("datacenter1")
309336
.withKeyspace(keyspace)
310337
.build());
311338
}
339+
340+
protected CqlSessionBuilder addContactPoint(CqlSessionBuilder sessionBuilder) {
341+
sessionBuilder.addContactPoint(new InetSocketAddress("localhost", cassandraPort));
342+
return sessionBuilder;
343+
}
312344
}

instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraAttributesExtractor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import io.opentelemetry.context.Context;
1515
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
1616
import io.opentelemetry.semconv.SemanticAttributes;
17+
import java.net.InetSocketAddress;
18+
import java.net.SocketAddress;
1719
import javax.annotation.Nullable;
1820

1921
final class CassandraAttributesExtractor
@@ -36,6 +38,12 @@ public void onEnd(
3638

3739
Node coordinator = executionInfo.getCoordinator();
3840
if (coordinator != null) {
41+
SocketAddress address = coordinator.getEndPoint().resolve();
42+
if (address instanceof InetSocketAddress) {
43+
attributes.put(
44+
SemanticAttributes.SERVER_ADDRESS, ((InetSocketAddress) address).getHostString());
45+
attributes.put(SemanticAttributes.SERVER_PORT, ((InetSocketAddress) address).getPort());
46+
}
3947
if (coordinator.getDatacenter() != null) {
4048
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
4149
}

instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,28 @@
99
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
1010
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
1111
import com.datastax.oss.driver.api.core.cql.Statement;
12+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
1213
import com.datastax.oss.driver.api.core.metadata.Node;
14+
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
15+
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
1316
import io.opentelemetry.api.common.AttributesBuilder;
1417
import io.opentelemetry.context.Context;
1518
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
1619
import io.opentelemetry.semconv.SemanticAttributes;
20+
import java.lang.reflect.Field;
21+
import java.net.InetSocketAddress;
22+
import java.util.logging.Level;
23+
import java.util.logging.Logger;
1724
import javax.annotation.Nullable;
1825

1926
final class CassandraAttributesExtractor
2027
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {
2128

29+
private static final Logger logger =
30+
Logger.getLogger(CassandraAttributesExtractor.class.getName());
31+
32+
private static final Field proxyAddressField = getProxyAddressField();
33+
2234
@Override
2335
public void onStart(
2436
AttributesBuilder attributes, Context parentContext, CassandraRequest request) {}
@@ -36,6 +48,8 @@ public void onEnd(
3648

3749
Node coordinator = executionInfo.getCoordinator();
3850
if (coordinator != null) {
51+
updateServerAddressAndPort(attributes, coordinator);
52+
3953
if (coordinator.getDatacenter() != null) {
4054
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
4155
}
@@ -74,4 +88,40 @@ public void onEnd(
7488
}
7589
attributes.put(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, idempotent);
7690
}
91+
92+
private static void updateServerAddressAndPort(AttributesBuilder attributes, Node coordinator) {
93+
EndPoint endPoint = coordinator.getEndPoint();
94+
if (endPoint instanceof DefaultEndPoint) {
95+
InetSocketAddress address = ((DefaultEndPoint) endPoint).resolve();
96+
attributes.put(SemanticAttributes.SERVER_ADDRESS, address.getHostString());
97+
attributes.put(SemanticAttributes.SERVER_PORT, address.getPort());
98+
} else if (endPoint instanceof SniEndPoint && proxyAddressField != null) {
99+
SniEndPoint sniEndPoint = (SniEndPoint) endPoint;
100+
Object object = null;
101+
try {
102+
object = proxyAddressField.get(sniEndPoint);
103+
} catch (Exception e) {
104+
logger.log(
105+
Level.FINE,
106+
"Error when accessing the private field proxyAddress of SniEndPoint using reflection.",
107+
e);
108+
}
109+
if (object instanceof InetSocketAddress) {
110+
InetSocketAddress address = (InetSocketAddress) object;
111+
attributes.put(SemanticAttributes.SERVER_ADDRESS, address.getHostString());
112+
attributes.put(SemanticAttributes.SERVER_PORT, address.getPort());
113+
}
114+
}
115+
}
116+
117+
@Nullable
118+
private static Field getProxyAddressField() {
119+
try {
120+
Field field = SniEndPoint.class.getDeclaredField("proxyAddress");
121+
field.setAccessible(true);
122+
return field;
123+
} catch (Exception e) {
124+
return null;
125+
}
126+
}
77127
}

instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraNetworkAttributesGetter.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
package io.opentelemetry.instrumentation.cassandra.v4_4;
77

88
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
9+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
910
import com.datastax.oss.driver.api.core.metadata.Node;
11+
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
12+
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
1013
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
1114
import java.net.InetSocketAddress;
12-
import java.net.SocketAddress;
1315
import javax.annotation.Nullable;
1416

1517
final class CassandraNetworkAttributesGetter
@@ -27,8 +29,12 @@ public InetSocketAddress getNetworkPeerInetSocketAddress(
2729
return null;
2830
}
2931
// resolve() returns an existing InetSocketAddress, it does not do a dns resolve,
30-
// at least in the only current EndPoint implementation (DefaultEndPoint)
31-
SocketAddress address = coordinator.getEndPoint().resolve();
32-
return address instanceof InetSocketAddress ? (InetSocketAddress) address : null;
32+
EndPoint endPoint = coordinator.getEndPoint();
33+
if (endPoint instanceof DefaultEndPoint) {
34+
return (InetSocketAddress) coordinator.getEndPoint().resolve();
35+
} else if (endPoint instanceof SniEndPoint) {
36+
return ((SniEndPoint) endPoint).resolve();
37+
}
38+
return null;
3339
}
3440
}

instrumentation/cassandra/cassandra-4.4/testing/src/main/java/io/opentelemetry/testing/cassandra/v4_4/AbstractCassandra44Test.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919
import static io.opentelemetry.semconv.SemanticAttributes.DB_STATEMENT;
2020
import static io.opentelemetry.semconv.SemanticAttributes.DB_SYSTEM;
2121
import static io.opentelemetry.semconv.SemanticAttributes.NETWORK_TYPE;
22+
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_ADDRESS;
23+
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_PORT;
24+
import static org.assertj.core.api.Assertions.assertThat;
2225
import static org.junit.jupiter.api.Named.named;
2326

2427
import com.datastax.oss.driver.api.core.CqlSession;
28+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
29+
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
2530
import io.opentelemetry.api.trace.SpanKind;
2631
import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
2732
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes;
33+
import java.net.InetSocketAddress;
2834
import java.util.stream.Stream;
2935
import org.junit.jupiter.params.ParameterizedTest;
3036
import org.junit.jupiter.params.provider.Arguments;
@@ -56,8 +62,20 @@ void reactiveTest(Parameter parameter) {
5662
.hasKind(SpanKind.CLIENT)
5763
.hasParent(trace.getSpan(0))
5864
.hasAttributesSatisfyingExactly(
59-
equalTo(NETWORK_TYPE, "ipv4"),
60-
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
65+
satisfies(
66+
NETWORK_TYPE,
67+
val ->
68+
val.satisfiesAnyOf(
69+
v -> assertThat(v).isEqualTo("ipv4"),
70+
v -> assertThat(v).isEqualTo("ipv6"))),
71+
equalTo(SERVER_ADDRESS, "localhost"),
72+
equalTo(SERVER_PORT, cassandraPort),
73+
satisfies(
74+
NetworkAttributes.NETWORK_PEER_ADDRESS,
75+
val ->
76+
val.satisfiesAnyOf(
77+
v -> assertThat(v).isEqualTo("127.0.0.1"),
78+
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
6179
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
6280
equalTo(DB_SYSTEM, "cassandra"),
6381
equalTo(DB_NAME, parameter.keyspace),
@@ -135,4 +153,11 @@ private static Stream<Arguments> provideReactiveParameters() {
135153
"SELECT",
136154
"users"))));
137155
}
156+
157+
@Override
158+
protected CqlSessionBuilder addContactPoint(CqlSessionBuilder sessionBuilder) {
159+
InetSocketAddress address = new InetSocketAddress("localhost", cassandraPort);
160+
sessionBuilder.addContactEndPoint(new SniEndPoint(address, "localhost"));
161+
return sessionBuilder;
162+
}
138163
}

0 commit comments

Comments
 (0)