Skip to content

Commit f650d43

Browse files
committed
Add tracing support using Micrometer
1 parent 4b2967e commit f650d43

File tree

24 files changed

+546
-39
lines changed

24 files changed

+546
-39
lines changed

driver-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies {
5454

5555
optionalImplementation(libs.snappy.java)
5656
optionalImplementation(libs.zstd.jni)
57+
optionalImplementation(libs.micrometer)
5758

5859
testImplementation(project(path = ":bson", configuration = "testArtifacts"))
5960
testImplementation(libs.reflections)

driver-core/src/main/com/mongodb/MongoClientSettings.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import com.mongodb.connection.SslSettings;
3131
import com.mongodb.connection.TransportSettings;
3232
import com.mongodb.event.CommandListener;
33+
import com.mongodb.internal.tracing.TracingManager;
3334
import com.mongodb.lang.Nullable;
3435
import com.mongodb.spi.dns.DnsClient;
3536
import com.mongodb.spi.dns.InetAddressResolver;
37+
import com.mongodb.tracing.Tracer;
3638
import org.bson.UuidRepresentation;
3739
import org.bson.codecs.BsonCodecProvider;
3840
import org.bson.codecs.BsonValueCodecProvider;
@@ -118,6 +120,7 @@ public final class MongoClientSettings {
118120
private final InetAddressResolver inetAddressResolver;
119121
@Nullable
120122
private final Long timeoutMS;
123+
private final TracingManager tracingManager;
121124

122125
/**
123126
* Gets the default codec registry. It includes the following providers:
@@ -238,6 +241,7 @@ public static final class Builder {
238241
private ContextProvider contextProvider;
239242
private DnsClient dnsClient;
240243
private InetAddressResolver inetAddressResolver;
244+
private TracingManager tracingManager;
241245

242246
private Builder() {
243247
}
@@ -275,6 +279,7 @@ private Builder(final MongoClientSettings settings) {
275279
if (settings.heartbeatSocketTimeoutSetExplicitly) {
276280
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
277281
}
282+
tracingManager = settings.tracingManager;
278283
}
279284

280285
/**
@@ -723,6 +728,20 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
723728
return this;
724729
}
725730

731+
/**
732+
* Sets the tracer to use for creating Spans for operations and commands.
733+
*
734+
* @param tracer the tracer
735+
* @see com.mongodb.tracing.MicrometerTracer
736+
* @return this
737+
* @since 5.5
738+
*/
739+
@Alpha(Reason.CLIENT)
740+
public Builder tracer(final Tracer tracer) {
741+
this.tracingManager = new TracingManager(tracer);
742+
return this;
743+
}
744+
726745
/**
727746
* Build an instance of {@code MongoClientSettings}.
728747
*
@@ -1040,6 +1059,17 @@ public ContextProvider getContextProvider() {
10401059
return contextProvider;
10411060
}
10421061

1062+
/**
1063+
* Get the tracer to create Spans for operations and commands.
1064+
*
1065+
* @return this
1066+
* @since 5.5
1067+
*/
1068+
@Alpha(Reason.CLIENT)
1069+
public TracingManager getTracingManager() {
1070+
return tracingManager;
1071+
}
1072+
10431073
@Override
10441074
public boolean equals(final Object o) {
10451075
if (this == o) {
@@ -1156,5 +1186,6 @@ private MongoClientSettings(final Builder builder) {
11561186
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
11571187
contextProvider = builder.contextProvider;
11581188
timeoutMS = builder.timeoutMS;
1189+
tracingManager = builder.tracingManager;
11591190
}
11601191
}

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
186186
}
187187
}
188188

189+
BsonDocument getCommand() {
190+
return command;
191+
}
192+
189193
/**
190194
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
191195
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`.

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
import com.mongodb.internal.logging.StructuredLogger;
5252
import com.mongodb.internal.session.SessionContext;
5353
import com.mongodb.internal.time.Timeout;
54+
import com.mongodb.internal.tracing.Span;
55+
import com.mongodb.internal.tracing.TraceContext;
56+
import com.mongodb.internal.tracing.TracingManager;
5457
import com.mongodb.lang.Nullable;
5558
import org.bson.BsonBinaryReader;
5659
import org.bson.BsonDocument;
@@ -374,13 +377,24 @@ public boolean isClosed() {
374377
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final OperationContext operationContext) {
375378
Supplier<T> sendAndReceiveInternal = () -> sendAndReceiveInternal(
376379
message, decoder, operationContext);
380+
381+
Span tracingSpan = createTracingSpan(message, operationContext);
382+
377383
try {
378384
return sendAndReceiveInternal.get();
379385
} catch (MongoCommandException e) {
386+
if (tracingSpan != null) {
387+
tracingSpan.error(e);
388+
}
389+
380390
if (reauthenticationIsTriggered(e)) {
381391
return reauthenticateAndRetry(sendAndReceiveInternal, operationContext);
382392
}
383393
throw e;
394+
} finally {
395+
if (tracingSpan != null) {
396+
tracingSpan.end();
397+
}
384398
}
385399
}
386400

@@ -391,6 +405,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
391405

392406
AsyncSupplier<T> sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal(
393407
message, decoder, operationContext, c);
408+
394409
beginAsync().<T>thenSupply(c -> {
395410
sendAndReceiveAsyncInternal.getAsync(c);
396411
}).onErrorIf(e -> reauthenticationIsTriggered(e), (t, c) -> {
@@ -872,6 +887,42 @@ public ByteBuf getBuffer(final int size) {
872887
return stream.getBuffer(size);
873888
}
874889

890+
@Nullable
891+
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext) {
892+
TracingManager tracingManager = operationContext.getTracingManager();
893+
Span span;
894+
if (tracingManager.isEnabled()) {
895+
BsonDocument command = message.getCommand();
896+
TraceContext parentContext = null;
897+
long cursorId = -1;
898+
if (command.containsKey("getMore")) {
899+
cursorId = command.getInt64("getMore").longValue();
900+
parentContext = tracingManager.getCursorParentContext(cursorId);
901+
} else {
902+
parentContext = tracingManager.getParentContext(operationContext.getId());
903+
}
904+
905+
span = tracingManager.addSpan("Command " + command.getFirstKey(), parentContext);
906+
span.tag("db.system", "mongodb");
907+
span.tag("db.namespace", message.getNamespace().getFullName());
908+
span.tag("db.query.summary", command.getFirstKey());
909+
span.tag("db.query.opcode", String.valueOf(message.getOpCode()));
910+
span.tag("db.query.text", command.toString());
911+
if (cursorId != -1) {
912+
span.tag("db.mongodb.cursor_id", String.valueOf(cursorId));
913+
}
914+
span.tag("server.address", serverId.getAddress().getHost());
915+
span.tag("server.port", String.valueOf(serverId.getAddress().getPort()));
916+
span.tag("server.type", message.getSettings().getServerType().name());
917+
918+
span.tag("db.mongodb.server_connection_id", this.description.getConnectionId().toString());
919+
} else {
920+
span = null;
921+
}
922+
923+
return span;
924+
}
925+
875926
private class MessageHeaderCallback implements SingleResultCallback<ByteBuf> {
876927
private final OperationContext operationContext;
877928
private final SingleResultCallback<ResponseBuffers> callback;

driver-core/src/main/com/mongodb/internal/connection/OperationContext.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.mongodb.internal.TimeoutSettings;
2828
import com.mongodb.internal.VisibleForTesting;
2929
import com.mongodb.internal.session.SessionContext;
30+
import com.mongodb.internal.tracing.TracingManager;
3031
import com.mongodb.lang.Nullable;
3132
import com.mongodb.selector.ServerSelector;
3233

@@ -47,19 +48,25 @@ public class OperationContext {
4748
private final SessionContext sessionContext;
4849
private final RequestContext requestContext;
4950
private final TimeoutContext timeoutContext;
51+
private final TracingManager tracingManager;
5052
@Nullable
5153
private final ServerApi serverApi;
5254
@Nullable
5355
private final String operationName;
5456

5557
public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
5658
@Nullable final ServerApi serverApi) {
57-
this(requestContext, sessionContext, timeoutContext, serverApi, null);
59+
this(requestContext, sessionContext, timeoutContext, TracingManager.NO_OP, serverApi, null);
5860
}
5961

6062
public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
61-
@Nullable final ServerApi serverApi, @Nullable final String operationName) {
62-
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, operationName);
63+
final TracingManager tracingManager,
64+
@Nullable final ServerApi serverApi,
65+
@Nullable final String operationName) {
66+
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(),
67+
tracingManager,
68+
serverApi,
69+
operationName);
6370
}
6471

6572
public static OperationContext simpleOperationContext(
@@ -68,35 +75,45 @@ public static OperationContext simpleOperationContext(
6875
IgnorableRequestContext.INSTANCE,
6976
NoOpSessionContext.INSTANCE,
7077
new TimeoutContext(timeoutSettings),
78+
TracingManager.NO_OP,
7179
serverApi,
72-
null);
80+
null
81+
);
7382
}
7483

7584
public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) {
7685
return new OperationContext(
7786
IgnorableRequestContext.INSTANCE,
7887
NoOpSessionContext.INSTANCE,
7988
timeoutContext,
89+
TracingManager.NO_OP,
8090
null,
8191
null);
8292
}
8393

8494
public OperationContext withSessionContext(final SessionContext sessionContext) {
85-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName);
95+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, tracingManager, serverApi,
96+
operationName);
8697
}
8798

8899
public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) {
89-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName);
100+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, tracingManager, serverApi,
101+
operationName);
90102
}
91103

92104
public OperationContext withOperationName(final String operationName) {
93-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName);
105+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, tracingManager, serverApi,
106+
operationName);
94107
}
95108

96109
public long getId() {
97110
return id;
98111
}
99112

113+
public TracingManager getTracingManager() {
114+
return tracingManager;
115+
}
116+
100117
public SessionContext getSessionContext() {
101118
return sessionContext;
102119
}
@@ -121,33 +138,37 @@ public String getOperationName() {
121138

122139
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
123140
public OperationContext(final long id,
124-
final RequestContext requestContext,
125-
final SessionContext sessionContext,
126-
final TimeoutContext timeoutContext,
127-
final ServerDeprioritization serverDeprioritization,
128-
@Nullable final ServerApi serverApi,
129-
@Nullable final String operationName) {
141+
final RequestContext requestContext,
142+
final SessionContext sessionContext,
143+
final TimeoutContext timeoutContext,
144+
final ServerDeprioritization serverDeprioritization,
145+
final TracingManager tracingManager,
146+
@Nullable final ServerApi serverApi,
147+
@Nullable final String operationName) {
130148
this.id = id;
131149
this.serverDeprioritization = serverDeprioritization;
132150
this.requestContext = requestContext;
133151
this.sessionContext = sessionContext;
134152
this.timeoutContext = timeoutContext;
153+
this.tracingManager = tracingManager;
135154
this.serverApi = serverApi;
136155
this.operationName = operationName;
137156
}
138157

139158
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
140159
public OperationContext(final long id,
141-
final RequestContext requestContext,
142-
final SessionContext sessionContext,
143-
final TimeoutContext timeoutContext,
144-
@Nullable final ServerApi serverApi,
145-
@Nullable final String operationName) {
160+
final RequestContext requestContext,
161+
final SessionContext sessionContext,
162+
final TimeoutContext timeoutContext,
163+
final TracingManager tracingManager,
164+
@Nullable final ServerApi serverApi,
165+
@Nullable final String operationName) {
146166
this.id = id;
147167
this.serverDeprioritization = new ServerDeprioritization();
148168
this.requestContext = requestContext;
149169
this.sessionContext = sessionContext;
150170
this.timeoutContext = timeoutContext;
171+
this.tracingManager = tracingManager;
151172
this.serverApi = serverApi;
152173
this.operationName = operationName;
153174
}

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
7575
@Nullable
7676
private List<T> nextBatch;
7777
private boolean resetTimeoutWhenClosing;
78+
private final long cursorId;
7879

7980
CommandBatchCursor(
8081
final TimeoutMode timeoutMode,
@@ -95,10 +96,13 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
9596
operationContext = connectionSource.getOperationContext();
9697
this.timeoutMode = timeoutMode;
9798

99+
ServerCursor serverCursor = commandCursorResult.getServerCursor();
100+
this.cursorId = serverCursor != null ? serverCursor.getId() : -1;
101+
98102
operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
99103

100104
Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
101-
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
105+
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, serverCursor);
102106
resetTimeoutWhenClosing = true;
103107
}
104108

@@ -169,6 +173,7 @@ public void remove() {
169173

170174
@Override
171175
public void close() {
176+
operationContext.getTracingManager().removeCursorParentContext(cursorId);
172177
resourceManager.close();
173178
}
174179

0 commit comments

Comments
 (0)