Skip to content

Commit 8e62c8a

Browse files
committed
Test: Race fixes
* Fix race condition in change stream tests where close can async call kill cursors but returns immediately. * Fix CommandMessage race where the server monitor may advance the global currentId.
1 parent 9065102 commit 8e62c8a

File tree

3 files changed

+12
-8
lines changed

3 files changed

+12
-8
lines changed

driver-core/src/test/functional/com/mongodb/operation/AggregateOperationSpecification.groovy

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import com.mongodb.binding.ReadBinding
3131
import com.mongodb.client.model.Collation
3232
import com.mongodb.client.model.CreateCollectionOptions
3333
import com.mongodb.client.model.Filters
34+
import com.mongodb.client.test.CollectionHelper
3435
import com.mongodb.connection.AsyncConnection
3536
import com.mongodb.connection.ClusterId
3637
import com.mongodb.connection.Connection
@@ -195,15 +196,14 @@ class AggregateOperationSpecification extends OperationFunctionalSpecification {
195196
def 'should support changeStreams'() {
196197
given:
197198
def expected = [createExpectedChangeNotification(namespace, 0), createExpectedChangeNotification(namespace, 1)]
198-
199199
def pipeline = ['{$changeStream: {}}', '{$project: {"_id.clusterTime": 0, "_id.uuid": 0}}'].collect { BsonDocument.parse(it) }
200200
def operation = new AggregateOperation<BsonDocument>(namespace, pipeline, new BsonDocumentCodec())
201+
def helper = getCollectionHelper()
201202

202203
when:
203-
getCollectionHelper().drop()
204-
getCollectionHelper().create(getCollectionHelper().getNamespace().getCollectionName(), new CreateCollectionOptions())
204+
helper.create(helper.getNamespace().getCollectionName(), new CreateCollectionOptions())
205205
def cursor = execute(operation, async)
206-
getCollectionHelper().insertDocuments(['{_id: 0, a: 0}', '{_id: 1, a: 1}'].collect { BsonDocument.parse(it) })
206+
helper.insertDocuments(['{_id: 0, a: 0}', '{_id: 1, a: 1}'].collect { BsonDocument.parse(it) })
207207

208208
then:
209209
def next = next(cursor, async).collect { doc ->
@@ -214,6 +214,7 @@ class AggregateOperationSpecification extends OperationFunctionalSpecification {
214214

215215
cleanup:
216216
cursor?.close()
217+
helper?.drop()
217218

218219
where:
219220
async << [true, false]

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
133133
}
134134

135135
cleanup:
136-
//cursor?.close()
136+
cursor?.close()
137+
helper?.drop()
137138

138139
where:
139140
async << [true, false]

driver-core/src/test/unit/com/mongodb/connection/CommandMessageSpecification.groovy

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class CommandMessageSpecification extends Specification {
5757
def byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
5858
def messageHeader = new MessageHeader(byteBuf, 512)
5959
messageHeader.opCode == OpCode.OP_MSG.value
60-
messageHeader.requestId == RequestMessage.currentGlobalId - 1
60+
messageHeader.requestId < RequestMessage.currentGlobalId
6161
messageHeader.responseTo == 0
6262

6363
def expectedCommandDocument = command.clone()
@@ -167,13 +167,14 @@ class CommandMessageSpecification extends Specification {
167167

168168
then:
169169
messageHeader.opCode == OpCode.OP_MSG.value
170-
messageHeader.requestId == RequestMessage.currentGlobalId - 1
170+
messageHeader.requestId < RequestMessage.currentGlobalId
171171
messageHeader.responseTo == 0
172172
byteBuf.getInt() == 0
173173
payload.getPosition() == pos1
174174
payload.hasAnotherSplit()
175175

176176
when:
177+
def initialRequestId = messageHeader.requestId
177178
payload = payload.getNextSplit()
178179
message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
179180
false, payload, fieldNameValidator)
@@ -184,7 +185,8 @@ class CommandMessageSpecification extends Specification {
184185

185186
then:
186187
messageHeader.opCode == OpCode.OP_MSG.value
187-
messageHeader.requestId == RequestMessage.currentGlobalId - 1
188+
messageHeader.requestId < RequestMessage.currentGlobalId
189+
messageHeader.requestId > initialRequestId
188190
messageHeader.responseTo == 0
189191
byteBuf.getInt() == 1 << 1
190192
payload.getPosition() == pos2

0 commit comments

Comments
 (0)