Skip to content

Commit 951abf7

Browse files
committed
Either CommandFailed or CommandSucceeded event should be published, not both (#793)
This fixes a bug in the synchronous driver in which the driver sends a CommandSucceededEvent, followed by a CommandFailedEvent (for the same command) in the case were the server replied successfully but the Decoder configured to decode the response throws an exception. JAVA-4321
1 parent 8a3e4db commit 951abf7

File tree

2 files changed

+80
-16
lines changed

2 files changed

+80
-16
lines changed

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -315,16 +315,17 @@ public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decod
315315

316316
try {
317317
sendCommandMessage(message, bsonOutput, sessionContext);
318-
if (message.isResponseExpected()) {
319-
return receiveCommandMessageResponse(decoder, commandEventSender, sessionContext, 0);
320-
} else {
321-
commandEventSender.sendSucceededEventForOneWayCommand();
322-
return null;
323-
}
324318
} catch (RuntimeException e) {
325319
commandEventSender.sendFailedEvent(e);
326320
throw e;
327321
}
322+
323+
if (message.isResponseExpected()) {
324+
return receiveCommandMessageResponse(decoder, commandEventSender, sessionContext, 0);
325+
} else {
326+
commandEventSender.sendSucceededEventForOneWayCommand();
327+
return null;
328+
}
328329
}
329330

330331
@Override
@@ -341,7 +342,7 @@ public <T> void send(final CommandMessage message, final Decoder<T> decoder, fin
341342
@Override
342343
public <T> T receive(final Decoder<T> decoder, final SessionContext sessionContext) {
343344
isTrue("Response is expected", hasMoreToCome);
344-
return receiveCommandMessageResponse(decoder, null, sessionContext, 0);
345+
return receiveCommandMessageResponse(decoder, new NoOpCommandEventSender(), sessionContext, 0);
345346
}
346347

347348
@Override
@@ -352,7 +353,7 @@ public boolean supportsAdditionalTimeout() {
352353
@Override
353354
public <T> T receive(final Decoder<T> decoder, final SessionContext sessionContext, final int additionalTimeout) {
354355
isTrue("Response is expected", hasMoreToCome);
355-
return receiveCommandMessageResponse(decoder, null, sessionContext, additionalTimeout);
356+
return receiveCommandMessageResponse(decoder, new NoOpCommandEventSender(), sessionContext, additionalTimeout);
356357
}
357358

358359
@Override
@@ -392,27 +393,32 @@ private void sendCommandMessage(final CommandMessage message,
392393
private <T> T receiveCommandMessageResponse(final Decoder<T> decoder,
393394
final CommandEventSender commandEventSender, final SessionContext sessionContext,
394395
final int additionalTimeout) {
396+
boolean commandSuccessful = false;
395397
try (ResponseBuffers responseBuffers = receiveMessageWithAdditionalTimeout(additionalTimeout)) {
396398
updateSessionContext(sessionContext, responseBuffers);
397399
if (!isCommandOk(responseBuffers)) {
398-
throw getCommandFailureException(responseBuffers.getResponseDocument(responseTo, new BsonDocumentCodec()),
399-
description.getServerAddress());
400+
throw getCommandFailureException(responseBuffers.getResponseDocument(responseTo,
401+
new BsonDocumentCodec()), description.getServerAddress());
400402
}
401403

402-
if (commandEventSender != null) {
403-
commandEventSender.sendSucceededEvent(responseBuffers);
404-
}
404+
commandSuccessful = true;
405+
commandEventSender.sendSucceededEvent(responseBuffers);
405406

406407
T commandResult = getCommandResult(decoder, responseBuffers, responseTo);
407-
408408
hasMoreToCome = responseBuffers.getReplyHeader().hasMoreToCome();
409409
if (hasMoreToCome) {
410410
responseTo = responseBuffers.getReplyHeader().getRequestId();
411411
} else {
412412
responseTo = 0;
413413
}
414+
414415
return commandResult;
415-
}
416+
} catch (RuntimeException e) {
417+
if (!commandSuccessful) {
418+
commandEventSender.sendFailedEvent(e);
419+
}
420+
throw e;
421+
}
416422
}
417423

418424
@Override

driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.mongodb.internal.connection
1818

19-
2019
import com.mongodb.MongoCommandException
2120
import com.mongodb.MongoInternalException
2221
import com.mongodb.MongoNamespace
@@ -44,10 +43,13 @@ import com.mongodb.internal.session.SessionContext
4443
import com.mongodb.internal.validator.NoOpFieldNameValidator
4544
import org.bson.BsonDocument
4645
import org.bson.BsonInt32
46+
import org.bson.BsonReader
4747
import org.bson.BsonString
4848
import org.bson.ByteBuf
4949
import org.bson.ByteBufNIO
5050
import org.bson.codecs.BsonDocumentCodec
51+
import org.bson.codecs.DecoderContext
52+
import org.bson.codecs.configuration.CodecConfigurationException
5153
import spock.lang.Specification
5254

5355
import java.nio.ByteBuffer
@@ -504,6 +506,29 @@ class InternalStreamConnectionSpecification extends Specification {
504506
new BsonDocument('ok', new BsonInt32(1)), 1000)])
505507
}
506508

