@@ -72,14 +72,15 @@ public class PrometheusRSocketClient {
7272 *
7373 * @param registryAndScrape the registry and scrape meter
7474 * @param transport the client transport
75- * @param retry the retry configuration
75+ * @param reconnectRetry the retry configuration applied when reconnecting
7676 * @param onKeyReceived the callback if a key has been received
7777 */
7878 private PrometheusRSocketClient (MeterRegistryAndScrape <?> registryAndScrape ,
7979 ClientTransport transport ,
80- Retry retry ,
80+ Retry reconnectRetry ,
8181 Runnable onKeyReceived ) {
82- this (registryAndScrape , transport , retry , Duration .ofSeconds (5 ), onKeyReceived );
82+ this (registryAndScrape , transport , reconnectRetry , Retry .backoff (6 , Duration .ofMillis (100 ))
83+ .maxBackoff (Duration .ofSeconds (5 )), Duration .ofSeconds (5 ), onKeyReceived );
8384 }
8485
8586 /**
@@ -93,6 +94,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
9394 */
9495 private PrometheusRSocketClient (MeterRegistryAndScrape <?> registryAndScrape ,
9596 ClientTransport transport ,
97+ Retry reconnectRetry ,
9698 Retry retry ,
9799 Duration timeout ,
98100 Runnable onKeyReceived ) {
@@ -103,7 +105,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
103105 .reconnect (new Retry () {
104106 @ Override
105107 public Publisher <?> generateCompanion (Flux <RetrySignal > retrySignals ) {
106- return retry .generateCompanion (retrySignals
108+ return reconnectRetry .generateCompanion (retrySignals
107109 .doOnNext (retrySignal -> {
108110 Throwable failure = retrySignal .failure ();
109111 DistributionSummary .builder ("prometheus.connection.retry" )
@@ -118,35 +120,57 @@ public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
118120 }
119121 })
120122 .acceptor ((payload , r ) -> {
123+ LOGGER .trace ("Acceptor for requestResponse and fireAndForget has been set up." );
121124 this .sendingSocket = r ;
122125 return Mono .just (new RSocket () {
123126 @ Override
124127 public Mono <Payload > requestResponse (Payload payload ) {
125- PublicKey key = decodePublicKey (payload .getData ());
126- latestKey .set (key );
127- onKeyReceived .run ();
128- return Mono .fromCallable (() -> scrapePayload (key ));
128+ try {
129+ LOGGER .trace ("Received public key from Prometheus proxy in requestResponse." );
130+ PublicKey key = decodePublicKey (payload .getData ());
131+ latestKey .set (key );
132+ onKeyReceived .run ();
133+ return Mono .fromCallable (() -> scrapePayload (key ));
134+ } finally {
135+ payload .release ();
136+ }
129137 }
130138
131139 @ Override
132140 public Mono <Void > fireAndForget (Payload payload ) {
133- latestKey .set (decodePublicKey (payload .getData ()));
134- onKeyReceived .run ();
135- return Mono .empty ();
141+ try {
142+ LOGGER .trace ("Received public key from Prometheus proxy in fireAndForget." );
143+ latestKey .set (decodePublicKey (payload .getData ()));
144+ onKeyReceived .run ();
145+ return Mono .empty ();
146+ } finally {
147+ payload .release ();
148+ }
136149 }
137150 });
138151 })
139152 .connect (transport )
140- .doOnError (t -> Counter .builder ("prometheus.connection.error" )
141- .baseUnit ("errors" )
142- .tag ("exception" , t .getClass ().getSimpleName () == null ? t .getClass ().getName () : t .getClass ().getSimpleName ())
143- .register (registryAndScrape .registry )
144- .increment ())
153+ .doOnError (t -> {
154+ LOGGER .trace ("Failed to connect to Prometheus proxy." , t );
155+ Counter .builder ("prometheus.connection.error" )
156+ .baseUnit ("errors" )
157+ .tag ("exception" , t .getClass ().getSimpleName () == null ? t .getClass ().getName () : t .getClass ().getSimpleName ())
158+ .register (registryAndScrape .registry )
159+ .increment ();
160+ })
161+ .doOnSuccess (connection -> {
162+ LOGGER .trace ("Successfully established connection to Prometheus proxy." );
163+ Counter .builder ("prometheus.connection.success" )
164+ .baseUnit ("connections" )
165+ .register (registryAndScrape .registry )
166+ .increment ();
167+ })
145168 .doOnNext (connection -> this .connection = connection )
146169 .flatMap (socket -> socket .onClose ()
147- .map ( v -> 1 ) // https://github.com/rsocket/rsocket-java/issues/819
170+ .then ( Mono . fromCallable (() -> 1 ) ) // https://github.com/rsocket/rsocket-java/issues/819
148171 .onErrorReturn (1 ))
149172 .repeat (() -> !requestedDisconnect )
173+ .retryWhen (retry )
150174 .subscribe ();
151175 }
152176
@@ -299,9 +323,12 @@ public static class Builder {
299323 private MeterRegistryAndScrape <?> registryAndScrape ;
300324 private final ClientTransport clientTransport ;
301325
302- private Retry retry = Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10 ))
326+ private Retry reconnectRetry = Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10 ))
303327 .maxBackoff (Duration .ofMinutes (10 ));
304328
329+ private Retry retry = Retry .backoff (6 , Duration .ofMillis (100 ))
330+ .maxBackoff (Duration .ofSeconds (5 ));
331+
305332 private Duration timeout = Duration .ofSeconds (5 );
306333
307334 private Runnable onKeyReceived = () -> {
@@ -312,6 +339,17 @@ <M extends MeterRegistry> Builder(M registry, Supplier<String> scrape, ClientTra
312339 this .clientTransport = clientTransport ;
313340 }
314341
342+ /**
343+ * Configures the retry applied when reconnecting the {@link PrometheusRSocketClient}.
344+ *
345+ * @param reconnectRetry the retry configuration applied when reconnecting
346+ * @return the {@link Builder}
347+ */
348+ public Builder reconnectRetry (Retry reconnectRetry ) {
349+ this .reconnectRetry = reconnectRetry ;
350+ return this ;
351+ }
352+
315353 /**
316354 * Configures the retry for {@link PrometheusRSocketClient}.
317355 *
@@ -366,6 +404,7 @@ public PrometheusRSocketClient connect(Duration timeout) {
366404 return new PrometheusRSocketClient (
367405 registryAndScrape ,
368406 clientTransport ,
407+ reconnectRetry ,
369408 retry ,
370409 timeout ,
371410 () -> {
@@ -387,6 +426,8 @@ public PrometheusRSocketClient connectBlockingly() {
387426 /**
388427 * Connects the {@link PrometheusRSocketClient} blockingly with the given timeout.
389428 *
429+ * @param timeout the timeout to wait for the connection to be established
430+ *
390431 * @return the {@link PrometheusRSocketClient}
391432 */
392433 public PrometheusRSocketClient connectBlockingly (Duration timeout ) {
@@ -395,6 +436,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) {
395436 PrometheusRSocketClient client = new PrometheusRSocketClient (
396437 registryAndScrape ,
397438 clientTransport ,
439+ reconnectRetry ,
398440 retry ,
399441 timeout ,
400442 () -> {
0 commit comments