Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit ab7ca63

Browse files
committed
added delay to retry for streamBrokerEvents, and changed the error message
1 parent 628b034 commit ab7ca63

File tree

1 file changed

+38
-8
lines changed

1 file changed

+38
-8
lines changed

client/src/main/java/io/netifi/proteus/DefaultProteusBrokerService.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,19 @@
2222
import io.rsocket.RSocket;
2323
import io.rsocket.transport.ClientTransport;
2424
import io.rsocket.util.ByteBufPayload;
25+
import org.reactivestreams.Publisher;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728
import reactor.core.Disposable;
29+
import reactor.core.publisher.Mono;
2830
import reactor.core.publisher.MonoProcessor;
29-
import reactor.retry.Retry;
3031

3132
import java.net.InetSocketAddress;
3233
import java.net.SocketAddress;
3334
import java.time.Duration;
3435
import java.util.*;
36+
import java.util.concurrent.atomic.AtomicLong;
3537
import java.util.function.Function;
36-
import java.util.function.Supplier;
3738

3839
public class DefaultProteusBrokerService implements ProteusBrokerService, Disposable {
3940
private static final Logger logger = LoggerFactory.getLogger(DefaultProteusBrokerService.class);
@@ -117,14 +118,43 @@ public DefaultProteusBrokerService(
117118

118119
this.client = new BrokerInfoServiceClient(unwrappedGroup("com.netifi.proteus.brokerServices"));
119120
this.presenceNotifier = new BrokerInfoPresenceNotifier(client);
120-
121+
121122
Disposable disposable =
122123
client
123124
.streamBrokerEvents(Empty.getDefaultInstance())
124125
.doOnNext(this::handleBrokerEvent)
125126
.filter(event -> event.getType() == Event.Type.JOIN)
126127
.doOnNext(event -> createConnection())
127-
.doOnError(t -> logger.error("error streaming broker events", t))
128+
.doOnError(
129+
t -> {
130+
logger.warn(
131+
"error streaming broker events - make sure access key {} has a valid access token",
132+
accessKey);
133+
logger.trace("error streaming broker events", t);
134+
})
135+
.onErrorResume(
136+
new Function<Throwable, Publisher<? extends Event>>() {
137+
long attempts = 0;
138+
long lastAttempt = System.currentTimeMillis();
139+
140+
@Override
141+
public synchronized Publisher<? extends Event> apply(Throwable throwable) {
142+
if (Duration.ofMillis(System.currentTimeMillis() - lastAttempt).getSeconds()
143+
> 30) {
144+
attempts = 0;
145+
}
146+
147+
Mono<Event> then =
148+
Mono.delay(Duration.ofMillis(attempts * 500)).then(Mono.error(throwable));
149+
if (attempts < 30) {
150+
attempts++;
151+
}
152+
153+
lastAttempt = System.currentTimeMillis();
154+
155+
return then;
156+
}
157+
})
128158
.retry()
129159
.subscribe();
130160

@@ -278,7 +308,7 @@ private ProteusSocket unwrappedGroup(String group) {
278308
},
279309
this::selectRSocket);
280310
}
281-
311+
282312
private ProteusSocket unwrappedBroadcast(String group) {
283313
return new DefaultProteusSocket(
284314
payload -> {
@@ -308,12 +338,12 @@ public ProteusSocket destination(String destination, String group) {
308338
public ProteusSocket group(String group) {
309339
return PresenceAwareRSocket.wrap(unwrappedGroup(group), null, group, presenceNotifier);
310340
}
311-
341+
312342
@Override
313343
public ProteusSocket broadcast(String group) {
314344
return PresenceAwareRSocket.wrap(unwrappedBroadcast(group), null, group, presenceNotifier);
315345
}
316-
346+
317347
@Override
318348
public void dispose() {
319349
onClose.onComplete();
@@ -445,7 +475,7 @@ private WeightedClientTransportSupplier selectClientTransportSupplier() {
445475
}
446476
}
447477
}
448-
478+
449479
logger.info("selecting socket {} with weight {}", supplier.toString(), supplier.weight());
450480
if (logger.isDebugEnabled()) {
451481
logger.debug("selecting socket {} with weight {}", supplier.toString(), supplier.weight());

0 commit comments

Comments
 (0)