Skip to content

Commit 9ed0978

Browse files
author
Harish Narasimhan
committed
Avoid retaining ClusterState in lambda closures
Extract version and cluster-manager node ID before creating lambda closures to prevent retaining full ClusterState objects in memory. - Add build(long, String) overload and private buildPredicate helper to ClusterManagerNodeChangePredicate - Add StoredState(String, long) constructor and matching ClusterStateObserver constructors accepting pre-extracted values - Update TransportClusterManagerNodeAction methods to capture only primitives and Strings instead of full ClusterState in closures Signed-off-by: Harish Narasimhan <hxarishk@amazon.com>
1 parent 22a8d9d commit 9ed0978

File tree

7 files changed

+733
-15
lines changed

7 files changed

+733
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5353
- Delegate getMin/getMax methods for ExitableTerms ([#20775](https://github.com/opensearch-project/OpenSearch/pull/20775))
5454
- Fix terms lookup subquery fetch limit reading from non-existent index setting instead of cluster `max_clause_count` ([#20823](https://github.com/opensearch-project/OpenSearch/pull/20823))
5555
- Fix array_index_out_of_bounds_exception with wildcard and aggregations ([#20842](https://github.com/opensearch-project/OpenSearch/pull/20842))
56+
- Reduce ClusterState retention in retry closures of TransportClusterManagerNodeAction ([#20858](https://github.com/opensearch-project/OpenSearch/pull/20858))
5657

5758
### Dependencies
5859
- Bump shadow-gradle-plugin from 8.3.9 to 9.3.1 ([#20569](https://github.com/opensearch-project/OpenSearch/pull/20569))

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,9 @@ protected void doStart(ClusterState clusterState) {
266266
} else {
267267
if (nodes.getClusterManagerNode() == null) {
268268
logger.debug("no known cluster-manager node, scheduling a retry");
269-
retryOnMasterChange(clusterState, null);
269+
// Extract version before creating closure to avoid retaining full ClusterState
270+
final long stateVersion = clusterState.version();
271+
retryOnMasterChange(stateVersion, null, null, null);
270272
} else {
271273
DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode();
272274
if (clusterManagerNode.getVersion().onOrAfter(V_2_13_0) && localExecuteSupportedByAction()) {
@@ -285,11 +287,23 @@ protected void doStart(ClusterState clusterState) {
285287
}
286288
}
287289

288-
private void retryOnMasterChange(ClusterState state, Throwable failure) {
289-
retry(state, failure, ClusterManagerNodeChangePredicate.build(state));
290+
private void retryOnMasterChange(long stateVersion, String ephemeralNodeId, String persistentNodeId, Throwable failure) {
291+
retry(
292+
stateVersion,
293+
ephemeralNodeId,
294+
persistentNodeId,
295+
failure,
296+
ClusterManagerNodeChangePredicate.build(stateVersion, ephemeralNodeId)
297+
);
290298
}
291299

292-
private void retry(ClusterState state, final Throwable failure, final Predicate<ClusterState> statePredicate) {
300+
private void retry(
301+
final long stateVersion,
302+
final String ephemeralNodeId,
303+
final String persistentNodeId,
304+
final Throwable failure,
305+
final Predicate<ClusterState> statePredicate
306+
) {
293307
if (observer == null) {
294308
final long remainingTimeoutMS = request.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis()
295309
- startTime);
@@ -299,7 +313,8 @@ private void retry(ClusterState state, final Throwable failure, final Predicate<
299313
return;
300314
}
301315
this.observer = new ClusterStateObserver(
302-
state,
316+
persistentNodeId,
317+
stateVersion,
303318
clusterService,
304319
TimeValue.timeValueMillis(remainingTimeoutMS),
305320
logger,
@@ -329,6 +344,12 @@ public void onTimeout(TimeValue timeout) {
329344
}
330345

331346
private ActionListener<Response> getDelegateForLocalExecute(ClusterState clusterState) {
347+
// Extract version and cluster manager node IDs before creating closure to avoid retaining full ClusterState
348+
final long stateVersion = clusterState.version();
349+
final DiscoveryNode clusterManagerNode = clusterState.nodes().getClusterManagerNode();
350+
final String ephemeralNodeId = clusterManagerNode != null ? clusterManagerNode.getEphemeralId() : null;
351+
final String persistentNodeId = clusterManagerNode != null ? clusterManagerNode.getId() : null;
352+
332353
return ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
333354
if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) {
334355
logger.debug(
@@ -340,7 +361,7 @@ private ActionListener<Response> getDelegateForLocalExecute(ClusterState cluster
340361
t
341362
);
342363

343-
retryOnMasterChange(clusterState, t);
364+
retryOnMasterChange(stateVersion, ephemeralNodeId, persistentNodeId, t);
344365
} else {
345366
delegatedListener.onFailure(t);
346367
}
@@ -352,6 +373,12 @@ protected BiConsumer<DiscoveryNode, ClusterState> clusterStateLatestChecker(
352373
BiConsumer<DiscoveryNode, ClusterState> onStaleLocalState
353374
) {
354375
return (clusterManagerNode, clusterState) -> {
376+
// Extract version and cluster manager node IDs before creating closure to avoid retaining full ClusterState
377+
final long stateVersion = clusterState.version();
378+
final DiscoveryNode currentClusterManagerNode = clusterState.nodes().getClusterManagerNode();
379+
final String currentEphemeralNodeId = currentClusterManagerNode != null ? currentClusterManagerNode.getEphemeralId() : null;
380+
final String currentPersistentNodeId = currentClusterManagerNode != null ? currentClusterManagerNode.getId() : null;
381+
355382
transportService.sendRequest(
356383
clusterManagerNode,
357384
GetTermVersionAction.NAME,
@@ -377,7 +404,13 @@ public void handleResponse(GetTermVersionResponse response) {
377404

378405
@Override
379406
public void handleException(TransportException exp) {
380-
handleTransportException(clusterManagerNode, clusterState, exp);
407+
handleTransportException(
408+
clusterManagerNode,
409+
stateVersion,
410+
currentEphemeralNodeId,
411+
currentPersistentNodeId,
412+
exp
413+
);
381414
}
382415

383416
@Override
@@ -455,7 +488,12 @@ private boolean checkForBlock(Request request, ClusterState localClusterState) {
455488
listener.onFailure(blockException);
456489
} else {
457490
logger.debug("can't execute due to a cluster block, retrying", blockException);
458-
retry(localClusterState, blockException, newState -> {
491+
// Avoid capturing full ClusterState in the lambda
492+
final long blockStateVersion = localClusterState.version();
493+
final DiscoveryNode blockClusterManagerNode = localClusterState.nodes().getClusterManagerNode();
494+
final String blockEphemeralNodeId = blockClusterManagerNode != null ? blockClusterManagerNode.getEphemeralId() : null;
495+
final String blockPersistentNodeId = blockClusterManagerNode != null ? blockClusterManagerNode.getId() : null;
496+
retry(blockStateVersion, blockEphemeralNodeId, blockPersistentNodeId, blockException, newState -> {
459497
try {
460498
ClusterBlockException newException = checkBlock(request, newState);
461499
return (newException == null || !newException.retryable());
@@ -490,20 +528,32 @@ private void executeOnLocalNode(ClusterState localClusterState) {
490528
private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
491529
final String actionName = getClusterManagerActionName(clusterManagerNode);
492530

531+
// Extract version and cluster manager node IDs before creating closure to avoid retaining full ClusterState
532+
final long stateVersion = clusterState.version();
533+
final DiscoveryNode currentClusterManagerNode = clusterState.nodes().getClusterManagerNode();
534+
final String currentEphemeralNodeId = currentClusterManagerNode != null ? currentClusterManagerNode.getEphemeralId() : null;
535+
final String currentPersistentNodeId = currentClusterManagerNode != null ? currentClusterManagerNode.getId() : null;
536+
493537
transportService.sendRequest(
494538
clusterManagerNode,
495539
actionName,
496540
request,
497541
new ActionListenerResponseHandler<Response>(listener, TransportClusterManagerNodeAction.this::read) {
498542
@Override
499543
public void handleException(final TransportException exp) {
500-
handleTransportException(clusterManagerNode, clusterState, exp);
544+
handleTransportException(clusterManagerNode, stateVersion, currentEphemeralNodeId, currentPersistentNodeId, exp);
501545
}
502546
}
503547
);
504548
}
505549

506-
private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterState clusterState, final TransportException exp) {
550+
private void handleTransportException(
551+
DiscoveryNode clusterManagerNode,
552+
long stateVersion,
553+
String ephemeralNodeId,
554+
String persistentNodeId,
555+
final TransportException exp
556+
) {
507557
Throwable cause = exp.unwrapCause();
508558
if (cause instanceof ConnectTransportException
509559
|| (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
@@ -517,7 +567,7 @@ private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterS
517567
exp.getDetailedMessage()
518568
);
519569

520-
retryOnMasterChange(clusterState, cause);
570+
retryOnMasterChange(stateVersion, ephemeralNodeId, persistentNodeId, cause);
521571
} else {
522572
listener.onFailure(exp);
523573
}

server/src/main/java/org/opensearch/cluster/ClusterManagerNodeChangePredicate.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,27 @@ private ClusterManagerNodeChangePredicate() {
4848
}
4949

5050
/**
51-
* builds a predicate that will accept a cluster state only if it was generated after the current has
52-
* (re-)joined the master
51+
* Builds a predicate that accepts a cluster state only if it was generated after the current node
52+
* (re-)joined the cluster manager. Extracts version and cluster-manager ID from the given state
53+
* and delegates to {@link #build(long, String)}.
5354
*/
5455
public static Predicate<ClusterState> build(ClusterState currentState) {
5556
final long currentVersion = currentState.version();
5657
final DiscoveryNode clusterManagerNode = currentState.nodes().getClusterManagerNode();
5758
final String currentMasterId = clusterManagerNode == null ? null : clusterManagerNode.getEphemeralId();
59+
return build(currentVersion, currentMasterId);
60+
}
61+
62+
/**
63+
* Builds a predicate that accepts a cluster state only if the cluster manager has changed
64+
* or the state version has increased beyond the provided version. Accepts pre-extracted
65+
* values to avoid retaining the full {@link ClusterState} in lambda closures.
66+
*
67+
* @param currentVersion the cluster state version to compare against
68+
* @param currentMasterId the ephemeral ID of the current cluster manager node, or null if none
69+
* @return predicate that returns true when cluster manager changes or version increases
70+
*/
71+
public static Predicate<ClusterState> build(long currentVersion, String currentMasterId) {
5872
return newState -> {
5973
final DiscoveryNode newClusterManager = newState.nodes().getClusterManagerNode();
6074
final boolean accept;

server/src/main/java/org/opensearch/cluster/ClusterStateObserver.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,36 @@ public ClusterStateObserver(
117117
this.contextHolder = contextHolder;
118118
}
119119

120+
public ClusterStateObserver(
121+
String clusterManagerNodeId,
122+
long version,
123+
ClusterService clusterService,
124+
@Nullable TimeValue timeout,
125+
Logger logger,
126+
ThreadContext contextHolder
127+
) {
128+
this(clusterManagerNodeId, version, clusterService.getClusterApplierService(), timeout, logger, contextHolder);
129+
}
130+
131+
public ClusterStateObserver(
132+
String clusterManagerNodeId,
133+
long version,
134+
ClusterApplierService clusterApplierService,
135+
@Nullable TimeValue timeout,
136+
Logger logger,
137+
ThreadContext contextHolder
138+
) {
139+
this.clusterApplierService = clusterApplierService;
140+
this.threadPool = clusterApplierService.threadPool();
141+
this.lastObservedState = new AtomicReference<>(new StoredState(clusterManagerNodeId, version));
142+
this.timeOutValue = timeout;
143+
if (timeOutValue != null) {
144+
this.startTimeMS = threadPool.relativeTimeInMillis();
145+
}
146+
this.logger = logger;
147+
this.contextHolder = contextHolder;
148+
}
149+
120150
/** sets the last observed state to the currently applied cluster state and returns it */
121151
public ClusterState setAndGetObservedState() {
122152
if (observingContext.get() != null) {
@@ -311,8 +341,12 @@ private static class StoredState {
311341
private final long version;
312342

313343
StoredState(ClusterState clusterState) {
314-
this.clusterManagerNodeId = clusterState.nodes().getClusterManagerNodeId();
315-
this.version = clusterState.version();
344+
this(clusterState.nodes().getClusterManagerNodeId(), clusterState.version());
345+
}
346+
347+
StoredState(String clusterManagerNodeId, long version) {
348+
this.clusterManagerNodeId = clusterManagerNodeId;
349+
this.version = version;
316350
}
317351

318352
/**

0 commit comments

Comments
 (0)