Skip to content

Commit 4d9c1a6

Browse files
committed
Message size for send message event must be calculated before the message is written. Otherwise, ByteBuf.remaining() returns 0.
JAVA-1743
1 parent a4f68ed commit 4d9c1a6

File tree

2 files changed

+51
-16
lines changed

2 files changed

+51
-16
lines changed

driver-core/src/main/com/mongodb/connection/InternalStreamConnection.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,9 @@ public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId
197197

198198
writerLock.lock();
199199
try {
200+
int messageSize = getMessageSize(byteBuffers);
200201
stream.write(byteBuffers);
201-
connectionListener.messagesSent(new ConnectionMessagesSentEvent(getId(), lastRequestId, getTotalRemaining(byteBuffers)));
202+
connectionListener.messagesSent(new ConnectionMessagesSentEvent(getId(), lastRequestId, messageSize));
202203
} catch (Exception e) {
203204
close();
204205
throw translateWriteException(e);
@@ -289,6 +290,7 @@ public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequ
289290
}
290291

291292
private void writeAsync(final SendMessageRequest request) {
293+
final int messageSize = getMessageSize(request.getByteBuffers());
292294
stream.writeAsync(request.getByteBuffers(), new AsyncCompletionHandler<Void>() {
293295
@Override
294296
public void completed(final Void v) {
@@ -303,8 +305,7 @@ public void completed(final Void v) {
303305
writerLock.unlock();
304306
}
305307

306-
connectionListener.messagesSent(new ConnectionMessagesSentEvent(getId(), request.getMessageId(),
307-
getTotalRemaining(request.getByteBuffers())));
308+
connectionListener.messagesSent(new ConnectionMessagesSentEvent(getId(), request.getMessageId(), messageSize));
308309
request.getCallback().onResult(null, null);
309310

310311
if (nextMessage != null) {
@@ -579,7 +580,7 @@ public void onResult(final ByteBuf result, final Throwable t) {
579580
}
580581
}
581582

582-
private int getTotalRemaining(final List<ByteBuf> byteBuffers) {
583+
private int getMessageSize(final List<ByteBuf> byteBuffers) {
583584
int messageSize = 0;
584585
for (final ByteBuf cur : byteBuffers) {
585586
messageSize += cur.remaining();

driver-core/src/test/unit/com/mongodb/connection/InternalStreamConnectionSpecification.groovy

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import com.mongodb.ServerAddress
2727
import com.mongodb.async.FutureResultCallback
2828
import com.mongodb.async.SingleResultCallback
2929
import com.mongodb.event.ConnectionListener
30+
import com.mongodb.event.ConnectionMessageReceivedEvent
31+
import com.mongodb.event.ConnectionMessagesSentEvent
3032
import org.bson.BsonBinaryWriter
3133
import org.bson.BsonDocument
3234
import org.bson.BsonInt32
@@ -50,6 +52,7 @@ import java.util.concurrent.ExecutorService
5052
import java.util.concurrent.Executors
5153

5254
import static MongoNamespace.COMMAND_COLLECTION_NAME
55+
import static com.mongodb.CustomMatchers.compare
5356
import static com.mongodb.connection.ConnectionDescription.getDefaultMaxMessageSize
5457
import static com.mongodb.connection.ConnectionDescription.getDefaultMaxWriteBatchSize
5558
import static com.mongodb.connection.ServerDescription.getDefaultMaxDocumentSize
@@ -61,7 +64,9 @@ class InternalStreamConnectionSpecification extends Specification {
6164

6265
def helper = new StreamHelper()
6366
def serverAddress = new ServerAddress()
64-
def connectionDescription = new ConnectionDescription(new ConnectionId(SERVER_ID, 1, 1), new ServerVersion(), ServerType.STANDALONE,
67+
def connectionId = new ConnectionId(SERVER_ID, 1, 1)
68+
69+
def connectionDescription = new ConnectionDescription(connectionId, new ServerVersion(), ServerType.STANDALONE,
6570
getDefaultMaxWriteBatchSize(), getDefaultMaxDocumentSize(),
6671
getDefaultMaxMessageSize())
6772
def stream = Mock(Stream) {
@@ -106,23 +111,31 @@ class InternalStreamConnectionSpecification extends Specification {
106111
given:
107112
def connection = getOpenedConnection()
108113
def (buffers1, messageId1) = helper.isMaster()
109-
stream.read(_) >>> helper.read([messageId1])
114+
def messageSize = helper.remaining(buffers1)
115+
stream.write(_) >> {
116+
helper.write(buffers1)
117+
}
110118
when:
111119
connection.sendMessage(buffers1, messageId1)
112120

121+
113122
then:
114-
1 * listener.messagesSent(_)
123+
1 * listener.messagesSent {
124+
compare(new ConnectionMessagesSentEvent(connectionId, messageId1, messageSize), it)
125+
}
115126
}
116127

117128
@Category(Async)
118129
@IgnoreIf({ javaVersion < 1.7 })
119130
def 'should fire message sent event asynchronously'() {
120-
stream.writeAsync(_, _) >> { List<ByteBuf> buffers, AsyncCompletionHandler<Void> callback ->
121-
callback.completed(null)
122-
}
123131
def (buffers1, messageId1) = helper.isMaster()
132+
def messageSize = helper.remaining(buffers1)
124133
def connection = getOpenedConnection()
125134
def latch = new CountDownLatch(1);
135+
stream.writeAsync(_, _) >> { List<ByteBuf> buffers, AsyncCompletionHandler<Void> callback ->
136+
helper.write(buffers1)
137+
callback.completed(null)
138+
}
126139

127140
when:
128141
connection.sendMessageAsync(buffers1, messageId1, new SingleResultCallback<Void>() {
@@ -134,7 +147,9 @@ class InternalStreamConnectionSpecification extends Specification {
134147
latch.await()
135148

136149
then:
137-
1 * listener.messagesSent(_)
150+
1 * listener.messagesSent {
151+
compare(new ConnectionMessagesSentEvent(connectionId, messageId1, messageSize), it)
152+
}
138153
}
139154

140155
def 'should fire message received event'() {
@@ -147,7 +162,9 @@ class InternalStreamConnectionSpecification extends Specification {
147162
connection.receiveMessage(messageId1)
148163

149164
then:
150-
1 * listener.messageReceived(_)
165+
1 * listener.messageReceived {
166+
compare(new ConnectionMessageReceivedEvent(connectionId, messageId1, 110), it)
167+
}
151168
}
152169

153170
@Category(Async)
@@ -175,7 +192,9 @@ class InternalStreamConnectionSpecification extends Specification {
175192
latch.await()
176193

177194
then:
178-
1 * listener.messageReceived(_)
195+
1 * listener.messageReceived {
196+
compare(new ConnectionMessageReceivedEvent(connectionId, messageId1, 110), it)
197+
}
179198
}
180199

181200
def 'should change the connection description when opened'() {
@@ -653,6 +672,21 @@ class InternalStreamConnectionSpecification extends Specification {
653672
class StreamHelper {
654673
int nextMessageId = 900000 // Generates a message then adds one to the id
655674

675+
def remaining(List<ByteBuf> buffers) {
676+
int remaining = 0
677+
buffers.each {
678+
remaining += it.remaining()
679+
}
680+
remaining
681+
}
682+
683+
def write(List<ByteBuf> buffers) {
684+
buffers.each {
685+
it.get(new byte[it.remaining()])
686+
}
687+
}
688+
689+
656690
def read(List<Integer> messageIds) {
657691
read(messageIds, true)
658692
}
@@ -706,10 +740,10 @@ class InternalStreamConnectionSpecification extends Specification {
706740
def command = new CommandMessage(new MongoNamespace('admin', COMMAND_COLLECTION_NAME).getFullName(),
707741
new BsonDocument('ismaster', new BsonInt32(1)),
708742
false, MessageSettings.builder().build());
709-
OutputBuffer buffer = new BasicOutputBuffer();
710-
command.encode(buffer);
743+
OutputBuffer outputBuffer = new BasicOutputBuffer();
744+
command.encode(outputBuffer);
711745
nextMessageId++
712-
[buffer.byteBuffers, nextMessageId]
746+
[outputBuffer.byteBuffers, nextMessageId]
713747
}
714748

715749
def isMasterAsync() {

0 commit comments

Comments
 (0)