Skip to content

Commit b6ea6d6

Browse files
authored
Replace cassandra session wrapper with jdk proxy (#8041)
Resolves #8026
1 parent 083acdd commit b6ea6d6

File tree

4 files changed

+135
-299
lines changed

4 files changed

+135
-299
lines changed

instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CompletionStageFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ public Object apply(Object session) {
1919
if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) {
2020
return session;
2121
}
22-
return new TracingCqlSession((CqlSession) session);
22+
return TracingCqlSession.wrapSession((CqlSession) session);
2323
}
2424
}

instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java

Lines changed: 58 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -7,194 +7,106 @@
77

88
import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;
99

10-
import com.datastax.oss.driver.api.core.CqlIdentifier;
1110
import com.datastax.oss.driver.api.core.CqlSession;
1211
import com.datastax.oss.driver.api.core.DriverException;
13-
import com.datastax.oss.driver.api.core.context.DriverContext;
1412
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
1513
import com.datastax.oss.driver.api.core.cql.BoundStatement;
1614
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
17-
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
18-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
1915
import com.datastax.oss.driver.api.core.cql.ResultSet;
2016
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
2117
import com.datastax.oss.driver.api.core.cql.Statement;
22-
import com.datastax.oss.driver.api.core.metadata.Metadata;
23-
import com.datastax.oss.driver.api.core.metrics.Metrics;
24-
import com.datastax.oss.driver.api.core.session.Request;
25-
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
2618
import io.opentelemetry.context.Context;
2719
import io.opentelemetry.context.Scope;
28-
import java.util.Optional;
20+
import java.lang.reflect.Proxy;
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
2924
import java.util.concurrent.CompletableFuture;
3025
import java.util.concurrent.CompletionStage;
3126
import java.util.function.Supplier;
3227
import javax.annotation.Nullable;
3328

