@@ -159,6 +159,20 @@ protected Duration pollerInterval() {
159159 return POLLER_INTERVAL ;
160160 }
161161
162+ private void incrementFailedRefreshes (String name ) {
163+ AtomicInteger counter = numFailedRefreshes .get (name );
164+ if (counter != null ) {
165+ counter .incrementAndGet ();
166+ }
167+ }
168+
169+ private void clearFailedRefreshes (String name ) {
170+ AtomicInteger counter = numFailedRefreshes .get (name );
171+ if (counter != null ) {
172+ counter .set (0 );
173+ }
174+ }
175+
162176 /**
163177 * Helper method to make sure the config poll interval is set to a fixed bound every time.
164178 *
@@ -201,7 +215,8 @@ private Mono<ProposedBucketConfigContext> maybeUpdateBucket(final String name, b
201215
202216 if (allowed ) {
203217 List <NodeInfo > nodes = filterEligibleNodes (name );
204- if (numFailedRefreshes .get (name ).get () >= nodes .size ()) {
218+ AtomicInteger counter = numFailedRefreshes .get (name );
219+ if (counter != null && counter .get () >= nodes .size ()) {
205220 provider .signalConfigRefreshFailed (ConfigRefreshFailure .ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS );
206221 numFailedRefreshes .get (name ).set (0 );
207222 }
@@ -274,7 +289,7 @@ private Flux<ProposedBucketConfigContext> fetchConfigPerNode(final String name,
274289 .wrap (request , request .response (), true )
275290 .filter (response -> {
276291 if (!response .status ().success ()) {
277- numFailedRefreshes . get (name ). incrementAndGet ( );
292+ incrementFailedRefreshes (name );
278293 eventBus .publish (new BucketConfigRefreshFailedEvent (
279294 core .context (),
280295 BucketConfigRefreshFailedEvent .RefresherType .KV ,
@@ -287,9 +302,9 @@ private Flux<ProposedBucketConfigContext> fetchConfigPerNode(final String name,
287302 .map (response ->
288303 new ProposedBucketConfigContext (name , new String (response .content (), UTF_8 ), nodeInfo .hostname ())
289304 )
290- .doOnSuccess (r -> numFailedRefreshes . get (name ). set ( 0 ))
305+ .doOnSuccess (r -> clearFailedRefreshes (name ))
291306 .onErrorResume (t -> {
292- numFailedRefreshes . get (name ). incrementAndGet ( );
307+ incrementFailedRefreshes (name );
293308 eventBus .publish (new BucketConfigRefreshFailedEvent (
294309 core .context (),
295310 BucketConfigRefreshFailedEvent .RefresherType .KV ,
0 commit comments