Skip to content

Commit 5012ffd

Browse files
authored
Merge pull request #815 from scalecube/remove-try-emit
Remove try emit
2 parents d129bcf + b7979fe commit 5012ffd

File tree

4 files changed

+35
-35
lines changed

4 files changed

+35
-35
lines changed

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import reactor.core.publisher.Mono;
3838
import reactor.core.publisher.SignalType;
3939
import reactor.core.publisher.Sinks;
40-
import reactor.core.publisher.Sinks.EmitFailureHandler;
4140
import reactor.core.publisher.Sinks.EmitResult;
4241

4342
public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
@@ -173,13 +172,13 @@ public Mono<Void> shutdown() {
173172
return Mono.defer(
174173
() -> {
175174
if (cluster == null) {
176-
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
175+
sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED);
177176
return Mono.empty();
178177
}
179178
cluster.shutdown();
180179
return cluster
181180
.onShutdown()
182-
.doFinally(s -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE));
181+
.doFinally(s -> sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED));
183182
});
184183
}
185184

@@ -196,7 +195,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
196195

197196
if (discoveryEvent != null) {
198197
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
199-
sink.emitNext(discoveryEvent, RetryEmitFailureHandler.INSTANCE);
198+
sink.emitNext(discoveryEvent, EmitFailureHandler.RETRY_NOT_SERIALIZED);
200199
}
201200
}
202201

@@ -290,9 +289,9 @@ private void onDiscoveryEvent(ServiceDiscoveryEvent event) {
290289
}
291290
}
292291

293-
private static class RetryEmitFailureHandler implements EmitFailureHandler {
292+
private static class EmitFailureHandler implements Sinks.EmitFailureHandler {
294293

295-
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
294+
private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();
296295

297296
@Override
298297
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import reactor.core.publisher.Mono;
6262
import reactor.core.publisher.SignalType;
6363
import reactor.core.publisher.Sinks;
64-
import reactor.core.publisher.Sinks.EmitFailureHandler;
6564
import reactor.core.publisher.Sinks.EmitResult;
6665
import reactor.core.scheduler.Scheduler;
6766
import reactor.core.scheduler.Schedulers;
@@ -172,7 +171,7 @@ private Microservices(Builder builder) {
172171
shutdown
173172
.asMono()
174173
.then(doShutdown())
175-
.doFinally(s -> onShutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE))
174+
.doFinally(s -> onShutdown.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED))
176175
.subscribe(
177176
null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString()));
178177
}
@@ -351,7 +350,7 @@ public Flux<ServiceDiscoveryEvent> listenDiscovery() {
351350
public Mono<Void> shutdown() {
352351
return Mono.defer(
353352
() -> {
354-
shutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE);
353+
shutdown.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED);
355354
return onShutdown.asMono();
356355
});
357356
}
@@ -650,7 +649,7 @@ private Mono<? extends Void> start0(String id, ServiceDiscovery discovery) {
650649
.subscribeOn(scheduler)
651650
.publishOn(scheduler)
652651
.doOnNext(event -> onDiscoveryEvent(microservices, event))
653-
.doOnNext(event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE))
652+
.doOnNext(event -> sink.emitNext(event, EmitFailureHandler.RETRY_NOT_SERIALIZED))
654653
.subscribe());
655654

