52
52
import com .mongodb .internal .session .SessionContext ;
53
53
import com .mongodb .internal .time .Timeout ;
54
54
import com .mongodb .internal .tracing .Span ;
55
- import com .mongodb .internal .tracing .TraceContext ;
56
55
import com .mongodb .internal .tracing .TracingManager ;
57
56
import com .mongodb .lang .Nullable ;
58
57
import org .bson .BsonBinaryReader ;
97
96
import static com .mongodb .internal .connection .ProtocolHelper .isCommandOk ;
98
97
import static com .mongodb .internal .logging .LogMessage .Level .DEBUG ;
99
98
import static com .mongodb .internal .thread .InterruptionUtil .translateInterruptedException ;
99
+ import static com .mongodb .internal .tracing .Tags .CLIENT_CONNECTION_ID ;
100
+ import static com .mongodb .internal .tracing .Tags .CURSOR_ID ;
101
+ import static com .mongodb .internal .tracing .Tags .NAMESPACE ;
102
+ import static com .mongodb .internal .tracing .Tags .QUERY_SUMMARY ;
103
+ import static com .mongodb .internal .tracing .Tags .QUERY_TEXT ;
104
+ import static com .mongodb .internal .tracing .Tags .SERVER_ADDRESS ;
105
+ import static com .mongodb .internal .tracing .Tags .SERVER_CONNECTION_ID ;
106
+ import static com .mongodb .internal .tracing .Tags .SERVER_PORT ;
107
+ import static com .mongodb .internal .tracing .Tags .SERVER_TYPE ;
108
+ import static com .mongodb .internal .tracing .Tags .SESSION_ID ;
109
+ import static com .mongodb .internal .tracing .Tags .SYSTEM ;
110
+ import static com .mongodb .internal .tracing .Tags .TRANSACTION_NUMBER ;
100
111
import static java .util .Arrays .asList ;
101
112
102
113
/**
@@ -377,24 +388,13 @@ public boolean isClosed() {
377
388
public <T > T sendAndReceive (final CommandMessage message , final Decoder <T > decoder , final OperationContext operationContext ) {
378
389
Supplier <T > sendAndReceiveInternal = () -> sendAndReceiveInternal (
379
390
message , decoder , operationContext );
380
-
381
- Span tracingSpan = createTracingSpan (message , operationContext );
382
-
383
391
try {
384
392
return sendAndReceiveInternal .get ();
385
- } catch (MongoCommandException e ) {
386
- if (tracingSpan != null ) {
387
- tracingSpan .error (e );
388
- }
389
-
393
+ } catch (Throwable e ) {
390
394
if (reauthenticationIsTriggered (e )) {
391
395
return reauthenticateAndRetry (sendAndReceiveInternal , operationContext );
392
396
}
393
397
throw e ;
394
- } finally {
395
- if (tracingSpan != null ) {
396
- tracingSpan .end ();
397
- }
398
398
}
399
399
}
400
400
@@ -406,9 +406,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
406
406
AsyncSupplier <T > sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal (
407
407
message , decoder , operationContext , c );
408
408
409
- beginAsync ().<T >thenSupply (c -> {
410
- sendAndReceiveAsyncInternal .getAsync (c );
411
- }).onErrorIf (e -> reauthenticationIsTriggered (e ), (t , c ) -> {
409
+ beginAsync ().thenSupply (sendAndReceiveAsyncInternal ::getAsync ).onErrorIf (this ::reauthenticationIsTriggered , (t , c ) -> {
412
410
reauthenticateAndRetryAsync (sendAndReceiveAsyncInternal , operationContext , c );
413
411
}).finish (callback );
414
412
}
@@ -447,15 +445,44 @@ public boolean reauthenticationIsTriggered(@Nullable final Throwable t) {
447
445
private <T > T sendAndReceiveInternal (final CommandMessage message , final Decoder <T > decoder ,
448
446
final OperationContext operationContext ) {
449
447
CommandEventSender commandEventSender ;
448
+
450
449
try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput (this )) {
451
450
message .encode (bsonOutput , operationContext );
452
- commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
453
- commandEventSender .sendStartedEvent ();
451
+ Span tracingSpan = createTracingSpan (message , operationContext , bsonOutput );
452
+
453
+ boolean isLoggingCommandNeeded = isLoggingCommandNeeded ();
454
+ boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext .getTracingManager ().isCommandPayloadEnabled ();
455
+
456
+ // Only hydrate the command document if necessary
457
+ BsonDocument commandDocument = null ;
458
+ if (isLoggingCommandNeeded || isTracingCommandPayloadNeeded ) {
459
+ commandDocument = message .getCommandDocument (bsonOutput );
460
+ }
461
+ if (isLoggingCommandNeeded ) {
462
+ commandEventSender = new LoggingCommandEventSender (
463
+ SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
464
+ operationContext , message , commandDocument ,
465
+ COMMAND_PROTOCOL_LOGGER , loggerSettings );
466
+ commandEventSender .sendStartedEvent ();
467
+ } else {
468
+ commandEventSender = new NoOpCommandEventSender ();
469
+ }
470
+ if (isTracingCommandPayloadNeeded ) {
471
+ tracingSpan .tag (QUERY_TEXT , commandDocument .toJson ());
472
+ }
473
+
454
474
try {
455
475
sendCommandMessage (message , bsonOutput , operationContext );
456
476
} catch (Exception e ) {
477
+ if (tracingSpan != null ) {
478
+ tracingSpan .error (e );
479
+ }
457
480
commandEventSender .sendFailedEvent (e );
458
481
throw e ;
482
+ } finally {
483
+ if (tracingSpan != null ) {
484
+ tracingSpan .end ();
485
+ }
459
486
}
460
487
}
461
488
@@ -568,7 +595,18 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final
568
595
569
596
try {
570
597
message .encode (bsonOutput , operationContext );
571
- CommandEventSender commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
598
+
599
+ CommandEventSender commandEventSender ;
600
+ if (isLoggingCommandNeeded ()) {
601
+ BsonDocument commandDocument = message .getCommandDocument (bsonOutput );
602
+ commandEventSender = new LoggingCommandEventSender (
603
+ SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
604
+ operationContext , message , commandDocument ,
605
+ COMMAND_PROTOCOL_LOGGER , loggerSettings );
606
+ } else {
607
+ commandEventSender = new NoOpCommandEventSender ();
608
+ }
609
+
572
610
commandEventSender .sendStartedEvent ();
573
611
Compressor localSendCompressor = sendCompressor ;
574
612
if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS .contains (message .getCommandDocument (bsonOutput ).getFirstKey ())) {
@@ -887,42 +925,6 @@ public ByteBuf getBuffer(final int size) {
887
925
return stream .getBuffer (size );
888
926
}
889
927
890
- @ Nullable
891
- private Span createTracingSpan (final CommandMessage message , final OperationContext operationContext ) {
892
- TracingManager tracingManager = operationContext .getTracingManager ();
893
- Span span ;
894
- if (tracingManager .isEnabled ()) {
895
- BsonDocument command = message .getCommand ();
896
- TraceContext parentContext = null ;
897
- long cursorId = -1 ;
898
- if (command .containsKey ("getMore" )) {
899
- cursorId = command .getInt64 ("getMore" ).longValue ();
900
- parentContext = tracingManager .getCursorParentContext (cursorId );
901
- } else {
902
- parentContext = tracingManager .getParentContext (operationContext .getId ());
903
- }
904
-
905
- span = tracingManager .addSpan ("Command " + command .getFirstKey (), parentContext );
906
- span .tag ("db.system" , "mongodb" );
907
- span .tag ("db.namespace" , message .getNamespace ().getFullName ());
908
- span .tag ("db.query.summary" , command .getFirstKey ());
909
- span .tag ("db.query.opcode" , String .valueOf (message .getOpCode ()));
910
- span .tag ("db.query.text" , command .toString ());
911
- if (cursorId != -1 ) {
912
- span .tag ("db.mongodb.cursor_id" , String .valueOf (cursorId ));
913
- }
914
- span .tag ("server.address" , serverId .getAddress ().getHost ());
915
- span .tag ("server.port" , String .valueOf (serverId .getAddress ().getPort ()));
916
- span .tag ("server.type" , message .getSettings ().getServerType ().name ());
917
-
918
- span .tag ("db.mongodb.server_connection_id" , this .description .getConnectionId ().toString ());
919
- } else {
920
- span = null ;
921
- }
922
-
923
- return span ;
924
- }
925
-
926
928
private class MessageHeaderCallback implements SingleResultCallback <ByteBuf > {
927
929
private final OperationContext operationContext ;
928
930
private final SingleResultCallback <ResponseBuffers > callback ;
@@ -1003,19 +1005,75 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
1003
1005
1004
1006
private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger ("protocol.command" );
1005
1007
1006
- private CommandEventSender createCommandEventSender (final CommandMessage message , final ByteBufferBsonOutput bsonOutput ,
1007
- final OperationContext operationContext ) {
1008
+ private boolean isLoggingCommandNeeded () {
1008
1009
boolean listensOrLogs = commandListener != null || COMMAND_PROTOCOL_LOGGER .isRequired (DEBUG , getClusterId ());
1009
- if (!recordEverything && (isMonitoringConnection || !opened () || !authenticated .get () || !listensOrLogs )) {
1010
- return new NoOpCommandEventSender ();
1011
- }
1012
- return new LoggingCommandEventSender (
1013
- SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
1014
- operationContext , message , bsonOutput ,
1015
- COMMAND_PROTOCOL_LOGGER , loggerSettings );
1010
+ return recordEverything || (!isMonitoringConnection && opened () && authenticated .get () && listensOrLogs );
1016
1011
}
1017
1012
1018
1013
private ClusterId getClusterId () {
1019
1014
return description .getConnectionId ().getServerId ().getClusterId ();
1020
1015
}
1016
+
1017
+ /**
1018
+ * Creates a tracing span for the given command message.
1019
+ * <p>
1020
+ * The span is only created if tracing is enabled and the command is not security-sensitive.
1021
+ * It attaches various tags to the span, such as database system, namespace, query summary, opcode,
1022
+ * server address, port, server type, client and server connection IDs, and, if applicable,
1023
+ * transaction number and session ID. For cursor fetching commands, the parent context is retrieved using the cursor ID.
1024
+ * If command payload tracing is enabled, the command document is also attached as a tag.
1025
+ *
1026
+ * @param message the command message to trace
1027
+ * @param operationContext the operation context containing tracing and session information
1028
+ * @param bsonOutput the BSON output used to serialize the command
1029
+ * @return the created {@link Span}, or {@code null} if tracing is not enabled or the command is security-sensitive
1030
+ */
1031
+ @ Nullable
1032
+ private Span createTracingSpan (final CommandMessage message , final OperationContext operationContext , final ByteBufferBsonOutput bsonOutput ) {
1033
+
1034
+ TracingManager tracingManager = operationContext .getTracingManager ();
1035
+ BsonDocument command = message .getCommandDocument (bsonOutput );
1036
+
1037
+ // BsonDocument command = message.getCommand();
1038
+ String commandName = command .getFirstKey ();
1039
+ // Span newSpan = tracingManager.addSpan("_____Command_____[ " + commandName + " ]", myparentContext);
1040
+ if (!tracingManager .isEnabled ()
1041
+ || SECURITY_SENSITIVE_COMMANDS .contains (commandName )
1042
+ || SECURITY_SENSITIVE_HELLO_COMMANDS .contains (commandName )) {
1043
+ return null ;
1044
+ }
1045
+
1046
+ Span span = tracingManager
1047
+ .addSpan ("Command " + commandName , operationContext .getTracingSpanContext ())
1048
+ .tag (SYSTEM , "mongodb" )
1049
+ .tag (NAMESPACE , message .getNamespace ().getDatabaseName ())
1050
+ .tag (QUERY_SUMMARY , commandName );
1051
+
1052
+ if (command .containsKey ("getMore" )) {
1053
+ span .tag (CURSOR_ID , command .getInt64 ("getMore" ).longValue ());
1054
+ }
1055
+
1056
+ tagServerAndConnectionInfo (span , message );
1057
+ tagSessionAndTransactionInfo (span , operationContext );
1058
+
1059
+ return span ;
1060
+ }
1061
+
1062
+ private void tagServerAndConnectionInfo (final Span span , final CommandMessage message ) {
1063
+ span .tag (SERVER_ADDRESS , serverId .getAddress ().getHost ())
1064
+ .tag (SERVER_PORT , String .valueOf (serverId .getAddress ().getPort ()))
1065
+ .tag (SERVER_TYPE , message .getSettings ().getServerType ().name ())
1066
+ .tag (CLIENT_CONNECTION_ID , this .description .getConnectionId ().toString ())
1067
+ .tag (SERVER_CONNECTION_ID , String .valueOf (this .description .getConnectionId ().getServerValue ()));
1068
+ }
1069
+
1070
+ private void tagSessionAndTransactionInfo (final Span span , final OperationContext operationContext ) {
1071
+ SessionContext sessionContext = operationContext .getSessionContext ();
1072
+ if (sessionContext .hasSession () && !sessionContext .isImplicitSession ()) {
1073
+ span .tag (TRANSACTION_NUMBER , String .valueOf (sessionContext .getTransactionNumber ()))
1074
+ .tag (SESSION_ID , String .valueOf (sessionContext .getSessionId ()
1075
+ .get (sessionContext .getSessionId ().getFirstKey ())
1076
+ .asBinary ().asUuid ()));
1077
+ }
1078
+ }
1021
1079
}
0 commit comments