diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index b39b1760e3da2..4cb75ae83f187 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -157,15 +157,15 @@ public TransportClusterStatsAction( protected SubscribableListener createActionContext(Task task, ClusterStatsRequest request) { assert task instanceof CancellableTask; final var cancellableTask = (CancellableTask) task; - final var additionalStatsListener = new SubscribableListener(); if (request.isRemoteStats() == false) { + final var additionalStatsListener = new SubscribableListener(); final AdditionalStats additionalStats = new AdditionalStats(); additionalStats.compute(cancellableTask, request, additionalStatsListener); + return additionalStatsListener; } else { // For remote stats request, we don't need to compute anything - additionalStatsListener.onResponse(null); + return SubscribableListener.nullSuccess(); } - return additionalStatsListener; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index 3056f7cda6429..2931a56b56ad3 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -586,4 +586,15 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Execu private Object compareAndExchangeState(Object expectedValue, Object newValue) { return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue); } + + @SuppressWarnings("rawtypes") + private static final SubscribableListener NULL_SUCCESS = newSucceeded(null); + + /** + * Same as {@link #newSucceeded(Object)} but always returns the same instance with result value {@code null}. + */ + @SuppressWarnings("unchecked") + public static SubscribableListener nullSuccess() { + return NULL_SUCCESS; + } } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index dd537d8bf0a43..3c84d7be8c6b4 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -243,7 +243,7 @@ protected void doClose() {} * Kind of a hack tbh, we can't be sure the shard locks are fully released when this is completed so there's all sorts of retries and * other lenience to handle that. It'd be better to wait for the shard locks to be released and then delete the data. See #74149. */ - private volatile SubscribableListener lastClusterStateShardsClosedListener = SubscribableListener.newSucceeded(null); + private volatile SubscribableListener lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess(); @Nullable // if not currently applying a cluster state private RefCountingListener currentClusterStateShardsClosedListeners; @@ -397,7 +397,7 @@ private void deleteIndices(final ClusterChangedEvent event) { ); } else if (project.isPresent() && project.get().hasIndex(index)) { // The deleted index was part of the previous cluster state, but not loaded on the local node - indexServiceClosedListener = SubscribableListener.newSucceeded(null); + indexServiceClosedListener = SubscribableListener.nullSuccess(); final IndexMetadata metadata = project.get().index(index); indexSettings = new IndexSettings(metadata, settings); indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metadata, state); @@ -411,7 +411,7 @@ private void deleteIndices(final ClusterChangedEvent event) { // previous cluster state is not initialized/recovered. assert state.metadata().projects().values().stream().anyMatch(p -> p.indexGraveyard().containsIndex(index)) || previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK); - indexServiceClosedListener = SubscribableListener.newSucceeded(null); + indexServiceClosedListener = SubscribableListener.nullSuccess(); final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state()); if (metadata != null) { indexSettings = new IndexSettings(metadata, settings); diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/RecoveryClusterStateDelayListeners.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/RecoveryClusterStateDelayListeners.java index 3dfe3003c22ff..eafe07a4287db 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/RecoveryClusterStateDelayListeners.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/RecoveryClusterStateDelayListeners.java @@ -64,7 +64,7 @@ public SubscribableListener getClusterStateDelayListener(long clusterState refCounted.decRef(); } } else { - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java index 001bf07b04be0..bc00996e03b2c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java @@ -130,8 +130,7 @@ private void deleteTemplates(Set excludeTemplates, ActionListener SubscribableListener // dummy start step for symmetry - .newSucceeded(null) - + .nullSuccess() // delete composable templates .andThen(getComposableTemplates::addListener) .andThen((l, r) -> { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java index 663e06756551b..46e85bec693e8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java @@ -92,7 +92,7 @@ default IsBlockedResult isBlocked() { return NOT_BLOCKED; } - IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.newSucceeded(null), "not blocked"); + IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.nullSuccess(), "not blocked"); /** * A factory for creating intermediate operators. diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index eddcbd97153ef..73e9cc9d54551 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -357,7 +357,7 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener { if (r.finished()) { - completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null)); + completionListenerRef.compareAndSet(null, SubscribableListener.nullSuccess()); } listener.onResponse(r); }, e -> close(ActionListener.running(() -> listener.onFailure(e))))); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java index fb9dc02e387e7..d7d41f7dc4506 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java @@ -81,6 +81,6 @@ public SubscribableListener intercept( } } } - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java index 9d739f6db54ad..685cd63c7cee0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java @@ -101,6 +101,6 @@ public SubscribableListener intercept( } } } - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java index 93e07dabbdcf3..e854e297b9779 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java @@ -77,7 +77,7 @@ && supports(indicesRequest) return listener; } } - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } abstract void disableFeatures( diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java index 93f06efa32e93..fd99a965f558f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java @@ -125,7 +125,7 @@ public SubscribableListener intercept( ); return listener; } else { - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java index e3f13fe1e10cf..a7ba8ec1b712a 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java @@ -103,7 +103,7 @@ public SubscribableListener intercept( ); return listener; } else { - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java index 830ea90e3beed..cd88358e6c05f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java @@ -49,7 +49,7 @@ && hasRemoteIndices(searchRequest) searchRequest.requestCache(false); } } - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } // package private for test diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java index 5725b065aeb06..298dcd9ed94c5 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java @@ -191,7 +191,7 @@ public void testTaskStatus() throws IOException { assertEquals(0L, status.indexSnapshotsVerified()); assertEquals(0L, status.blobsVerified()); assertEquals(0L, status.blobBytesVerified()); - yield SubscribableListener.newSucceeded(null); + yield SubscribableListener.nullSuccess(); } case INDEX_RESTORABILITY -> { // several of these chunks might arrive concurrently; we want to verify the task status before processing any of @@ -210,7 +210,7 @@ public void testTaskStatus() throws IOException { assertEquals(0L, status.indicesVerified()); }); } - case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null); + case SNAPSHOT_INFO -> SubscribableListener.nullSuccess(); case ANOMALY -> fail(null, "should not see anomalies"); }; diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java index e2708a7f0c111..5fa3282cd57ae 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java @@ -798,7 +798,7 @@ protected InputStream openSlice(int slice) throws IOException { }))); } else { blobBytesVerified.addAndGet(fileInfo.length()); - return SubscribableListener.newSucceeded(null); + return SubscribableListener.nullSuccess(); } }); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java index 975c342637ee1..d5f30353f308e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java @@ -131,7 +131,7 @@ private void unassignTransforms(ClusterState state, ActionListener listene // chain each call one at a time // because that is what we are doing for ML, and that is all that is supported in the persistentTasksClusterService (for now) - SubscribableListener> chainListener = SubscribableListener.newSucceeded(null); + SubscribableListener> chainListener = SubscribableListener.nullSuccess(); for (var task : transformTasks) { @FixForMultiProject final var projectId = Metadata.DEFAULT_PROJECT_ID;