656655
return Mono.deferContextual(context -> discovery.start())
@@ -674,7 +673,7 @@ public Mono<Void> shutdown() {
674673
return Mono.defer(
675674
() -> {
676675
disposables.dispose();
677-
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
676+
sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED);
678677
return Mono.whenDelayError(
679678
discoveryInstances.values().stream()
680679
.map(ServiceDiscovery::shutdown)
@@ -911,9 +910,9 @@ private static String asString(ServiceInfo serviceInfo) {
911910
}
912911
}
913912

914-
private static class RetryEmitFailureHandler implements EmitFailureHandler {
913+
private static class EmitFailureHandler implements Sinks.EmitFailureHandler {
915914

916-
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
915+
private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();
917916

918917
@Override
919918
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {

services/src/test/java/io/scalecube/services/ServiceLocalTest.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertThrows;
55
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
67

78
import io.scalecube.services.api.ServiceMessage;
89
import io.scalecube.services.sut.GreetingRequest;
@@ -292,8 +293,8 @@ public void test_local_bidi_greeting_expect_NotAuthorized() {
292293

293294
// call the service.
294295

295-
requests.tryEmitNext(new GreetingRequest("joe-1"));
296-
requests.tryEmitComplete();
296+
requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST);
297+
requests.emitComplete(FAIL_FAST);
297298

298299
StepVerifier.create(responses)
299300
.expectErrorMessage("Not authorized")
@@ -318,7 +319,7 @@ public void test_local_bidi_greeting_message_expect_NotAuthorized() {
318319
.map(ServiceMessage::data);
319320

320321
StepVerifier.create(responses)
321-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-1")))
322+
.then(() -> requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST))
322323
.expectErrorMessage("Not authorized")
323324
.verify(Duration.ofSeconds(3));
324325
}
@@ -332,10 +333,10 @@ public void test_local_bidi_greeting_expect_GreetingResponse() {
332333

333334
// call the service.
334335

335-
requests.tryEmitNext(new GreetingRequest("joe-1"));
336-
requests.tryEmitNext(new GreetingRequest("joe-2"));
337-
requests.tryEmitNext(new GreetingRequest("joe-3"));
338-
requests.tryEmitComplete();
336+
requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST);
337+
requests.emitNext(new GreetingRequest("joe-2"), FAIL_FAST);
338+
requests.emitNext(new GreetingRequest("joe-3"), FAIL_FAST);
339+
requests.emitComplete(FAIL_FAST);
339340

340341
// call the service.
341342
Flux<GreetingResponse> responses =
@@ -366,13 +367,13 @@ public void test_local_bidi_greeting_expect_message_GreetingResponse() {
366367
.map(ServiceMessage::data);
367368

368369
StepVerifier.create(responses)
369-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-1")))
370+
.then(() -> requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST))
370371
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-1"))
371-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-2")))
372+
.then(() -> requests.emitNext(new GreetingRequest("joe-2"), FAIL_FAST))
372373
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-2"))
373-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-3")))
374+
.then(() -> requests.emitNext(new GreetingRequest("joe-3"), FAIL_FAST))
374375
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-3"))
375-
.then(requests::tryEmitComplete)
376+
.then(() -> requests.emitComplete(FAIL_FAST))
376377
.expectComplete()
377378
.verify(Duration.ofSeconds(3));
378379
}

services/src/test/java/io/scalecube/services/ServiceRemoteTest.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertThrows;
55
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
67

78
import io.scalecube.net.Address;
89
import io.scalecube.services.api.ServiceMessage;
@@ -404,8 +405,8 @@ public void test_remote_bidi_greeting_expect_NotAuthorized() {
404405

405406
// call the service.
406407

407-
requests.tryEmitNext(new GreetingRequest("joe-1"));
408-
requests.tryEmitComplete();
408+
requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST);
409+
requests.emitComplete(FAIL_FAST);
409410

410411
StepVerifier.create(responses)
411412
.expectErrorMessage("Not authorized")
@@ -431,7 +432,7 @@ public void test_remote_bidi_greeting_message_expect_NotAuthorized() {
431432
.map(ServiceMessage::data);
432433

433434
StepVerifier.create(responses)
434-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-1")))
435+
.then(() -> requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST))
435436
.expectErrorMessage("Not authorized")
436437
.verify(Duration.ofSeconds(3));
437438
}
@@ -446,10 +447,10 @@ public void test_remote_bidi_greeting_expect_GreetingResponse() {
446447

447448
// call the service.
448449

449-
requests.tryEmitNext(new GreetingRequest("joe-1"));
450-
requests.tryEmitNext(new GreetingRequest("joe-2"));
451-
requests.tryEmitNext(new GreetingRequest("joe-3"));
452-
requests.tryEmitComplete();
450+
requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST);
451+
requests.emitNext(new GreetingRequest("joe-2"), FAIL_FAST);
452+
requests.emitNext(new GreetingRequest("joe-3"), FAIL_FAST);
453+
requests.emitComplete(FAIL_FAST);
453454

454455
// call the service.
455456
Flux<GreetingResponse> responses =
@@ -482,13 +483,13 @@ public void test_remote_bidi_greeting_message_expect_GreetingResponse() {
482483
.map(ServiceMessage::data);
483484

484485
StepVerifier.create(responses)
485-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-1")))
486+
.then(() -> requests.emitNext(new GreetingRequest("joe-1"), FAIL_FAST))
486487
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-1"))
487-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-2")))
488+
.then(() -> requests.emitNext(new GreetingRequest("joe-2"), FAIL_FAST))
488489
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-2"))
489-
.then(() -> requests.tryEmitNext(new GreetingRequest("joe-3")))
490+
.then(() -> requests.emitNext(new GreetingRequest("joe-3"), FAIL_FAST))
490491
.expectNextMatches(resp -> resp.getResult().equals(" hello to: joe-3"))
491-
.then(requests::tryEmitComplete)
492+
.then(() -> requests.emitComplete(FAIL_FAST))
492493
.expectComplete()
493494
.verify(Duration.ofSeconds(3));
494495
}

0 commit comments

Comments
 (0)