Skip to content

Commit 988027b

Browse files
committed
Replace usages of RxJava2 in testing with Reactor Test
1 parent f4a24aa commit 988027b

37 files changed

+454
-710
lines changed

sdk/cosmos/azure-cosmos-encryption/pom.xml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,6 @@ Licensed under the MIT License.
182182
<scope>test</scope>
183183
</dependency>
184184

185-
<dependency>
186-
<groupId>io.reactivex.rxjava2</groupId>
187-
<artifactId>rxjava</artifactId>
188-
<version>2.2.21</version> <!-- {x-version-update;io.reactivex.rxjava2:rxjava;external_dependency} -->
189-
<scope>test</scope>
190-
</dependency>
191-
192185
<dependency>
193186
<groupId>org.mockito</groupId>
194187
<artifactId>mockito-core</artifactId>

sdk/cosmos/azure-cosmos-tests/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,6 @@ Licensed under the MIT License.
184184
<version>3.7.11</version> <!-- {x-version-update;io.projectreactor:reactor-test;external_dependency} -->
185185
<scope>test</scope>
186186
</dependency>
187-
<dependency>
188-
<groupId>io.reactivex.rxjava2</groupId>
189-
<artifactId>rxjava</artifactId>
190-
<version>2.2.21</version> <!-- {x-version-update;io.reactivex.rxjava2:rxjava;external_dependency} -->
191-
<scope>test</scope>
192-
</dependency>
193187
<dependency>
194188
<groupId>org.mockito</groupId>
195189
<artifactId>mockito-core</artifactId>

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosMultiHashTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,21 @@
2424
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2525
import com.fasterxml.jackson.databind.node.ObjectNode;
2626
import com.fasterxml.jackson.databind.node.TextNode;
27-
import io.reactivex.subscribers.TestSubscriber;
2827
import org.testng.Assert;
2928
import org.testng.annotations.AfterClass;
3029
import org.testng.annotations.BeforeClass;
3130
import org.testng.annotations.Factory;
3231
import org.testng.annotations.Test;
32+
import reactor.test.StepVerifier;
3333

34+
import java.time.Duration;
3435
import java.util.ArrayList;
3536
import java.util.List;
3637
import java.util.Map;
38+
import java.util.Objects;
3739
import java.util.UUID;
3840
import java.util.concurrent.ConcurrentHashMap;
39-
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicReference;
4042
import java.util.stream.Collectors;
4143

