Skip to content

Commit fa283e4

Browse files
committed
emulate waiting for all shards before emitting Oplog updates
artificial delay avoids 100% CPU usage when starting multiple ChangeStreams
1 parent 67cae1a commit fa283e4

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

core/src/main/java/de/bwaldvogel/mongo/oplog/OplogCursor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package de.bwaldvogel.mongo.oplog;
22

33
import java.util.List;
4+
import java.util.concurrent.TimeUnit;
45
import java.util.function.Function;
56
import java.util.stream.Collectors;
67
import java.util.stream.Stream;
@@ -27,6 +28,8 @@ public boolean isEmpty() {
2728

2829
@Override
2930
public List<Document> takeDocuments(int numberToReturn) {
31+
emulateWaitingForAllShards();
32+
3033
Stream<Document> stream = oplogStream.apply(position);
3134

3235
if (numberToReturn > 0) {
@@ -38,6 +41,16 @@ public List<Document> takeDocuments(int numberToReturn) {
3841
return documents;
3942
}
4043

44+
private void emulateWaitingForAllShards() {
45+
try {
46+
// artificial delay to avoid 100% CPU usage when starting multiple ChangeStreams
47+
// emulates real ChangeStream behaviour of waiting for all shards to provide data
48+
TimeUnit.MILLISECONDS.sleep(100);
49+
} catch (InterruptedException e) {
50+
// ignore
51+
}
52+
}
53+
4154
OplogPosition getPosition() {
4255
return position;
4356
}

test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractOplogTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,6 @@ private static <T> T getSingleValue(TestSubscriber<T> subscriber) {
464464
}
465465

466466
@Test
467-
@Disabled
468467
public void testMultipleChangeStreams() throws InterruptedException {
469468
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 1")))
470469
.test().awaitDone(5, TimeUnit.SECONDS).assertComplete();

0 commit comments

Comments
 (0)