Skip to content

Commit da3408b

Browse files
committed
TransportConfigUpdateAction does wait again for config reload, but no longer blocks thread
Signed-off-by: Nils Bandener <[email protected]>
1 parent 69ab6e6 commit da3408b

File tree

5 files changed

+384
-28
lines changed

5 files changed

+384
-28
lines changed

src/main/java/org/opensearch/security/action/configupdate/TransportConfigUpdateAction.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,22 @@
3535

3636
import org.opensearch.action.FailedNodeException;
3737
import org.opensearch.action.support.ActionFilters;
38-
import org.opensearch.action.support.nodes.TransportNodesAction;
3938
import org.opensearch.cluster.service.ClusterService;
4039
import org.opensearch.common.inject.Inject;
4140
import org.opensearch.common.inject.Provider;
4241
import org.opensearch.common.settings.Settings;
42+
import org.opensearch.core.action.ActionListener;
4343
import org.opensearch.core.common.io.stream.StreamInput;
4444
import org.opensearch.core.common.io.stream.StreamOutput;
4545
import org.opensearch.security.auth.BackendRegistry;
4646
import org.opensearch.security.configuration.ConfigurationRepository;
4747
import org.opensearch.security.securityconf.impl.CType;
48+
import org.opensearch.security.util.TransportNodesAsyncAction;
4849
import org.opensearch.threadpool.ThreadPool;
4950
import org.opensearch.transport.TransportRequest;
5051
import org.opensearch.transport.TransportService;
5152

52-
public class TransportConfigUpdateAction extends TransportNodesAction<
53+
public class TransportConfigUpdateAction extends TransportNodesAsyncAction<
5354
ConfigUpdateRequest,
5455
ConfigUpdateResponse,
5556
TransportConfigUpdateAction.NodeConfigUpdateRequest,
@@ -81,6 +82,7 @@ public TransportConfigUpdateAction(
8182
ConfigUpdateRequest::new,
8283
TransportConfigUpdateAction.NodeConfigUpdateRequest::new,
8384
ThreadPool.Names.MANAGEMENT,
85+
ThreadPool.Names.SAME,
8486
ConfigUpdateNodeResponse.class
8587
);
8688

@@ -124,14 +126,29 @@ protected ConfigUpdateResponse newResponse(
124126
}
125127

