Skip to content

Commit 4b03ef8

Browse files
authored
Merge (#116406)
1 parent b24151a commit 4b03ef8

File tree

2 files changed

+168
-28
lines changed

2 files changed

+168
-28
lines changed

x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerIntegTests.java

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,39 @@
2626
import org.elasticsearch.xpack.core.security.action.user.PutUserResponse;
2727
import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore;
2828
import org.hamcrest.Matchers;
29+
import org.junit.After;
2930
import org.junit.Before;
3031

3132
import java.util.ArrayList;
3233
import java.util.List;
3334
import java.util.concurrent.CopyOnWriteArrayList;
3435
import java.util.concurrent.CyclicBarrier;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.Future;
3539
import java.util.concurrent.TimeUnit;
3640
import java.util.concurrent.atomic.AtomicInteger;
3741

3842
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3943
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
4044
import static org.hamcrest.Matchers.arrayContaining;
45+
import static org.hamcrest.Matchers.hasItem;
46+
import static org.hamcrest.Matchers.instanceOf;
4147
import static org.hamcrest.Matchers.is;
4248
import static org.hamcrest.Matchers.not;
4349
import static org.hamcrest.Matchers.notNullValue;
4450
import static org.hamcrest.Matchers.nullValue;
4551

4652
public class SecurityIndexManagerIntegTests extends SecurityIntegTestCase {
4753

54+
private final int concurrentCallsToOnAvailable = 6;
55+
private final ExecutorService executor = Executors.newFixedThreadPool(concurrentCallsToOnAvailable);
56+
57+
@After
58+
public void shutdownExecutor() {
59+
executor.shutdown();
60+
}
61+
4862
public void testConcurrentOperationsTryingToCreateSecurityIndexAndAlias() throws Exception {
4963
final int processors = Runtime.getRuntime().availableProcessors();
5064
final int numThreads = Math.min(50, scaledRandomIntBetween((processors + 1) / 2, 4 * processors)); // up to 50 threads
@@ -110,6 +124,12 @@ public void testOnIndexAvailableForSearchIndexCompletesWithinTimeout() throws Ex
110124
// pick longer wait than in the assertBusy that waits for below to ensure index has had enough time to initialize
111125
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueSeconds(40));
112126

127+
// check listener added
128+
assertThat(
129+
securityIndexManager.getStateChangeListeners(),
130+
hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class))
131+
);
132+
113133
createSecurityIndexWithWaitForActiveShards();
114134

115135
assertBusy(
@@ -121,6 +141,12 @@ public void testOnIndexAvailableForSearchIndexCompletesWithinTimeout() throws Ex
121141
// security index creation is complete and index is available for search; therefore whenIndexAvailableForSearch should report
122142
// success in time
123143
future.actionGet();
144+
145+
// check no remaining listeners
146+
assertThat(
147+
securityIndexManager.getStateChangeListeners(),
148+
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
149+
);
124150
}
125151

