Skip to content

Commit 3196681

Browse files
stIncMalerozza
andauthored
Propagate the batch size in ChangeStreamPublisherImpl.withDocumentClass (#664)
Propagate the batch size in `ChangeStreamPublisherImpl.withDocumentClass` JAVA-4010 Co-authored-by: Ross Lawley <[email protected]>
1 parent 96d59ac commit 3196681

File tree

3 files changed

+12
-3
lines changed

3 files changed

+12
-3
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ abstract class BatchCursorPublisher<T> implements Publisher<T> {
3838
private Integer batchSize;
3939

4040
BatchCursorPublisher(@Nullable final ClientSession clientSession, final MongoOperationPublisher<T> mongoOperationPublisher) {
41+
this(clientSession, mongoOperationPublisher, null);
42+
}
43+
44+
BatchCursorPublisher(@Nullable final ClientSession clientSession, final MongoOperationPublisher<T> mongoOperationPublisher,
45+
@Nullable final Integer batchSize) {
4146
this.clientSession = clientSession;
4247
this.mongoOperationPublisher = notNull("mongoOperationPublisher", mongoOperationPublisher);
48+
this.batchSize = batchSize;
4349
}
4450

4551
abstract AsyncReadOperation<AsyncBatchCursor<T>> asAsyncReadOperation(int initialBatchSize);

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ public ChangeStreamPublisher<T> collation(@Nullable final Collation collation) {
112112

113113
@Override
114114
public <TDocument> Publisher<TDocument> withDocumentClass(final Class<TDocument> clazz) {
115-
return new BatchCursorPublisher<TDocument>(getClientSession(), getMongoOperationPublisher().withDocumentClass(clazz)) {
115+
return new BatchCursorPublisher<TDocument>(getClientSession(), getMongoOperationPublisher().withDocumentClass(clazz),
116+
getBatchSize()) {
116117
@Override
117118
AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(final int initialBatchSize) {
118119
return createChangeStreamOperation(getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize);

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/ChangeStreamPublisherImplTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,15 @@ void shouldBuildTheExpectedOperationWhenSettingDocumentClass() {
8888
List<BsonDocument> pipeline = singletonList(BsonDocument.parse("{'$match': 1}"));
8989
TestOperationExecutor executor = createOperationExecutor(singletonList(getBatchCursor()));
9090

91+
int batchSize = 100;
9192
Publisher<BsonDocument> publisher = new ChangeStreamPublisherImpl<>(null, createMongoOperationPublisher(executor),
9293
Document.class, pipeline, ChangeStreamLevel.COLLECTION)
93-
.withDocumentClass(BsonDocument.class);
94+
.batchSize(batchSize)
95+
.withDocumentClass(BsonDocument.class);
9496

9597
ChangeStreamOperation<BsonDocument> expectedOperation =
9698
new ChangeStreamOperation<>(NAMESPACE, FullDocument.DEFAULT, pipeline, getDefaultCodecRegistry().get(BsonDocument.class))
97-
.batchSize(Integer.MAX_VALUE)
99+
.batchSize(batchSize)
98100
.retryReads(true);
99101

100102
// default input should be as expected

0 commit comments

Comments
 (0)