Skip to content

Commit dcc1572

Browse files
committed
Test: updated reactive syncadapter to use reactor
1 parent f67efcb commit dcc1572

File tree

9 files changed

+180
-489
lines changed

9 files changed

+180
-489
lines changed

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SingleResultSubscriber.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncAggregateIterable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import com.mongodb.lang.Nullable;
2121
import com.mongodb.reactivestreams.client.AggregatePublisher;
2222
import org.bson.conversions.Bson;
23+
import reactor.core.publisher.Mono;
2324

2425
import java.util.concurrent.TimeUnit;
2526

27+
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
28+
2629
class SyncAggregateIterable<T> extends SyncMongoIterable<T> implements AggregateIterable<T> {
2730
private final AggregatePublisher<T> wrapped;
2831

@@ -33,9 +36,7 @@ class SyncAggregateIterable<T> extends SyncMongoIterable<T> implements Aggregate
3336

3437
@Override
3538
public void toCollection() {
36-
SingleResultSubscriber<Void> subscriber = new SingleResultSubscriber<>();
37-
wrapped.toCollection().subscribe(subscriber);
38-
subscriber.get();
39+
Mono.from(wrapped.toCollection()).block(TIMEOUT_DURATION);
3940
}
4041

4142
@Override

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import com.mongodb.session.ServerSession;
2626
import org.bson.BsonDocument;
2727
import org.bson.BsonTimestamp;
28+
import reactor.core.publisher.Mono;
29+
30+
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
2831

2932
class SyncClientSession implements ClientSession {
3033
private final com.mongodb.reactivestreams.client.ClientSession wrapped;
@@ -131,16 +134,12 @@ public void startTransaction(final TransactionOptions transactionOptions) {
131134

132135
@Override
133136
public void commitTransaction() {
134-
SingleResultSubscriber<Void> subscriber = new SingleResultSubscriber<>();
135-
wrapped.commitTransaction().subscribe(subscriber);
136-
subscriber.get();
137+
Mono.from(wrapped.commitTransaction()).block(TIMEOUT_DURATION);
137138
}
138139

139140
@Override
140141
public void abortTransaction() {
141-
SingleResultSubscriber<Void> subscriber = new SingleResultSubscriber<>();
142-
wrapped.abortTransaction().subscribe(subscriber);
143-
subscriber.get();
142+
Mono.from(wrapped.abortTransaction()).block(TIMEOUT_DURATION);
144143
}
145144

146145
@Override

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMapReduceIterable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import com.mongodb.lang.Nullable;
2323
import com.mongodb.reactivestreams.client.MapReducePublisher;
2424
import org.bson.conversions.Bson;
25+
import reactor.core.publisher.Mono;
2526

2627
import java.util.concurrent.TimeUnit;
2728

29+
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
30+
2831
@SuppressWarnings("deprecation")
2932
class SyncMapReduceIterable<T> extends SyncMongoIterable<T> implements MapReduceIterable<T> {
3033
private final MapReducePublisher<T> wrapped;
@@ -36,9 +39,7 @@ class SyncMapReduceIterable<T> extends SyncMongoIterable<T> implements MapReduce
3639

3740
@Override
3841
public void toCollection() {
39-
SingleResultSubscriber<Void> subscriber = new SingleResultSubscriber<>();
40-
wrapped.toCollection().subscribe(subscriber);
41-
subscriber.get();
42+
Mono.from(wrapped.toCollection()).block(TIMEOUT_DURATION);
4243
}
4344

4445
@Override

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import com.mongodb.connection.ClusterDescription;
2727
import org.bson.Document;
2828
import org.bson.conversions.Bson;
29+
import reactor.core.publisher.Mono;
2930

3031
import java.util.List;
3132

33+
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
3234
import static java.util.Objects.requireNonNull;
3335

3436
public class SyncMongoClient implements MongoClient {
@@ -45,20 +47,12 @@ public MongoDatabase getDatabase(final String databaseName) {
4547

4648
@Override
4749
public ClientSession startSession() {
48-
SingleResultSubscriber<com.mongodb.reactivestreams.client.ClientSession> subscriber = createSubscriber();
49-
wrapped.startSession().subscribe(subscriber);
50-
return new SyncClientSession(requireNonNull(subscriber.get()), this);
51-
}
52-
53-
private <TResult> SingleResultSubscriber<TResult> createSubscriber() {
54-
return new SingleResultSubscriber<>();
50+
return new SyncClientSession(requireNonNull(Mono.from(wrapped.startSession()).block(TIMEOUT_DURATION)), this);
5551
}
5652

5753
@Override
5854
public ClientSession startSession(final ClientSessionOptions options) {
59-
SingleResultSubscriber<com.mongodb.reactivestreams.client.ClientSession> subscriber = createSubscriber();
60-
wrapped.startSession(options).subscribe(subscriber);
61-
return new SyncClientSession(requireNonNull(subscriber.get()), this);
55+
return new SyncClientSession(requireNonNull(Mono.from(wrapped.startSession(options)).block(TIMEOUT_DURATION)), this);
6256
}
6357

6458
@Override

0 commit comments

Comments
 (0)