126128
@Override
127-
protected ConfigUpdateNodeResponse nodeOperation(final NodeConfigUpdateRequest request) {
129+
protected void nodeOperation(NodeConfigUpdateRequest request, ActionListener<ConfigUpdateNodeResponse> listener) {
128130
final var configupdateRequest = request.request;
129131
if (canHandleSelectively(configupdateRequest)) {
130132
backendRegistry.get().invalidateUserCache(configupdateRequest.getEntityNames());
133+
listener.onResponse(new ConfigUpdateNodeResponse(clusterService.localNode(), configupdateRequest.getConfigTypes(), null));
131134
} else {
132-
configurationRepository.reloadConfiguration(CType.fromStringValues((configupdateRequest.getConfigTypes())));
135+
configurationRepository.reloadConfiguration(
136+
CType.fromStringValues((configupdateRequest.getConfigTypes())),
137+
new ActionListener<>() {
138+
@Override
139+
public void onResponse(ConfigurationRepository.ConfigReloadResponse configReloadResponse) {
140+
listener.onResponse(
141+
new ConfigUpdateNodeResponse(clusterService.localNode(), configupdateRequest.getConfigTypes(), null)
142+
);
143+
}
144+
145+
@Override
146+
public void onFailure(Exception e) {
147+
listener.onFailure(e);
148+
}
149+
}
150+
);
133151
}
134-
return new ConfigUpdateNodeResponse(clusterService.localNode(), configupdateRequest.getConfigTypes(), null);
135152
}
136153

137154
private boolean canHandleSelectively(ConfigUpdateRequest request) {

src/main/java/org/opensearch/security/configuration/ConfigurationRepository.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,11 @@ public <T> SecurityDynamicConfiguration<T> getConfiguration(CType<T> configurati
531531
* updates into one.
532532
533533
* @param configTypes the configuration types to be reloaded.
534+
* @param listener an listener to be notified when the reload was finished. You can provide null if you do not want
535+
* such a notification
534536
*/
535-
public void reloadConfiguration(Collection<CType<?>> configTypes) {
536-
this.reloadThread.requestReload(configTypes);
537+
public void reloadConfiguration(Collection<CType<?>> configTypes, ActionListener<ConfigReloadResponse> listener) {
538+
this.reloadThread.requestReload(configTypes, listener);
537539
}
538540

539541
/**
@@ -651,7 +653,7 @@ public void afterIndexShardStarted(IndexShard indexShard) {
651653
threadPool.generic().execute(() -> {
652654
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
653655
LOGGER.info("Security index primary shard {} started - config reloading for snapshot restore", shardId);
654-
reloadConfiguration(CType.values());
656+
reloadConfiguration(CType.values(), null);
655657
}
656658
});
657659
}
@@ -689,6 +691,12 @@ static class ReloadThread {
689691
*/
690692
private ImmutableSet<CType<?>> reloadRequestedFor = ImmutableSet.of();
691693

694+
/**
695+
* Action listeners to be called when the reload was finished. We collect the action listeners here until
696+
* the reload is actually in progress.
697+
*/
698+
private List<ActionListener<ConfigReloadResponse>> reloadRequestedForActionListeners = new ArrayList<>();
699+
692700
/**
693701
* The time we got the first currently queued reload request.
694702
*/
@@ -708,12 +716,16 @@ static class ReloadThread {
708716
* Requests an async configuration reload for the given configuration types. Calling this method
709717
* will not wait for the configuration reload to complete.
710718
*/
711-
void requestReload(Collection<CType<?>> configurationTypes) {
719+
void requestReload(Collection<CType<?>> configurationTypes, ActionListener<ConfigReloadResponse> actionListener) {
712720
synchronized (this.requestLock) {
713721
if (!this.started) {
714722
LOGGER.info("Cannot reload configuration yet, because the initialization process did not complete yet");
715723
}
716724

725+
if (actionListener != null) {
726+
this.reloadRequestedForActionListeners.add(actionListener);
727+
}
728+
717729
if (this.reloadRequestedFor.isEmpty()) {
718730
LOGGER.debug("Configuration reload request received for {}; notifying update thread", configurationTypes);
719731
this.reloadRequestedAt = Instant.now();
@@ -778,8 +790,9 @@ boolean queueIsEmpty() {
778790

779791
private void run() {
780792
for (;;) {
793+
ImmutableSet<CType<?>> localReloadRequestedFor;
794+
List<ActionListener<ConfigReloadResponse>> localReloadRequestedForActionListeners = null;
781795
try {
782-
ImmutableSet<CType<?>> localReloadRequestedFor;
783796

784797
synchronized (this.requestLock) {
785798
this.reloadInProgressFor = ImmutableSet.of();
@@ -790,8 +803,8 @@ private void run() {
790803

791804
// We save here the requested configuration types in order to pass them to the updateFunction later
792805
localReloadRequestedFor = this.reloadRequestedFor;
806+
localReloadRequestedForActionListeners = new ArrayList<>(this.reloadRequestedForActionListeners);
793807

794-
// this.reloadRequestedAt != null: This means, that we got an update request
795808
LOGGER.info(
796809
"Performing configuration reload for request at {} on {}",
797810
this.reloadRequestedAt,
@@ -802,14 +815,35 @@ private void run() {
802815
// following update process will be already recognized again and queued.
803816
this.reloadRequestedAt = null;
804817
this.reloadRequestedFor = ImmutableSet.of();
818+
this.reloadRequestedForActionListeners.clear();
805819
this.reloadInProgressFor = localReloadRequestedFor;
806820
}
807821

808822
this.performFunction.accept(localReloadRequestedFor);
823+
for (ActionListener<ConfigReloadResponse> listener : localReloadRequestedForActionListeners) {
824+
listener.onResponse(new ConfigReloadResponse(localReloadRequestedFor));
825+
}
809826
} catch (Exception e) {
810827
LOGGER.error("Error in {}", this.thread.getName(), e);
828+
if (localReloadRequestedForActionListeners != null) {
829+
for (ActionListener<ConfigReloadResponse> listener : localReloadRequestedForActionListeners) {
830+
listener.onFailure(e);
831+
}
832+
}
811833
}
812834
}
813835
}
814836
}
837+
838+
public static class ConfigReloadResponse {
839+
private final Set<CType<?>> reloadedConfigTypes;
840+
841+
ConfigReloadResponse(Set<CType<?>> reloadedConfigTypes) {
842+
this.reloadedConfigTypes = reloadedConfigTypes;
843+
}
844+
845+
public Set<CType<?>> getReloadedConfigTypes() {
846+
return reloadedConfigTypes;
847+
}
848+
}
815849
}

0 commit comments

Comments
 (0)