Skip to content

Commit b590298

Browse files
Limit max number of built-in roles sync attempts on unexpected failures (#120149) (#120320)
Currently, we retry indefinitely on unexpected errors. This PR adds a limit to avoid flooding logs with potential unknown errors. The counter of unexpected sync failures will be reset to 0 when a successful sync occurs or when a master node is restarted.
1 parent 18f88a0 commit b590298

File tree

2 files changed

+220
-2
lines changed

2 files changed

+220
-2
lines changed

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/QueryableBuiltInRolesSynchronizer.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.Set;
5555
import java.util.concurrent.Executor;
5656
import java.util.concurrent.atomic.AtomicBoolean;
57+
import java.util.concurrent.atomic.AtomicInteger;
5758

5859
import static java.util.stream.Collectors.toMap;
5960
import static java.util.stream.Collectors.toSet;
@@ -132,6 +133,16 @@ public void taskSucceeded(MarkRolesAsSyncedTask task, Map<String, String> value)
132133

133134
private volatile boolean securityIndexDeleted = false;
134135

136+
/**
137+
* The maximum number of failed sync attempts, counted since the last successful sync, before further sync attempts are skipped.
138+
*/
139+
static final int MAX_FAILED_SYNC_ATTEMPTS = 10;
140+
141+
/**
142+
* The counter of unexpected sync failures. Reset to 0 when a sync succeeds (or is skipped because the index already contains the latest built-in roles) or when a master node is restarted.
143+
*/
144+
private final AtomicInteger failedSyncAttempts = new AtomicInteger(0);
145+
135146
/**
136147
* Constructs a new built-in roles synchronizer.
137148
*
@@ -214,10 +225,12 @@ private void syncBuiltInRoles(final QueryableBuiltInRoles roles) {
214225
final Map<String, String> indexedRolesDigests = readIndexedBuiltInRolesDigests(clusterService.state());
215226
if (roles.rolesDigest().equals(indexedRolesDigests)) {
216227
logger.debug("Security index already contains the latest built-in roles indexed, skipping roles synchronization");
228+
resetFailedSyncAttempts();
217229
synchronizationInProgress.set(false);
218230
} else {
219231
executor.execute(() -> doSyncBuiltinRoles(indexedRolesDigests, roles, ActionListener.wrap(v -> {
220-
logger.info("Successfully synced [" + roles.roleDescriptors().size() + "] built-in roles to .security index");
232+
logger.info("Successfully synced [{}] built-in roles to .security index", roles.roleDescriptors().size());
233+
resetFailedSyncAttempts();
221234
synchronizationInProgress.set(false);
222235
}, e -> {
223236
handleException(e);
@@ -226,12 +239,14 @@ private void syncBuiltInRoles(final QueryableBuiltInRoles roles) {
226239
}
227240
} catch (Exception e) {
228241
logger.error("Failed to sync built-in roles", e);
242+
failedSyncAttempts.incrementAndGet();
229243
synchronizationInProgress.set(false);
230244
}
231245
}
232246
}
233247

234-
private static void handleException(Exception e) {
248+
private void handleException(Exception e) {
249+
boolean isUnexpectedFailure = false;
235250
if (e instanceof BulkRolesResponseException bulkException) {
236251
final boolean isBulkDeleteFailure = bulkException instanceof BulkDeleteRolesResponseException;
237252
for (final Map.Entry<String, Exception> bulkFailure : bulkException.getFailures().entrySet()) {
@@ -243,14 +258,35 @@ private static void handleException(Exception e) {
243258
if (isExpectedFailure(bulkFailure.getValue())) {
244259
logger.info(logMessage, bulkFailure.getValue());
245260
} else {
261+
isUnexpectedFailure = true;
246262
logger.warn(logMessage, bulkFailure.getValue());
247263
}
248264
}
249265
} else if (isExpectedFailure(e)) {
250266
logger.info("Failed to sync built-in roles to .security index", e);
251267
} else {
268+
isUnexpectedFailure = true;
252269
logger.warn("Failed to sync built-in roles to .security index due to unexpected exception", e);
253270
}
271+
if (isUnexpectedFailure) {
272+
failedSyncAttempts.incrementAndGet();
273+
}
274+
}
275+
276+
private void resetFailedSyncAttempts() {
277+
if (failedSyncAttempts.get() > 0) {
278+
logger.trace("resetting failed sync attempts to 0");
279+
failedSyncAttempts.set(0);
280+
}
281+
}
282+
283+
/**
284+
* Package protected for testing purposes.
285+
*
286+
* @return the number of failed sync attempts
287+
*/
288+
int getFailedSyncAttempts() {
289+
return failedSyncAttempts.get();
254290
}
255291

256292
/**
@@ -293,6 +329,13 @@ private boolean shouldSyncBuiltInRoles(final ClusterState state) {
293329
logger.trace("Local node is not the master, skipping built-in roles synchronization");
294330
return false;
295331
}
332+
if (failedSyncAttempts.get() >= MAX_FAILED_SYNC_ATTEMPTS) {
333+
logger.debug(
334+
"Failed to sync built-in roles to .security index [{}] times. Skipping built-in roles synchronization.",
335+
failedSyncAttempts.get()
336+
);
337+
return false;
338+
}
296339
if (false == state.clusterRecovered()) {
297340
logger.trace("Cluster state has not recovered yet, skipping built-in roles synchronization");
298341
return false;

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/QueryableBuiltInRolesSynchronizerTests.java

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.DocWriteResponse;
12+
import org.elasticsearch.action.UnavailableShardsException;
1213
import org.elasticsearch.action.support.WriteRequest;
1314
import org.elasticsearch.cluster.ClusterChangedEvent;
1415
import org.elasticsearch.cluster.ClusterName;
@@ -45,8 +46,11 @@
4546

4647
import java.util.ArrayList;
4748
import java.util.Collection;
49+
import java.util.Collections;
50+
import java.util.HashSet;
4851
import java.util.List;
4952
import java.util.Set;
53+
import java.util.stream.Collectors;
5054

5155
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
5256
import static org.elasticsearch.xpack.security.support.QueryableBuiltInRolesSynchronizer.QUERYABLE_BUILT_IN_ROLES_FEATURE;
@@ -164,6 +168,7 @@ public void testSuccessfulSync() {
164168
verify(clusterService, times(3)).state();
165169
verifyNoMoreInteractions(nativeRolesStore, featureService, taskQueue, reservedRolesProvider, threadPool, clusterService);
166170
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
171+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(0));
167172
}
168173

169174
public void testNotMaster() {
@@ -384,6 +389,151 @@ public void testSecurityIndexClosed() {
384389
verifyNoMoreInteractions(nativeRolesStore, featureService, taskQueue, reservedRolesProvider, threadPool, clusterService);
385390
}
386391

392+
public void testUnexpectedSyncFailures() {
393+
assertInitialState();
394+
395+
ClusterState clusterState = markShardsAvailable(createClusterStateWithOpenSecurityIndex()).nodes(localNodeMaster())
396+
.blocks(emptyClusterBlocks())
397+
.build();
398+
399+
when(clusterService.state()).thenReturn(clusterState);
400+
when(featureService.clusterHasFeature(any(), eq(QUERYABLE_BUILT_IN_ROLES_FEATURE))).thenReturn(true);
401+
402+
final Set<String> roles = randomReservedRoles(randomIntBetween(1, 10));
403+
final QueryableBuiltInRoles builtInRoles = buildQueryableBuiltInRoles(
404+
roles.stream().map(ReservedRolesStore::roleDescriptor).collect(Collectors.toSet())
405+
);
406+
when(reservedRolesProvider.getRoles()).thenReturn(builtInRoles);
407+
mockNativeRolesStoreWithFailure(builtInRoles.roleDescriptors(), Set.of(), new IllegalStateException("unexpected failure"));
408+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
409+
410+
for (int i = 1; i <= QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS + 5; i++) {
411+
synchronizer.clusterChanged(event(clusterState));
412+
if (i < QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS) {
413+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(i));
414+
} else {
415+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS));
416+
}
417+
}
418+
419+
verify(nativeRolesStore, times(QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS)).isEnabled();
420+
verify(featureService, times(QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS)).clusterHasFeature(
421+
any(),
422+
eq(QUERYABLE_BUILT_IN_ROLES_FEATURE)
423+
);
424+
verify(reservedRolesProvider, times(QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS)).getRoles();
425+
verify(nativeRolesStore, times(QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS)).putRoles(
426+
eq(WriteRequest.RefreshPolicy.IMMEDIATE),
427+
eq(builtInRoles.roleDescriptors()),
428+
eq(false),
429+
any()
430+
);
431+
verify(clusterService, times(QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS)).state();
432+
verifyNoMoreInteractions(nativeRolesStore, featureService, taskQueue, reservedRolesProvider, threadPool, clusterService);
433+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
434+
}
435+
436+
public void testFailedSyncAttemptsGetsResetAfterSuccessfulSync() {
437+
assertInitialState();
438+
439+
ClusterState clusterState = markShardsAvailable(createClusterStateWithOpenSecurityIndex()).nodes(localNodeMaster())
440+
.blocks(emptyClusterBlocks())
441+
.build();
442+
443+
when(clusterService.state()).thenReturn(clusterState);
444+
when(featureService.clusterHasFeature(any(), eq(QUERYABLE_BUILT_IN_ROLES_FEATURE))).thenReturn(true);
445+
446+
final Set<String> roles = randomReservedRoles(randomIntBetween(1, 10));
447+
final QueryableBuiltInRoles builtInRoles = buildQueryableBuiltInRoles(
448+
roles.stream().map(ReservedRolesStore::roleDescriptor).collect(Collectors.toSet())
449+
);
450+
when(reservedRolesProvider.getRoles()).thenReturn(builtInRoles);
451+
mockNativeRolesStoreWithFailure(builtInRoles.roleDescriptors(), Set.of(), new IllegalStateException("unexpected failure"));
452+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
453+
454+
// assert failed sync attempts are counted
455+
int numOfSimulatedFailures = randomIntBetween(1, QueryableBuiltInRolesSynchronizer.MAX_FAILED_SYNC_ATTEMPTS - 1);
456+
for (int i = 0; i < numOfSimulatedFailures; i++) {
457+
synchronizer.clusterChanged(event(clusterState));
458+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(i + 1));
459+
}
460+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(numOfSimulatedFailures));
461+
462+
// assert successful sync resets the failed sync attempts
463+
mockEnabledNativeStore(builtInRoles.roleDescriptors(), Set.of());
464+
synchronizer.clusterChanged(event(clusterState));
465+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(0));
466+
467+
verify(nativeRolesStore, times(numOfSimulatedFailures + 1)).isEnabled();
468+
verify(featureService, times(numOfSimulatedFailures + 1)).clusterHasFeature(any(), eq(QUERYABLE_BUILT_IN_ROLES_FEATURE));
469+
verify(reservedRolesProvider, times(numOfSimulatedFailures + 1)).getRoles();
470+
verify(nativeRolesStore, times(numOfSimulatedFailures + 1)).putRoles(
471+
eq(WriteRequest.RefreshPolicy.IMMEDIATE),
472+
eq(builtInRoles.roleDescriptors()),
473+
eq(false),
474+
any()
475+
);
476+
verify(taskQueue, times(1)).submitTask(any(), argThat(task -> task.getNewRoleDigests().equals(builtInRoles.rolesDigest())), any());
477+
verify(clusterService, times(numOfSimulatedFailures + 3)).state();
478+
verifyNoMoreInteractions(nativeRolesStore, featureService, taskQueue, reservedRolesProvider, threadPool, clusterService);
479+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
480+
}
481+
482+
public void testExpectedSyncFailuresAreNotCounted() {
483+
assertInitialState();
484+
485+
ClusterState clusterState = markShardsAvailable(createClusterStateWithOpenSecurityIndex()).nodes(localNodeMaster())
486+
.blocks(emptyClusterBlocks())
487+
.build();
488+
489+
when(clusterService.state()).thenReturn(clusterState);
490+
when(featureService.clusterHasFeature(any(), eq(QUERYABLE_BUILT_IN_ROLES_FEATURE))).thenReturn(true);
491+
492+
final Set<String> roles = randomReservedRoles(randomIntBetween(1, 10));
493+
final QueryableBuiltInRoles builtInRoles = buildQueryableBuiltInRoles(
494+
roles.stream().map(ReservedRolesStore::roleDescriptor).collect(Collectors.toSet())
495+
);
496+
when(reservedRolesProvider.getRoles()).thenReturn(builtInRoles);
497+
mockNativeRolesStoreWithFailure(builtInRoles.roleDescriptors(), Set.of(), new UnavailableShardsException(null, "expected failure"));
498+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
499+
500+
synchronizer.clusterChanged(event(clusterState));
501+
502+
assertThat(synchronizer.getFailedSyncAttempts(), equalTo(0));
503+
504+
verify(nativeRolesStore, times(1)).isEnabled();
505+
verify(featureService, times(1)).clusterHasFeature(any(), eq(QUERYABLE_BUILT_IN_ROLES_FEATURE));
506+
verify(reservedRolesProvider, times(1)).getRoles();
507+
verify(nativeRolesStore, times(1)).putRoles(
508+
eq(WriteRequest.RefreshPolicy.IMMEDIATE),
509+
eq(builtInRoles.roleDescriptors()),
510+
eq(false),
511+
any()
512+
);
513+
verify(clusterService, times(1)).state();
514+
verifyNoMoreInteractions(nativeRolesStore, featureService, taskQueue, reservedRolesProvider, threadPool, clusterService);
515+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(false));
516+
}
517+
518+
private Set<String> randomReservedRoles(int count) {
519+
assert count >= 0;
520+
if (count == 0) {
521+
return Set.of();
522+
}
523+
if (count == 1) {
524+
return Set.of(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName());
525+
}
526+
527+
final Set<String> reservedRoles = new HashSet<>();
528+
reservedRoles.add(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName());
529+
final Set<String> allReservedRolesExceptSuperuser = ReservedRolesStore.names()
530+
.stream()
531+
.filter(role -> false == role.equals(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR.getName()))
532+
.collect(Collectors.toSet());
533+
reservedRoles.addAll(randomUnique(() -> randomFrom(allReservedRolesExceptSuperuser), count - 1));
534+
return Collections.unmodifiableSet(reservedRoles);
535+
}
536+
387537
private static ClusterState.Builder markShardsAvailable(ClusterState.Builder clusterStateBuilder) {
388538
final ClusterState cs = clusterStateBuilder.build();
389539
return ClusterState.builder(cs)
@@ -508,6 +658,31 @@ private void mockEnabledNativeStore(final Collection<RoleDescriptor> rolesToUpse
508658
.deleteRoles(eq(rolesToDelete), eq(WriteRequest.RefreshPolicy.IMMEDIATE), eq(false), any(ActionListener.class));
509659
}
510660

661+
@SuppressWarnings({ "unchecked", "rawtypes" })
662+
private void mockNativeRolesStoreWithFailure(
663+
final Collection<RoleDescriptor> rolesToUpsert,
664+
final Collection<String> rolesToDelete,
665+
Exception failure
666+
) {
667+
when(nativeRolesStore.isEnabled()).thenReturn(true);
668+
doAnswer(i -> {
669+
assertThat(synchronizer.isSynchronizationInProgress(), equalTo(true));
670+
((ActionListener) i.getArgument(3)).onResponse(
671+
new BulkRolesResponse(rolesToUpsert.stream().map(role -> BulkRolesResponse.Item.failure(role.getName(), failure)).toList())
672+
);
673+
return null;
674+
}).when(nativeRolesStore)
675+
.putRoles(eq(WriteRequest.RefreshPolicy.IMMEDIATE), eq(rolesToUpsert), eq(false), any(ActionListener.class));
676+
677+
doAnswer(i -> {
678+
((ActionListener) i.getArgument(3)).onResponse(
679+
new BulkRolesResponse(rolesToDelete.stream().map(role -> BulkRolesResponse.Item.failure(role, failure)).toList())
680+
);
681+
return null;
682+
}).when(nativeRolesStore)
683+
.deleteRoles(eq(rolesToDelete), eq(WriteRequest.RefreshPolicy.IMMEDIATE), eq(false), any(ActionListener.class));
684+
}
685+
511686
private static ClusterState.Builder createClusterStateWithOpenSecurityIndex() {
512687
return SecurityIndexManagerTests.createClusterState(
513688
TestRestrictedIndices.INTERNAL_SECURITY_MAIN_INDEX_7,

0 commit comments

Comments
 (0)