Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ public TransportClusterStatsAction(
protected SubscribableListener<AdditionalStats> createActionContext(Task task, ClusterStatsRequest request) {
assert task instanceof CancellableTask;
final var cancellableTask = (CancellableTask) task;
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
if (request.isRemoteStats() == false) {
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
final AdditionalStats additionalStats = new AdditionalStats();
additionalStats.compute(cancellableTask, request, additionalStatsListener);
return additionalStatsListener;
} else {
// For remote stats request, we don't need to compute anything
additionalStatsListener.onResponse(null);
return SubscribableListener.nullSuccess();
}
return additionalStatsListener;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,4 +586,15 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Execu
private Object compareAndExchangeState(Object expectedValue, Object newValue) {
return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
}

@SuppressWarnings("rawtypes")
private static final SubscribableListener NULL_SUCCESS = newSucceeded(null);

/**
* Same as {@link #newSucceeded(Object)} but always returns the same instance with result value {@code null}.
*/
@SuppressWarnings("unchecked")
public static <T> SubscribableListener<T> nullSuccess() {
return NULL_SUCCESS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected void doClose() {}
* Kind of a hack tbh, we can't be sure the shard locks are fully released when this is completed so there's all sorts of retries and
* other lenience to handle that. It'd be better to wait for the shard locks to be released and then delete the data. See #74149.
*/
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.newSucceeded(null);
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess();

@Nullable // if not currently applying a cluster state
private RefCountingListener currentClusterStateShardsClosedListeners;
Expand Down Expand Up @@ -397,7 +397,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
);
} else if (project.isPresent() && project.get().hasIndex(index)) {
// The deleted index was part of the previous cluster state, but not loaded on the local node
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = project.get().index(index);
indexSettings = new IndexSettings(metadata, settings);
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metadata, state);
Expand All @@ -411,7 +411,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
// previous cluster state is not initialized/recovered.
assert state.metadata().projects().values().stream().anyMatch(p -> p.indexGraveyard().containsIndex(index))
|| previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state());
if (metadata != null) {
indexSettings = new IndexSettings(metadata, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public SubscribableListener<Void> getClusterStateDelayListener(long clusterState
refCounted.decRef();
}
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ private void deleteTemplates(Set<String> excludeTemplates, ActionListener<Void>
SubscribableListener

// dummy start step for symmetry
.newSucceeded(null)

.nullSuccess()
// delete composable templates
.<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener)
.<AcknowledgedResponse>andThen((l, r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ default IsBlockedResult isBlocked() {
return NOT_BLOCKED;
}

IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.newSucceeded(null), "not blocked");
IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.nullSuccess(), "not blocked");

/**
* A factory for creating intermediate operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
completionListenerRef.compareAndSet(null, SubscribableListener.nullSuccess());
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ public SubscribableListener<Void> intercept(
}
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ public SubscribableListener<Void> intercept(
}
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ && supports(indicesRequest)
return listener;
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}

abstract void disableFeatures(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ && hasRemoteIndices(searchRequest)
searchRequest.requestCache(false);
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}

// package private for test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indexSnapshotsVerified());
assertEquals(0L, status.blobsVerified());
assertEquals(0L, status.blobBytesVerified());
yield SubscribableListener.newSucceeded(null);
yield SubscribableListener.nullSuccess();
}
case INDEX_RESTORABILITY -> {
// several of these chunks might arrive concurrently; we want to verify the task status before processing any of
Expand All @@ -210,7 +210,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indicesVerified());
});
}
case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null);
case SNAPSHOT_INFO -> SubscribableListener.nullSuccess();
case ANOMALY -> fail(null, "should not see anomalies");
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ protected InputStream openSlice(int slice) throws IOException {
})));
} else {
blobBytesVerified.addAndGet(fileInfo.length());
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void unassignTransforms(ClusterState state, ActionListener<Void> listene

// chain each call one at a time
// because that is what we are doing for ML, and that is all that is supported in the persistentTasksClusterService (for now)
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.newSucceeded(null);
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.nullSuccess();
for (var task : transformTasks) {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
Expand Down