126152
@SuppressWarnings("unchecked")
@@ -152,6 +178,69 @@ public void testOnIndexAvailableForSearchIndexAlreadyAvailable() throws Exceptio
152178
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueSeconds(10));
153179
future.actionGet();
154180
}
181+
182+
// check no remaining listeners
183+
assertThat(
184+
securityIndexManager.getStateChangeListeners(),
185+
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
186+
);
187+
}
188+
189+
@SuppressWarnings("unchecked")
190+
public void testOnIndexAvailableForSearchIndexUnderConcurrentLoad() throws Exception {
191+
final SecurityIndexManager securityIndexManager = internalCluster().getInstances(NativePrivilegeStore.class)
192+
.iterator()
193+
.next()
194+
.getSecurityIndexManager();
195+
// Long time out calls should all succeed
196+
final List<Future<Void>> futures = new ArrayList<>();
197+
for (int i = 0; i < concurrentCallsToOnAvailable / 2; i++) {
198+
final Future<Void> future = executor.submit(() -> {
199+
try {
200+
final ActionFuture<Void> f = new PlainActionFuture<>();
201+
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) f, TimeValue.timeValueSeconds(40));
202+
f.actionGet();
203+
} catch (Exception ex) {
204+
fail(ex, "should not have encountered exception");
205+
}
206+
return null;
207+
});
208+
futures.add(future);
209+
}
210+
211+
// short time-out tasks should all time out
212+
for (int i = 0; i < concurrentCallsToOnAvailable / 2; i++) {
213+
final Future<Void> future = executor.submit(() -> {
214+
expectThrows(ElasticsearchTimeoutException.class, () -> {
215+
final ActionFuture<Void> f = new PlainActionFuture<>();
216+
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) f, TimeValue.timeValueMillis(10));
217+
f.actionGet();
218+
});
219+
return null;
220+
});
221+
futures.add(future);
222+
}
223+
224+
// Sleep a second for short-running calls to timeout
225+
Thread.sleep(1000);
226+
227+
createSecurityIndexWithWaitForActiveShards();
228+
// ensure security index manager state is fully in the expected precondition state for this test (ready for search)
229+
assertBusy(
230+
() -> assertThat(securityIndexManager.isAvailable(SecurityIndexManager.Availability.SEARCH_SHARDS), is(true)),
231+
30,
232+
TimeUnit.SECONDS
233+
);
234+
235+
for (var future : futures) {
236+
future.get(10, TimeUnit.SECONDS);
237+
}
238+
239+
// check no remaining listeners
240+
assertThat(
241+
securityIndexManager.getStateChangeListeners(),
242+
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
243+
);
155244
}
156245

157246
@SuppressWarnings("unchecked")
@@ -163,9 +252,24 @@ public void testOnIndexAvailableForSearchIndexWaitTimeOut() {
163252
.next()
164253
.getSecurityIndexManager();
165254

166-
final ActionFuture<Void> future = new PlainActionFuture<>();
167-
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueMillis(100));
168-
expectThrows(ElasticsearchTimeoutException.class, future::actionGet);
255+
{
256+
final ActionFuture<Void> future = new PlainActionFuture<>();
257+
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueMillis(100));
258+
expectThrows(ElasticsearchTimeoutException.class, future::actionGet);
259+
}
260+
261+
// Also works with 0 timeout
262+
{
263+
final ActionFuture<Void> future = new PlainActionFuture<>();
264+
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueMillis(0));
265+
expectThrows(ElasticsearchTimeoutException.class, future::actionGet);
266+
}
267+
268+
// check no remaining listeners
269+
assertThat(
270+
securityIndexManager.getStateChangeListeners(),
271+
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
272+
);
169273
}
170274

