Merged
Changes from 1 commit
@@ -157,15 +157,15 @@ public TransportClusterStatsAction(
    protected SubscribableListener<AdditionalStats> createActionContext(Task task, ClusterStatsRequest request) {
        assert task instanceof CancellableTask;
        final var cancellableTask = (CancellableTask) task;
-       final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
        if (request.isRemoteStats() == false) {
+           final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
            final AdditionalStats additionalStats = new AdditionalStats();
            additionalStats.compute(cancellableTask, request, additionalStatsListener);
+           return additionalStatsListener;
        } else {
-           // For remote stats request, we don't need to compute anything
-           additionalStatsListener.onResponse(null);
+           return SubscribableListener.nullSuccess();
        }
-       return additionalStatsListener;
    }

@Override
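The pattern in this hunk — allocate a listener only on the branch that actually computes something, and return a shared pre-completed instance otherwise — can be sketched with the JDK's `CompletableFuture` as a stand-in for `SubscribableListener` (class and method names below are illustrative, not from the PR):

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for SubscribableListener.nullSuccess(): one shared, already-completed
// instance serves every caller that has nothing to compute.
class StatsDemo {
    private static final CompletableFuture<Object> NULL_SUCCESS = CompletableFuture.completedFuture(null);

    static CompletableFuture<Object> createActionContext(boolean isRemoteStats) {
        if (isRemoteStats == false) {
            // only this branch pays for an allocation
            var listener = new CompletableFuture<Object>();
            listener.complete("stats"); // placeholder for the real async computation
            return listener;
        } else {
            // remote stats request: nothing to compute, reuse the shared instance
            return NULL_SUCCESS;
        }
    }
}
```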
@@ -110,20 +110,6 @@ public SubscribableListener() {
        this(EMPTY);
    }

-   /**
-    * Create a {@link SubscribableListener} which has already succeeded with the given result.
-    */
-   public static <T> SubscribableListener<T> newSucceeded(T result) {
-       return new SubscribableListener<>(new SuccessResult<>(result));
-   }
-
-   /**
-    * Create a {@link SubscribableListener} which has already failed with the given exception.
-    */
-   public static <T> SubscribableListener<T> newFailed(Exception exception) {
-       return new SubscribableListener<>(new FailureResult(exception, exception));
-   }
-
    /**
     * Create a {@link SubscribableListener}, fork a computation to complete it, and return the listener. If the forking itself throws an
     * exception then the exception is caught and fed to the returned listener.
@@ -586,4 +572,33 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Execu
    private Object compareAndExchangeState(Object expectedValue, Object newValue) {
        return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
    }
+
+   @SuppressWarnings("rawtypes")
+   private static final SubscribableListener NULL_SUCCESS = newSucceeded(null);
+
+   /**
+    * Same as {@link #newSucceeded(Object)} but always returns the same instance with result value {@code null}.
+    */
+   @SuppressWarnings("unchecked")
+   public static <T> SubscribableListener<T> nullSuccess() {
+       return NULL_SUCCESS;
+   }
+
+   /**
+    * Create a {@link SubscribableListener} which has already succeeded with the given result.
+    */
+   public static <T> SubscribableListener<T> newSucceeded(T result) {
+       var res = new SubscribableListener<T>();
[Contributor] This allocates a new Object() for the initial state, can we find a way not to do that?

[Contributor Author] Ah 🤦 gotta fix that too, otherwise this is kinda pointless. Sec on it

[Contributor Author] Alright, not sure you'll like this one :) But if I get rid of EMPTY this all falls into place just fine, doesn't it?
+       VH_STATE_FIELD.setRelease(res, new SuccessResult<>(result));
[Contributor] Hm so my understanding of setRelease is a little sketchy but AIUI it blocks earlier stores/loads from moving later but does not block later stores/loads from moving earlier. Is that valid here? It seems like if we did something like this ...

    l = SubscribableListener.newSucceeded(r1);
    // make l visible to another thread
    l.onResponse(r2);

... then potentially someone could observe r2 rather than r1?

OTOH if there's something magic about newly-constructed objects that makes this work, does the same reasoning not also work for all volatile writes in a constructor (as long as it doesn't let this escape, I guess)? If so, why is the compiler not already doing this?

Basically this feels risky enough that I'd like to see hard data about the benefits, and a big old comment proving its correctness.

[Contributor Author] This will not be a problem because you do a volatile-mode read on the state field in onResponse to check if it isn't done already. So that later store never actually happens here.

> does the same reasoning not also work for all volatile writes in a constructor

There is no magic for newly constructed objects actually. See https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-volatiles-are-finals :)

> Basically this feels risky enough that I'd like to see hard data about the benefits

That's probably impossible to come by :) It's dependent on target architecture, JVM version and all of that stuff. For me it's just that the more we use this primitive, the more likely we are to create a spot where it's advantageous to have some operations reordered to before the creation of these already-complete instances. Also, as compilers make progress, this becomes more likely :)
+       return res;
+   }
+
+   /**
+    * Create a {@link SubscribableListener} which has already failed with the given exception.
+    */
+   public static <T> SubscribableListener<T> newFailed(Exception exception) {
+       var res = new SubscribableListener<T>();
+       VH_STATE_FIELD.setRelease(res, new FailureResult(exception, exception));
+       return res;
+   }
}
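The setRelease construction debated in the review thread can be illustrated in isolation (the class below is a hypothetical stand-in, not the real `SubscribableListener`): a release store is cheaper than a full volatile store, and is safe here only because the freshly constructed object has not yet been published to other threads — the subsequent `return` is the publication point.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical stand-in showing a release-mode write to a volatile field via a
// VarHandle, in the style of the PR's newSucceeded/newFailed.
class Box {
    volatile Object state;

    private static final VarHandle STATE;
    static {
        try {
            STATE = MethodHandles.lookup().findVarHandle(Box.class, "state", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    static Box newSucceeded(Object result) {
        var box = new Box();
        // setRelease: earlier stores cannot be reordered after this write. The
        // object is still thread-local at this point, so the weaker ordering is
        // sufficient; a full volatile store would cost more for no benefit.
        STATE.setRelease(box, result);
        return box;
    }
}
```

As the thread notes, this is only sound because any later operation that might overwrite the state (such as `onResponse`) first reads the field in volatile mode and bails out when it is already complete.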
@@ -243,7 +243,7 @@ protected void doClose() {}
* Kind of a hack tbh, we can't be sure the shard locks are fully released when this is completed so there's all sorts of retries and
* other lenience to handle that. It'd be better to wait for the shard locks to be released and then delete the data. See #74149.
*/
-private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.newSucceeded(null);
+private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess();

@Nullable // if not currently applying a cluster state
private RefCountingListener currentClusterStateShardsClosedListeners;
@@ -397,7 +397,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
);
} else if (project.isPresent() && project.get().hasIndex(index)) {
// The deleted index was part of the previous cluster state, but not loaded on the local node
-indexServiceClosedListener = SubscribableListener.newSucceeded(null);
+indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = project.get().index(index);
indexSettings = new IndexSettings(metadata, settings);
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metadata, state);
@@ -411,7 +411,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
// previous cluster state is not initialized/recovered.
assert state.metadata().projects().values().stream().anyMatch(p -> p.indexGraveyard().containsIndex(index))
|| previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
-indexServiceClosedListener = SubscribableListener.newSucceeded(null);
+indexServiceClosedListener = SubscribableListener.nullSuccess();
final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state());
if (metadata != null) {
indexSettings = new IndexSettings(metadata, settings);
@@ -64,7 +64,7 @@ public SubscribableListener<Void> getClusterStateDelayListener(long clusterState
refCounted.decRef();
}
} else {
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}
}

@@ -127,46 +127,40 @@ private void deleteTemplates(Set<String> excludeTemplates, ActionListener<Void>
l -> client().execute(GetComponentTemplateAction.INSTANCE, new GetComponentTemplateAction.Request(TEST_REQUEST_TIMEOUT, "*"), l)
);

-SubscribableListener
-
-   // dummy start step for symmetry
-   .newSucceeded(null)
-
-   // delete composable templates
-   .<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener)
-   .<AcknowledgedResponse>andThen((l, r) -> {
-       var templates = r.indexTemplates()
-           .keySet()
-           .stream()
-           .filter(template -> excludeTemplates.contains(template) == false)
-           .toArray(String[]::new);
-       if (templates.length == 0) {
-           l.onResponse(AcknowledgedResponse.TRUE);
-       } else {
-           var request = new TransportDeleteComposableIndexTemplateAction.Request(templates);
-           client().execute(TransportDeleteComposableIndexTemplateAction.TYPE, request, l);
-       }
-   })
-   .andThenAccept(ElasticsearchAssertions::assertAcked)
-
-   // then delete component templates
-   .<GetComponentTemplateAction.Response>andThen(getComponentTemplates::addListener)
-   .<AcknowledgedResponse>andThen((l, response) -> {
-       var componentTemplates = response.getComponentTemplates()
-           .keySet()
-           .stream()
-           .filter(template -> excludeTemplates.contains(template) == false)
-           .toArray(String[]::new);
-       if (componentTemplates.length == 0) {
-           l.onResponse(AcknowledgedResponse.TRUE);
-       } else {
-           client().execute(
-               TransportDeleteComponentTemplateAction.TYPE,
-               new TransportDeleteComponentTemplateAction.Request(componentTemplates),
-               l
-           );
-       }
-   })
+// dummy start step for symmetry
+SubscribableListener.nullSuccess()
+   // delete composable templates
+   .<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener).<AcknowledgedResponse>andThen((l, r) -> {
+       var templates = r.indexTemplates()
+           .keySet()
+           .stream()
+           .filter(template -> excludeTemplates.contains(template) == false)
+           .toArray(String[]::new);
+       if (templates.length == 0) {
+           l.onResponse(AcknowledgedResponse.TRUE);
+       } else {
+           var request = new TransportDeleteComposableIndexTemplateAction.Request(templates);
+           client().execute(TransportDeleteComposableIndexTemplateAction.TYPE, request, l);
+       }
+   }).andThenAccept(ElasticsearchAssertions::assertAcked)
+
+   // then delete component templates
+   .<GetComponentTemplateAction.Response>andThen(getComponentTemplates::addListener).<AcknowledgedResponse>andThen((l, response) -> {
+       var componentTemplates = response.getComponentTemplates()
+           .keySet()
+           .stream()
+           .filter(template -> excludeTemplates.contains(template) == false)
+           .toArray(String[]::new);
+       if (componentTemplates.length == 0) {
+           l.onResponse(AcknowledgedResponse.TRUE);
+       } else {
+           client().execute(
+               TransportDeleteComponentTemplateAction.TYPE,
+               new TransportDeleteComponentTemplateAction.Request(componentTemplates),
+               l
+           );
+       }
+   })
.andThenAccept(ElasticsearchAssertions::assertAcked)

// and finish
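The shape of the chain above — a "dummy start step for symmetry" so that every real step uses the same chaining form — can be sketched with `CompletableFuture` in place of `SubscribableListener` (the helper methods are hypothetical, not the PR's test code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    static final List<String> steps = new ArrayList<>();

    static CompletableFuture<Void> deleteComposableTemplates() {
        steps.add("composable");
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> deleteComponentTemplates() {
        steps.add("component");
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> deleteAll() {
        // dummy start step for symmetry: each real step chains identically
        return CompletableFuture.<Void>completedFuture(null)
            .thenCompose(ignored -> deleteComposableTemplates())
            .thenCompose(ignored -> deleteComponentTemplates());
    }
}
```

Starting from an already-completed step costs nothing once a shared pre-completed instance exists, which is exactly what `nullSuccess()` provides in the rewritten test.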
@@ -92,7 +92,7 @@ default IsBlockedResult isBlocked() {
return NOT_BLOCKED;
}

-IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.newSucceeded(null), "not blocked");
+IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.nullSuccess(), "not blocked");

/**
* A factory for creating intermediate operators.
@@ -357,7 +357,7 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
-completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
+completionListenerRef.compareAndSet(null, SubscribableListener.nullSuccess());
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
@@ -81,6 +81,6 @@ public SubscribableListener<Void> intercept(
}
}
}
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}
}
@@ -101,6 +101,6 @@ public SubscribableListener<Void> intercept(
}
}
}
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}
}
@@ -77,7 +77,7 @@ && supports(indicesRequest)
return listener;
}
}
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}

abstract void disableFeatures(
@@ -125,7 +125,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}
}
}
@@ -103,7 +103,7 @@ public SubscribableListener<Void> intercept(
);
return listener;
} else {
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}
}
}
@@ -49,7 +49,7 @@ && hasRemoteIndices(searchRequest)
searchRequest.requestCache(false);
}
}
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}

// package private for test
@@ -191,7 +191,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indexSnapshotsVerified());
assertEquals(0L, status.blobsVerified());
assertEquals(0L, status.blobBytesVerified());
-yield SubscribableListener.newSucceeded(null);
+yield SubscribableListener.nullSuccess();
}
case INDEX_RESTORABILITY -> {
// several of these chunks might arrive concurrently; we want to verify the task status before processing any of
@@ -210,7 +210,7 @@ public void testTaskStatus() throws IOException {
assertEquals(0L, status.indicesVerified());
});
}
-case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null);
+case SNAPSHOT_INFO -> SubscribableListener.nullSuccess();
case ANOMALY -> fail(null, "should not see anomalies");
};

@@ -798,7 +798,7 @@ protected InputStream openSlice(int slice) throws IOException {
})));
} else {
blobBytesVerified.addAndGet(fileInfo.length());
-return SubscribableListener.newSucceeded(null);
+return SubscribableListener.nullSuccess();
}
});
}
@@ -131,7 +131,7 @@ private void unassignTransforms(ClusterState state, ActionListener<Void> listene

// chain each call one at a time
// because that is what we are doing for ML, and that is all that is supported in the persistentTasksClusterService (for now)
-SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.newSucceeded(null);
+SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.nullSuccess();
for (var task : transformTasks) {
@FixForMultiProject
final var projectId = Metadata.DEFAULT_PROJECT_ID;
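The "chain each call one at a time" comment describes folding tasks into a sequential chain that starts from an already-completed listener; a minimal sketch with `CompletableFuture` (an assumed shape, not the actual transform code):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequentialChain {
    // Fold each task onto the chain so task N+1 starts only after task N
    // completes, mirroring how the loop above threads one listener through
    // all transform tasks.
    static CompletableFuture<Void> runSequentially(List<Runnable> tasks) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Runnable task : tasks) {
            chain = chain.thenRun(task);
        }
        return chain;
    }
}
```

Seeding the loop with a pre-completed instance is what makes `nullSuccess()` a natural fit for the initial `chainListener` value.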