34-
public class TracingCqlSession implements CqlSession {
35-
private final CqlSession session;
29+
final class TracingCqlSession {
30+
private TracingCqlSession() {}
3631

37-
public TracingCqlSession(CqlSession session) {
38-
this.session = session;
39-
}
40-
41-
@Override
42-
public PreparedStatement prepare(SimpleStatement statement) {
43-
return session.prepare(statement);
44-
}
45-
46-
@Override
47-
public PreparedStatement prepare(String query) {
48-
return session.prepare(query);
49-
}
50-
51-
@Override
52-
public PreparedStatement prepare(PrepareRequest request) {
53-
return session.prepare(request);
54-
}
55-
56-
@Override
57-
public CompletionStage<PreparedStatement> prepareAsync(SimpleStatement statement) {
58-
return session.prepareAsync(statement);
59-
}
60-
61-
@Override
62-
public CompletionStage<PreparedStatement> prepareAsync(String query) {
63-
return session.prepareAsync(query);
64-
}
65-
66-
@Override
67-
public CompletionStage<PreparedStatement> prepareAsync(PrepareRequest request) {
68-
return session.prepareAsync(request);
69-
}
70-
71-
@Override
72-
public String getName() {
73-
return session.getName();
74-
}
75-
76-
@Override
77-
public Metadata getMetadata() {
78-
return session.getMetadata();
79-
}
80-
81-
@Override
82-
public boolean isSchemaMetadataEnabled() {
83-
return session.isSchemaMetadataEnabled();
84-
}
85-
86-
@Override
87-
public CompletionStage<Metadata> setSchemaMetadataEnabled(@Nullable Boolean newValue) {
88-
return session.setSchemaMetadataEnabled(newValue);
89-
}
90-
91-
@Override
92-
public CompletionStage<Metadata> refreshSchemaAsync() {
93-
return session.refreshSchemaAsync();
94-
}
95-
96-
@Override
97-
public Metadata refreshSchema() {
98-
return session.refreshSchema();
99-
}
100-
101-
@Override
102-
public CompletionStage<Boolean> checkSchemaAgreementAsync() {
103-
return session.checkSchemaAgreementAsync();
104-
}
105-
106-
@Override
107-
public boolean checkSchemaAgreement() {
108-
return session.checkSchemaAgreement();
109-
}
110-
111-
@Override
112-
public DriverContext getContext() {
113-
return session.getContext();
114-
}
115-
116-
@Override
117-
public Optional<CqlIdentifier> getKeyspace() {
118-
return session.getKeyspace();
119-
}
120-
121-
@Override
122-
public Optional<Metrics> getMetrics() {
123-
return session.getMetrics();
124-
}
125-
126-
@Override
127-
public CompletionStage<Void> closeFuture() {
128-
return session.closeFuture();
129-
}
130-
131-
@Override
132-
public boolean isClosed() {
133-
return session.isClosed();
134-
}
135-
136-
@Override
137-
public CompletionStage<Void> closeAsync() {
138-
return session.closeAsync();
139-
}
140-
141-
@Override
142-
public CompletionStage<Void> forceCloseAsync() {
143-
return session.forceCloseAsync();
144-
}
145-
146-
@Override
147-
public void close() {
148-
session.close();
149-
}
150-
151-
@Override
152-
@Nullable
153-
public <REQUEST extends Request, RESULT> RESULT execute(
154-
REQUEST request, GenericType<RESULT> resultType) {
155-
return session.execute(request, resultType);
156-
}
32+
static CqlSession wrapSession(CqlSession session) {
33+
if (session == null) {
34+
return null;
35+
}
15736

158-
@Override
159-
public ResultSet execute(String query) {
37+
List<Class<?>> interfaces = new ArrayList<>();
38+
Class<?> clazz = session.getClass();
39+
while (clazz != Object.class) {
40+
interfaces.addAll(Arrays.asList(clazz.getInterfaces()));
41+
clazz = clazz.getSuperclass();
42+
}
43+
return (CqlSession)
44+
Proxy.newProxyInstance(
45+
session.getClass().getClassLoader(),
46+
interfaces.toArray(new Class<?>[0]),
47+
(proxy, method, args) -> {
48+
if ("execute".equals(method.getName()) && method.getParameterCount() == 1) {
49+
if (method.getParameterTypes()[0] == String.class) {
50+
String query = (String) args[0];
51+
return execute(session, query);
52+
}
53+
if (method.getParameterTypes()[0] == Statement.class) {
54+
Statement<?> statement = (Statement<?>) args[0];
55+
return execute(session, statement);
56+
}
57+
} else if ("executeAsync".equals(method.getName())
58+
&& method.getParameterCount() == 1) {
59+
if (method.getParameterTypes()[0] == String.class) {
60+
String query = (String) args[0];
61+
return executeAsync(session, query);
62+
}
63+
if (method.getParameterTypes()[0] == Statement.class) {
64+
Statement<?> statement = (Statement<?>) args[0];
65+
return executeAsync(session, statement);
66+
}
67+
}
68+
69+
return method.invoke(session, args);
70+
});
71+
}
72+
73+
private static ResultSet execute(CqlSession session, String query) {
16074
CassandraRequest request = CassandraRequest.create(session, query);
16175
Context context = instrumenter().start(Context.current(), request);
16276
ResultSet resultSet;
16377
try (Scope ignored = context.makeCurrent()) {
16478
resultSet = session.execute(query);
165-
} catch (RuntimeException e) {
166-
instrumenter().end(context, request, getExecutionInfo(e), e);
167-
throw e;
79+
} catch (Throwable exception) {
80+
instrumenter().end(context, request, getExecutionInfo(exception), exception);
81+
throw exception;
16882
}
16983
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
17084
return resultSet;
17185
}
17286

173-
@Override
174-
public ResultSet execute(Statement<?> statement) {
87+
private static ResultSet execute(CqlSession session, Statement<?> statement) {
17588
String query = getQuery(statement);
17689
CassandraRequest request = CassandraRequest.create(session, query);
17790
Context context = instrumenter().start(Context.current(), request);
17891
ResultSet resultSet;
17992
try (Scope ignored = context.makeCurrent()) {
18093
resultSet = session.execute(statement);
181-
} catch (RuntimeException e) {
182-
instrumenter().end(context, request, getExecutionInfo(e), e);
183-
throw e;
94+
} catch (Throwable exception) {
95+
instrumenter().end(context, request, getExecutionInfo(exception), exception);
96+
throw exception;
18497
}
18598
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
18699
return resultSet;
187100
}
188101

189-
@Override
190-
public CompletionStage<AsyncResultSet> executeAsync(Statement<?> statement) {
102+
private static CompletionStage<AsyncResultSet> executeAsync(
103+
CqlSession session, Statement<?> statement) {
191104
String query = getQuery(statement);
192105
CassandraRequest request = CassandraRequest.create(session, query);
193106
return executeAsync(request, () -> session.executeAsync(statement));
194107
}
195108

196-
@Override
197-
public CompletionStage<AsyncResultSet> executeAsync(String query) {
109+
private static CompletionStage<AsyncResultSet> executeAsync(CqlSession session, String query) {
198110
CassandraRequest request = CassandraRequest.create(session, query);
199111
return executeAsync(request, () -> session.executeAsync(query));
200112
}
@@ -218,7 +130,7 @@ private static CompletionStage<AsyncResultSet> executeAsync(
218130
}
219131
}
220132

221-
static <T> CompletableFuture<T> wrap(CompletionStage<T> future, Context context) {
133+
private static <T> CompletableFuture<T> wrap(CompletionStage<T> future, Context context) {
222134
CompletableFuture<T> result = new CompletableFuture<>();
223135
future.whenComplete(
224136
(T value, Throwable throwable) -> {

instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraTelemetry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ public static CassandraTelemetryBuilder builder(OpenTelemetry openTelemetry) {
2626
return new CassandraTelemetryBuilder(openTelemetry);
2727
}
2828

29-
private final Instrumenter<CassandraRequest, ExecutionInfo> instrumenter;
29+
private final TracingCqlSession tracingCqlSession;
3030

3131
protected CassandraTelemetry(Instrumenter<CassandraRequest, ExecutionInfo> instrumenter) {
32-
this.instrumenter = instrumenter;
32+
this.tracingCqlSession = new TracingCqlSession(instrumenter);
3333
}
3434

3535
/**
@@ -39,6 +39,6 @@ protected CassandraTelemetry(Instrumenter<CassandraRequest, ExecutionInfo> instr
3939
* @return a {@link TracingCqlSession}.
4040
*/
4141
public CqlSession wrap(CqlSession session) {
42-
return new TracingCqlSession(session, instrumenter);
42+
return tracingCqlSession.wrapSession(session);
4343
}
4444
}

0 commit comments

Comments
 (0)