|
7 | 7 | import static de.bwaldvogel.mongo.backend.TestUtils.json; |
8 | 8 | import static de.bwaldvogel.mongo.backend.TestUtils.toArray; |
9 | 9 | import static java.util.Collections.singletonList; |
| 10 | +import static org.assertj.core.groups.Tuple.tuple; |
10 | 11 | import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
11 | 12 |
|
12 | 13 | import java.time.Duration; |
13 | 14 | import java.time.Instant; |
| 15 | +import java.util.AbstractMap; |
14 | 16 | import java.util.ArrayList; |
15 | 17 | import java.util.Arrays; |
16 | 18 | import java.util.Date; |
17 | 19 | import java.util.List; |
| 20 | +import java.util.Map; |
18 | 21 | import java.util.NoSuchElementException; |
19 | 22 | import java.util.UUID; |
20 | 23 | import java.util.concurrent.TimeUnit; |
| 24 | +import java.util.stream.IntStream; |
21 | 25 |
|
| 26 | +import org.assertj.core.api.Assertions; |
22 | 27 | import org.bson.BsonDocument; |
23 | 28 | import org.bson.BsonInt32; |
24 | 29 | import org.bson.BsonTimestamp; |
|
40 | 45 | import com.mongodb.reactivestreams.client.Success; |
41 | 46 |
|
42 | 47 | import de.bwaldvogel.mongo.oplog.OperationType; |
| 48 | +import io.reactivex.Flowable; |
| 49 | +import io.reactivex.schedulers.Schedulers; |
43 | 50 | import io.reactivex.subscribers.TestSubscriber; |
44 | 51 |
|
45 | 52 | public abstract class AbstractOplogTest extends AbstractTest { |
@@ -456,4 +463,67 @@ private static <T> T getSingleValue(TestSubscriber<T> subscriber) { |
456 | 463 | return subscriber.values().get(0); |
457 | 464 | } |
458 | 465 |
|
| 466 | + @Test |
| 467 | + @Disabled |
| 468 | + public void testMultipleChangeStreams() throws InterruptedException { |
| 469 | + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 1"))) |
| 470 | + .test().awaitDone(5, TimeUnit.SECONDS).assertComplete(); |
| 471 | + |
| 472 | + final int changeStreamCount = 32; |
| 473 | + |
| 474 | + List<Bson> pipeline = singletonList(match(Filters.eq("fullDocument.bu", "abc"))); |
| 475 | + |
| 476 | + final TestSubscriber<Map<Integer, List<ChangeStreamDocument<Document>>>> streamSubscriber |
| 477 | + = new TestSubscriber<>(); |
| 478 | + |
| 479 | + Flowable.range(1, changeStreamCount) |
| 480 | + .flatMapSingle(index -> { |
| 481 | + return Flowable.fromPublisher(asyncCollection.watch(pipeline)) |
| 482 | + .take(2) |
| 483 | + .toList() |
| 484 | + .map(changeStreamDocuments -> { |
| 485 | + return new AbstractMap.SimpleEntry<>(index, changeStreamDocuments); |
| 486 | + }) |
| 487 | + .subscribeOn(Schedulers.io()); // subscribe to change streams concurrently |
| 488 | + }) |
| 489 | + .toMap(Map.Entry::getKey, Map.Entry::getValue) |
| 490 | + .toFlowable() |
| 491 | + .subscribe(streamSubscriber); |
| 492 | + |
| 493 | + // give time for all ChangeStream Publishers to be subscribed to |
| 494 | + // todo: expose API to get cursors from Backend and wait until 'changeStreamCount' cursors |
| 495 | + TimeUnit.SECONDS.sleep(5); |
| 496 | + |
| 497 | + Flowable.concat( |
| 498 | + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 2, bu: 'abc'"))), |
| 499 | + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 3, bu: 'xyz'"))), |
| 500 | + Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 4, bu: 'abc'"))) |
| 501 | + ).test().awaitDone(15, TimeUnit.SECONDS).assertComplete(); |
| 502 | + |
| 503 | + final Map<Integer, List<ChangeStreamDocument<Document>>> results = streamSubscriber |
| 504 | + .awaitDone(30, TimeUnit.SECONDS) |
| 505 | + .assertComplete() |
| 506 | + .assertValueCount(1) |
| 507 | + .values().get(0); |
| 508 | + |
| 509 | + Assertions.assertThat(IntStream.rangeClosed(1, changeStreamCount)) |
| 510 | + .allSatisfy(index -> { |
| 511 | + Assertions.assertThat(results).containsKey(index); |
| 512 | + |
| 513 | + final List<ChangeStreamDocument<Document>> emits = results.get(index); |
| 514 | + Assertions.assertThat(emits).isNotNull() |
| 515 | + .extracting( |
| 516 | + document -> { |
| 517 | + return document.getDocumentKey().getInt32("_id").getValue(); |
| 518 | + }, |
| 519 | + document -> { |
| 520 | + return document.getFullDocument() != null |
| 521 | + ? document.getFullDocument().getString("bu") |
| 522 | + : null; |
| 523 | + } |
| 524 | + ) |
| 525 | + .containsExactly(tuple(2, "abc"), tuple(4, "abc")); |
| 526 | + }); |
| 527 | + } |
| 528 | + |
459 | 529 | } |
0 commit comments