Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ public TransportClusterStatsAction(
protected SubscribableListener<AdditionalStats> createActionContext(Task task, ClusterStatsRequest request) {
assert task instanceof CancellableTask;
final var cancellableTask = (CancellableTask) task;
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
if (request.isRemoteStats() == false) {
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
final AdditionalStats additionalStats = new AdditionalStats();
additionalStats.compute(cancellableTask, request, additionalStatsListener);
return additionalStatsListener;
} else {
// For remote stats request, we don't need to compute anything
additionalStatsListener.onResponse(null);
return SubscribableListener.nullSuccess();
}
return additionalStatsListener;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,59 @@
public class SubscribableListener<T> implements ActionListener<T> {

private static final Logger logger = LogManager.getLogger(SubscribableListener.class);
private static final Object EMPTY = new Object();

private static final VarHandle VH_STATE_FIELD;

static {
try {
VH_STATE_FIELD = MethodHandles.lookup()
.in(SubscribableListener.class)
.findVarHandle(SubscribableListener.class, "state", Object.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

/**
* If we are incomplete, {@code state} may be one of the following depending on how many waiting subscribers there are:
* <ul>
* <li>If there are no subscribers yet, {@code state} is {@code null}.
* <li>If there is one subscriber, {@code state} is that subscriber.
* <li>If there are multiple subscribers, {@code state} is the head of a linked list of subscribers in reverse order of their
* subscriptions.
* </ul>
* If we are complete, {@code state} is the {@code SuccessResult<T>} or {@code FailureResult} which will be used to complete any
* subsequent subscribers.
*/
@SuppressWarnings("FieldMayBeFinal") // updated via VH_STATE_FIELD (and _only_ via VH_STATE_FIELD)
private volatile Object state;

private Object compareAndExchangeState(Object expectedValue, Object newValue) {
return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
}

@SuppressWarnings("rawtypes")
private static final SubscribableListener NULL_SUCCESS = newSucceeded(null);

/**
* Same as {@link #newSucceeded(Object)} but always returns the same instance with result value {@code null}.
*/
@SuppressWarnings("unchecked")
public static <T> SubscribableListener<T> nullSuccess() {
return NULL_SUCCESS;
}

/**
* Create a {@link SubscribableListener} which is incomplete.
*/
public SubscribableListener() {
this(EMPTY);
public SubscribableListener() {}

@SuppressWarnings("this-escape")
private SubscribableListener(Object initialState) {
// Final state so release semantics are safe enough since this is the only store we ever do to #state. There are no possible
// later stores that could be reordered in a way to override this store because subsequent stores are all done through CAS with
// volatile read semantics
VH_STATE_FIELD.setRelease(this, initialState);
}

/**
Expand All @@ -134,24 +180,6 @@ public static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListen
return listener;
}

private SubscribableListener(Object initialState) {
state = initialState;
}

/**
* If we are incomplete, {@code state} may be one of the following depending on how many waiting subscribers there are:
* <ul>
* <li>If there are no subscribers yet, {@code state} is {@link #EMPTY}.
* <li>If there is one subscriber, {@code state} is that subscriber.
* <li>If there are multiple subscribers, {@code state} is the head of a linked list of subscribers in reverse order of their
* subscriptions.
* </ul>
* If we are complete, {@code state} is the {@code SuccessResult<T>} or {@code FailureResult} which will be used to complete any
* subsequent subscribers.
*/
@SuppressWarnings("FieldMayBeFinal") // updated via VH_STATE_FIELD (and _only_ via VH_STATE_FIELD)
private volatile Object state;

/**
* Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing
* listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and
Expand Down Expand Up @@ -212,8 +240,8 @@ public final void addListener(ActionListener<T> listener, Executor executor, @Nu
}

final ActionListener<T> wrappedListener = fork(executor, preserveContext(threadContext, listener));
Object currentValue = compareAndExchangeState(EMPTY, wrappedListener);
if (currentValue == EMPTY) {
Object currentValue = compareAndExchangeState(null, wrappedListener);
if (currentValue == null) {
return;
}
Cell newCell = null;
Expand Down Expand Up @@ -361,7 +389,7 @@ private void setResult(Object result) {
currCell = currCell.next;
}
} else {
assert currentState == EMPTY : "unexpected witness: " + currentState;
assert currentState == null : "unexpected witness: " + currentState;
}
return;
}
Expand Down Expand Up @@ -570,20 +598,4 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Execu
return () -> {};
}
}

private static final VarHandle VH_STATE_FIELD;

static {
try {
VH_STATE_FIELD = MethodHandles.lookup()
.in(SubscribableListener.class)
.findVarHandle(SubscribableListener.class, "state", Object.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

private Object compareAndExchangeState(Object expectedValue, Object newValue) {
return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected void doClose() {}
* Kind of a hack tbh, we can't be sure the shard locks are fully released when this is completed so there's all sorts of retries and
* other lenience to handle that. It'd be better to wait for the shard locks to be released and then delete the data. See #74149.
*/
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.newSucceeded(null);
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess();

@Nullable // if not currently applying a cluster state
private RefCountingListener currentClusterStateShardsClosedListeners;
Expand Down Expand Up @@ -397,7 +397,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
);
} else if (project.isPresent() && project.get().hasIndex(index)) {
// The deleted index was part of the previous cluster state, but not loaded on the local node
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = project.get().index(index);
indexSettings = new IndexSettings(metadata, settings);
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metadata, state);
Expand All @@ -411,7 +411,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
// previous cluster state is not initialized/recovered.
assert state.metadata().projects().values().stream().anyMatch(p -> p.indexGraveyard().containsIndex(index))
|| previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state());
if (metadata != null) {
indexSettings = new IndexSettings(metadata, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public SubscribableListener<Void> getClusterStateDelayListener(long clusterState
refCounted.decRef();
}
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ private void deleteTemplates(Set<String> excludeTemplates, ActionListener<Void>
SubscribableListener

// dummy start step for symmetry
.newSucceeded(null)

.nullSuccess()
// delete composable templates
.<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener)
.<AcknowledgedResponse>andThen((l, r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ default IsBlockedResult isBlocked() {
return NOT_BLOCKED;
}

IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.newSucceeded(null), "not blocked");
IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.nullSuccess(), "not blocked");

/**
* A factory for creating intermediate operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
completionListenerRef.compareAndSet(null, SubscribableListener.nullSuccess());
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ public SubscribableListener<Void> intercept(
}
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ public SubscribableListener<Void> intercept(
}
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ && supports(indicesRequest)
return listener;
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}

abstract void disableFeatures(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ && hasRemoteIndices(searchRequest)
searchRequest.requestCache(false);
}
}
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}

// package private for test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indexSnapshotsVerified());
assertEquals(0L, status.blobsVerified());
assertEquals(0L, status.blobBytesVerified());
yield SubscribableListener.newSucceeded(null);
yield SubscribableListener.nullSuccess();
}
case INDEX_RESTORABILITY -> {
// several of these chunks might arrive concurrently; we want to verify the task status before processing any of
Expand All @@ -210,7 +210,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indicesVerified());
});
}
case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null);
case SNAPSHOT_INFO -> SubscribableListener.nullSuccess();
case ANOMALY -> fail(null, "should not see anomalies");
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ protected InputStream openSlice(int slice) throws IOException {
})));
} else {
blobBytesVerified.addAndGet(fileInfo.length());
return SubscribableListener.newSucceeded(null);
return SubscribableListener.nullSuccess();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void unassignTransforms(ClusterState state, ActionListener<Void> listene

// chain each call one at a time
// because that is what we are doing for ML, and that is all that is supported in the persistentTasksClusterService (for now)
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.newSucceeded(null);
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.nullSuccess();
for (var task : transformTasks) {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
Expand Down