171275
public void testSecurityIndexSettingsCannotBeChanged() throws Exception {

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.Objects;
5858
import java.util.Set;
5959
import java.util.concurrent.CopyOnWriteArrayList;
60+
import java.util.concurrent.atomic.AtomicBoolean;
6061
import java.util.function.BiConsumer;
6162
import java.util.function.Consumer;
6263
import java.util.stream.Collectors;
@@ -420,45 +421,80 @@ public void accept(State previousState, State nextState) {
420421
* Notifies {@code listener} once the security index is available, or calls {@code onFailure} on {@code timeout}.
421422
*/
422423
public void onIndexAvailableForSearch(ActionListener<Void> listener, TimeValue timeout) {
423-
logger.info("Will wait for security index [{}] to become available for search", getConcreteIndexName());
424+
logger.info("Will wait for security index [{}] for [{}] to become available for search", getConcreteIndexName(), timeout);
424425

425-
final ActionListener<Void> notifyOnceListener = ActionListener.notifyOnce(listener);
426+
if (state.indexAvailableForSearch) {
427+
logger.debug("Security index [{}] is already available", getConcreteIndexName());
428+
listener.onResponse(null);
429+
return;
430+
}
426431

432+
final AtomicBoolean isDone = new AtomicBoolean(false);
427433
final var indexAvailableForSearchListener = new StateConsumerWithCancellable() {
428434
@Override
429435
public void accept(SecurityIndexManager.State previousState, SecurityIndexManager.State nextState) {
430436
if (nextState.indexAvailableForSearch) {
431-
assert cancellable != null;
432-
// cancel and removeStateListener are idempotent
433-
cancellable.cancel();
434-
removeStateListener(this);
435-
notifyOnceListener.onResponse(null);
437+
if (isDone.compareAndSet(false, true)) {
438+
cancel();
439+
removeStateListener(this);
440+
listener.onResponse(null);
441+
}
436442
}
437443
}
438444
};
445+
// add listener _before_ registering timeout -- this way we are guaranteed it gets removed (either by timeout below, or successful
446+
// completion above)
447+
addStateListener(indexAvailableForSearchListener);
448+
439449
// schedule failure handling on timeout -- keep reference to cancellable so a successful completion can cancel the timeout
440-
indexAvailableForSearchListener.cancellable = client.threadPool().schedule(() -> {
441-
removeStateListener(indexAvailableForSearchListener);
442-
notifyOnceListener.onFailure(
443-
new ElasticsearchTimeoutException(
444-
"timed out waiting for security index [" + getConcreteIndexName() + "] to become available for search"
445-
)
446-
);
447-
}, timeout, client.threadPool().generic());
450+
indexAvailableForSearchListener.setCancellable(client.threadPool().schedule(() -> {
451+
if (isDone.compareAndSet(false, true)) {
452+
removeStateListener(indexAvailableForSearchListener);
453+
listener.onFailure(
454+
new ElasticsearchTimeoutException(
455+
"timed out waiting for security index [" + getConcreteIndexName() + "] to become available for search"
456+
)
457+
);
458+
}
459+
}, timeout, client.threadPool().generic()));
460+
}
448461

449-
// in case the state has meanwhile changed to available, return immediately
450-
if (state.indexAvailableForSearch) {
451-
indexAvailableForSearchListener.cancellable.cancel();
452-
notifyOnceListener.onResponse(null);
453-
} else {
454-
addStateListener(indexAvailableForSearchListener);
455-
}
462+
// pkg-private for testing
463+
List<BiConsumer<State, State>> getStateChangeListeners() {
464+
return stateChangeListeners;
456465
}
457466

458-
private abstract static class StateConsumerWithCancellable
467+
/**
468+
* This class ensures that if cancel() is called _before_ setCancellable(), the passed-in cancellable is still correctly cancelled on
469+
* a subsequent setCancellable() call.
470+
*/
471+
// pkg-private for testing
472+
abstract static class StateConsumerWithCancellable
459473
implements
460-
BiConsumer<SecurityIndexManager.State, SecurityIndexManager.State> {
461-
volatile Scheduler.ScheduledCancellable cancellable;
474+
BiConsumer<SecurityIndexManager.State, SecurityIndexManager.State>,
475+
Scheduler.Cancellable {
476+
private volatile Scheduler.ScheduledCancellable cancellable;
477+
private volatile boolean cancelled = false;
478+
479+
void setCancellable(Scheduler.ScheduledCancellable cancellable) {
480+
this.cancellable = cancellable;
481+
if (cancelled) {
482+
cancel();
483+
}
484+
}
485+
486+
public boolean cancel() {
487+
cancelled = true;
488+
if (cancellable != null) {
489+
// cancellable is idempotent, so it's fine to potentially call it multiple times
490+
return cancellable.cancel();
491+
}
492+
return isCancelled();
493+
}
494+
495+
public boolean isCancelled() {
496+
return cancelled;
497+
}
462498
}
463499

464500
private Tuple<Boolean, Boolean> checkIndexAvailable(ClusterState state) {

0 commit comments

Comments
 (0)