4244
import static org.assertj.core.api.Assertions.assertThat;
@@ -594,14 +596,14 @@ private void testPartialPKContinuationToken() {
594596
options.setMaxDegreeOfParallelism(2);
595597
CosmosPagedFlux<ObjectNode> queryObservable = cosmosAsyncContainer.queryItems(query, options, ObjectNode.class);
596598

597-
TestSubscriber<FeedResponse<ObjectNode>> testSubscriber = new TestSubscriber<>();
598-
queryObservable.byPage(requestContinuation, 1).subscribe(testSubscriber);
599-
testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
600-
testSubscriber.assertNoErrors();
601-
testSubscriber.assertComplete();
599+
AtomicReference<FeedResponse<ObjectNode>> value = new AtomicReference<>();
600+
StepVerifier.create(queryObservable.byPage(requestContinuation, 1))
601+
.assertNext(value::set)
602+
.thenConsumeWhile(Objects::nonNull)
603+
.expectComplete()
604+
.verify(Duration.ofMillis(TIMEOUT));
602605

603-
@SuppressWarnings("unchecked")
604-
FeedResponse<ObjectNode> firstPage = (FeedResponse<ObjectNode>) testSubscriber.getEvents().get(0).get(0);
606+
FeedResponse<ObjectNode> firstPage = value.get();
605607
requestContinuation = firstPage.getContinuationToken();
606608
receivedDocuments.addAll(firstPage.getResults());
607609
assertThat(firstPage.getResults().size()).isEqualTo(1);

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@
6161
import io.netty.buffer.ByteBufUtil;
6262
import io.netty.handler.codec.http.HttpMethod;
6363
import io.netty.util.ReferenceCountUtil;
64-
import io.reactivex.subscribers.TestSubscriber;
6564
import org.mockito.ArgumentMatchers;
6665
import org.mockito.Mockito;
6766
import org.testng.annotations.AfterClass;
6867
import org.testng.annotations.Test;
6968
import reactor.core.publisher.Mono;
69+
import reactor.test.StepVerifier;
7070

7171
import java.lang.reflect.Field;
7272
import java.net.URISyntaxException;
@@ -955,30 +955,24 @@ public void throttlingExceptionGatewayModeScenario() {
955955
}
956956

957957
private StoreResponse validateSuccess(Mono<StoreResponse> storeResponse) {
958-
TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
959-
storeResponse.subscribe(testSubscriber);
960-
testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
961-
testSubscriber.assertNoErrors();
962-
testSubscriber.assertComplete();
963-
testSubscriber.assertValueCount(1);
964-
return testSubscriber.values().get(0);
958+
AtomicReference<StoreResponse> value = new AtomicReference<>();
959+
StepVerifier.create(storeResponse)
960+
.assertNext(value::set)
961+
.expectComplete()
962+
.verify(Duration.ofMillis(60_000));
963+
964+
return value.get();
965965
}
966966

967967
private void validateFailure(Mono<StoreResponse> storeResponse) {
968-
TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
969-
storeResponse.subscribe(testSubscriber);
970-
testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
971-
testSubscriber.assertNotComplete();
972-
testSubscriber.assertTerminated();
968+
StepVerifier.create(storeResponse).expectError().verify(Duration.ofMillis(60_000));
973969
}
974970

975971
private void validateServiceResponseSuccess(Mono<ResourceResponse<Document>> documentServiceResponseMono) {
976-
TestSubscriber<ResourceResponse<Document>> testSubscriber = new TestSubscriber<>();
977-
documentServiceResponseMono.subscribe(testSubscriber);
978-
testSubscriber.awaitTerminalEvent(60000, TimeUnit.MILLISECONDS);
979-
testSubscriber.assertNoErrors();
980-
testSubscriber.assertComplete();
981-
testSubscriber.assertValueCount(1);
972+
StepVerifier.create(documentServiceResponseMono)
973+
.expectNextCount(1)
974+
.expectComplete()
975+
.verify(Duration.ofMillis(60_000));
982976
}
983977

984978
private static class TestRetryPolicy extends DocumentClientRetryPolicy {

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,26 @@
55

66
import com.azure.cosmos.BridgeInternal;
77
import com.azure.cosmos.CosmosException;
8-
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
9-
import java.util.Map;
10-
import java.util.HashMap;
118
import com.azure.cosmos.ThrottlingRetryOptions;
12-
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
139
import com.azure.cosmos.implementation.directconnectivity.ChannelAcquisitionException;
14-
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
10+
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
1511
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover;
12+
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
13+
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
1614
import io.netty.handler.timeout.ReadTimeoutException;
17-
import io.reactivex.subscribers.TestSubscriber;
1815
import org.mockito.Mockito;
1916
import org.testng.Assert;
2017
import org.testng.annotations.DataProvider;
2118
import org.testng.annotations.Test;
2219
import reactor.core.publisher.Mono;
20+
import reactor.test.StepVerifier;
2321

2422
import javax.net.ssl.SSLHandshakeException;
2523
import java.net.SocketException;
2624
import java.net.URI;
2725
import java.time.Duration;
28-
import java.util.concurrent.TimeUnit;
26+
import java.util.HashMap;
27+
import java.util.Map;
2928

3029
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
3130

@@ -687,13 +686,9 @@ public static void validateSuccess(Mono<ShouldRetryResult> single,
687686
public static void validateSuccess(Mono<ShouldRetryResult> single,
688687
ShouldRetryValidator validator,
689688
long timeout) {
690-
TestSubscriber<ShouldRetryResult> testSubscriber = new TestSubscriber<>();
691-
692-
single.flux().subscribe(testSubscriber);
693-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
694-
testSubscriber.assertComplete();
695-
testSubscriber.assertNoErrors();
696-
testSubscriber.assertValueCount(1);
697-
validator.validate(testSubscriber.values().get(0));
689+
StepVerifier.create(single)
690+
.assertNext(validator::validate)
691+
.expectComplete()
692+
.verify(Duration.ofMillis(timeout));
698693
}
699694
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
77
import com.azure.cosmos.implementation.directconnectivity.StoreResponseValidator;
88
import com.fasterxml.jackson.databind.node.ObjectNode;
9-
import io.reactivex.subscribers.TestSubscriber;
109
import org.mockito.ArgumentMatchers;
1110
import org.mockito.Mockito;
1211
import org.mockito.invocation.InvocationOnMock;
@@ -15,14 +14,13 @@
1514
import org.testng.annotations.Test;
1615
import reactor.core.Exceptions;
1716
import reactor.core.publisher.Mono;
17+
import reactor.test.StepVerifier;
1818

1919
import java.time.Duration;
20-
import java.util.concurrent.TimeUnit;
2120
import java.util.function.Function;
2221

2322
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
2423
import static com.azure.cosmos.implementation.TestUtils.mockDocumentServiceRequest;
25-
import static org.assertj.core.api.Assertions.assertThat;
2624
import static org.assertj.core.api.Assertions.fail;
2725

2826
public class RetryUtilsTest {
@@ -109,26 +107,20 @@ public void toRetryWithAlternateFuncTestingMethodTwo() {
109107
}
110108

111109
private void validateFailure(Mono<StoreResponse> single, long timeout, Class<? extends Throwable> class1) {
112-
113-
TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
114-
single.subscribe(testSubscriber);
115-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
116-
testSubscriber.assertNotComplete();
117-
testSubscriber.assertTerminated();
118-
assertThat(testSubscriber.errorCount()).isEqualTo(1);
119-
Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0));
120-
if (!(throwable.getClass().equals(class1))) {
121-
fail("Not expecting " + testSubscriber.getEvents().get(1).get(0));
122-
}
110+
StepVerifier.create(single)
111+
.expectErrorSatisfies(thrown -> {
112+
Throwable throwable = Exceptions.unwrap(thrown);
113+
if (!(throwable.getClass().equals(class1))) {
114+
fail("Not expecting " + thrown);
115+
}
116+
}).verify(Duration.ofMillis(timeout));
123117
}
124118

125119
private void validateSuccess(Mono<StoreResponse> single, StoreResponseValidator validator, long timeout) {
126-
127-
TestSubscriber<StoreResponse> testSubscriber = new TestSubscriber<>();
128-
single.subscribe(testSubscriber);
129-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
130-
assertThat(testSubscriber.valueCount()).isEqualTo(1);
131-
validator.validate(testSubscriber.values().get(0));
120+
StepVerifier.create(single)
121+
.assertNext(validator::validate)
122+
.expectComplete()
123+
.verify(Duration.ofMillis(timeout));
132124
}
133125

134126
private void toggleMockFuncBtwFailureSuccess(

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import com.azure.cosmos.ConsistencyLevel;
77
import com.azure.cosmos.CosmosException;
8-
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
98
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
109
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
1110
import com.azure.cosmos.implementation.http.HttpClient;
@@ -14,17 +13,16 @@
1413
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
1514
import io.netty.channel.ConnectTimeoutException;
1615
import io.netty.handler.timeout.ReadTimeoutException;
17-
import io.reactivex.subscribers.TestSubscriber;
1816
import org.mockito.ArgumentCaptor;
1917
import org.mockito.Mockito;
2018
import org.testng.annotations.DataProvider;
2119
import org.testng.annotations.Test;
2220
import reactor.core.publisher.Mono;
21+
import reactor.test.StepVerifier;
2322

2423
import java.net.SocketException;
2524
import java.net.URI;
2625
import java.time.Duration;
27-
import java.util.concurrent.TimeUnit;
2826

2927
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
3028
import static org.assertj.core.api.Assertions.assertThat;
@@ -303,13 +301,9 @@ public void validateFailure(Mono<RxDocumentServiceResponse> observable,
303301
public static void validateFailure(Mono<RxDocumentServiceResponse> observable,
304302
FailureValidator validator,
305303
long timeout) {
306-
TestSubscriber<RxDocumentServiceResponse> testSubscriber = new TestSubscriber<>();
307-
observable.subscribe(testSubscriber);
308-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
309-
testSubscriber.assertNotComplete();
310-
testSubscriber.assertTerminated();
311-
assertThat(testSubscriber.errorCount()).isEqualTo(1);
312-
validator.validate(testSubscriber.errors().get(0));
304+
StepVerifier.create(observable)
305+
.expectErrorSatisfies(validator::validate)
306+
.verify(Duration.ofMillis(timeout));
313307
}
314308

315309
enum SessionTokenType {

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,18 @@
3131
import com.fasterxml.jackson.core.type.TypeReference;
3232
import com.fasterxml.jackson.databind.DeserializationFeature;
3333
import com.fasterxml.jackson.databind.ObjectMapper;
34-
import io.reactivex.subscribers.TestSubscriber;
3534
import org.apache.commons.lang3.ObjectUtils;
3635
import org.apache.commons.lang3.StringUtils;
3736
import org.mockito.Mockito;
3837
import org.mockito.stubbing.Answer;
39-
import org.testng.ITestContext;
4038
import org.testng.annotations.AfterSuite;
4139
import org.testng.annotations.BeforeSuite;
4240
import org.testng.annotations.DataProvider;
4341
import org.testng.annotations.Listeners;
4442
import reactor.core.publisher.Flux;
4543
import reactor.core.publisher.Mono;
4644
import reactor.core.scheduler.Schedulers;
45+
import reactor.test.StepVerifier;
4746

4847
import java.time.Duration;
4948
import java.util.ArrayList;
@@ -52,7 +51,6 @@
5251
import java.util.concurrent.TimeUnit;
5352
import java.util.stream.Collectors;
5453

55-
import static org.assertj.core.api.Assertions.assertThat;
5654
import static org.mockito.Mockito.doAnswer;
5755

5856
@Listeners({TestNGLogListener.class, CosmosNettyLeakDetectorFactory.class})
@@ -766,15 +764,10 @@ public <T extends Resource> void validateSuccess(Mono<ResourceResponse<T>> obser
766764

767765
public static <T extends Resource> void validateSuccess(Mono<ResourceResponse<T>> observable,
768766
ResourceResponseValidator<T> validator, long timeout) {
769-
770-
TestSubscriber<ResourceResponse<T>> testSubscriber = new TestSubscriber<>();
771-
772-
observable.subscribe(testSubscriber);
773-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
774-
testSubscriber.assertNoErrors();
775-
testSubscriber.assertComplete();
776-
testSubscriber.assertValueCount(1);
777-
validator.validate(testSubscriber.values().get(0));
767+
StepVerifier.create(observable)
768+
.assertNext(validator::validate)
769+
.expectComplete()
770+
.verify(Duration.ofMillis(timeout));
778771
}
779772

780773
public <T extends Resource> void validateFailure(Mono<ResourceResponse<T>> observable,
@@ -784,15 +777,9 @@ public <T extends Resource> void validateFailure(Mono<ResourceResponse<T>> obser
784777

785778
public static <T extends Resource> void validateFailure(Mono<ResourceResponse<T>> observable,
786779
FailureValidator validator, long timeout) {
787-
788-
TestSubscriber<ResourceResponse<T>> testSubscriber = new TestSubscriber<>();
789-
790-
observable.subscribe(testSubscriber);
791-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
792-
testSubscriber.assertNotComplete();
793-
testSubscriber.assertTerminated();
794-
assertThat(testSubscriber.errorCount()).isEqualTo(1);
795-
validator.validate((Throwable) testSubscriber.getEvents().get(1).get(0));
780+
StepVerifier.create(observable)
781+
.expectErrorSatisfies(validator::validate)
782+
.verify(Duration.ofMillis(timeout));
796783
}
797784

798785
public <T extends Resource> void validateQuerySuccess(Flux<FeedResponse<T>> observable,
@@ -802,14 +789,10 @@ public <T extends Resource> void validateQuerySuccess(Flux<FeedResponse<T>> obse
802789

803790
public static <T extends Resource> void validateQuerySuccess(Flux<FeedResponse<T>> observable,
804791
FeedResponseListValidator<T> validator, long timeout) {
805-
806-
TestSubscriber<FeedResponse<T>> testSubscriber = new TestSubscriber<>();
807-
808-
observable.subscribe(testSubscriber);
809-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
810-
testSubscriber.assertNoErrors();
811-
testSubscriber.assertComplete();
812-
validator.validate(testSubscriber.values());
792+
StepVerifier.create(observable.collectList())
793+
.assertNext(validator::validate)
794+
.expectComplete()
795+
.verify(Duration.ofMillis(timeout));
813796
}
814797

815798
public <T extends Resource> void validateQueryFailure(Flux<FeedResponse<T>> observable,
@@ -819,15 +802,9 @@ public <T extends Resource> void validateQueryFailure(Flux<FeedResponse<T>> obse
819802

820803
public static <T extends Resource> void validateQueryFailure(Flux<FeedResponse<T>> observable,
821804
FailureValidator validator, long timeout) {
822-
823-
TestSubscriber<FeedResponse<T>> testSubscriber = new TestSubscriber<>();
824-
825-
observable.subscribe(testSubscriber);
826-
testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
827-
testSubscriber.assertNotComplete();
828-
testSubscriber.assertTerminated();
829-
assertThat(testSubscriber.errorCount()).isEqualTo(1);
830-
validator.validate((Throwable) testSubscriber.getEvents().get(1).get(0));
805+
StepVerifier.create(observable)
806+
.expectErrorSatisfies(validator::validate)
807+
.verify(Duration.ofMillis(timeout));
831808
}
832809

833810
@DataProvider

0 commit comments

Comments
 (0)