509+
def 'should send events for successful command with decoding error'() {
510+
given:
511+
def connection = getOpenedConnection()
512+
def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1))
513+
def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings, null)
514+
stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) }
515+
stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId())
516+
stream.read(90, 0) >> helper.defaultReply()
517+
518+
when:
519+
connection.sendAndReceive(commandMessage, {
520+
BsonReader reader, DecoderContext decoderContext -> throw new CodecConfigurationException('')
521+
}, NoOpSessionContext.INSTANCE, IgnorableRequestContext.INSTANCE)
522+
523+
then:
524+
thrown(CodecConfigurationException)
525+
commandListener.eventsWereDelivered([
526+
new CommandStartedEvent(1, connection.getDescription(), 'admin', 'ping',
527+
pingCommandDocument.append('$db', new BsonString('admin'))),
528+
new CommandSucceededEvent(1, connection.getDescription(), 'ping',
529+
new BsonDocument('ok', new BsonInt32(1)), 1000)])
530+
}
531+
507532
def 'should extract cluster and operation time into session context'() {
508533
given:
509534
def connection = getOpenedConnection()
@@ -747,6 +772,39 @@ class InternalStreamConnectionSpecification extends Specification {
747772
new BsonDocument('ok', new BsonInt32(1)), 1000)])
748773
}
749774

775+
def 'should send events for successful asynchronous command with decoding error'() {
776+
given:
777+
def connection = getOpenedConnection()
778+
def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1))
779+
def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings, null)
780+
def callback = new FutureResultCallback()
781+
782+
stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) }
783+
stream.writeAsync(_, _) >> { buffers, handler ->
784+
handler.completed(null)
785+
}
786+
stream.readAsync(16, _) >> { numBytes, handler ->
787+
handler.completed(helper.defaultMessageHeader(commandMessage.getId()))
788+
}
789+
stream.readAsync(90, _) >> { numBytes, handler ->
790+
handler.completed(helper.defaultReply())
791+
}
792+
793+
when:
794+
connection.sendAndReceiveAsync(commandMessage, {
795+
BsonReader reader, DecoderContext decoderContext -> throw new CodecConfigurationException('')
796+
}, NoOpSessionContext.INSTANCE, IgnorableRequestContext.INSTANCE, callback)
797+
callback.get()
798+
799+
then:
800+
thrown(CodecConfigurationException)
801+
commandListener.eventsWereDelivered([
802+
new CommandStartedEvent(1, connection.getDescription(), 'admin', 'ping',
803+
pingCommandDocument.append('$db', new BsonString('admin'))),
804+
new CommandSucceededEvent(1, connection.getDescription(), 'ping',
805+
new BsonDocument('ok', new BsonInt32(1)), 1000)])
806+
}
807+
750808

751809
def 'should send events for asynchronous command failure with exception writing message'() {
752810
given:

0 commit comments

Comments
 (0)