Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action;

import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
* share the result from a single call to the wrapped action. This implementation is similar to {@link StrictSingleResultDeduplicator} but
* relaxed in the sense that it allows the result of a currently running computation to be used for listeners that queue up during that
* computation.
*
* @param <T> Result type
*/
public class RelaxedSingleResultDeduplicator<T> extends SingleResultDeduplicator<T> {

private ActionListenerList<T> waitingListeners;

public RelaxedSingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
super(threadContext, executeAction);
}

@Override
public void execute(ActionListener<T> listener) {
final var wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
synchronized (this) {
if (waitingListeners != null) {
waitingListeners.add(wrappedListener);
return;
}
waitingListeners = new ActionListenerList<>();
waitingListeners.add(wrappedListener);
}
final var currentWaitingListeners = waitingListeners;
SubscribableListener.newForked(executeAction::accept).addListener(ActionListener.runBefore(currentWaitingListeners, () -> {
synchronized (this) {
waitingListeners = null;
}
}));
}

private static class ActionListenerList<T> implements ActionListener<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about just using SubscribableListener here? It uses a linked list so slightly more expensive in general I guess but also more efficient in the common one-subscriber case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense. This is trimmed down quite a bit and as you mentioned this code could just be used directly as needed in TransportGetAllocationStatsAction. I'm fine with closing this and not touching SingleResultDeduplicator. I can create a separate PR to update the class javadoc for SingleResultDeduplicator with the expected usage pattern, or revert changes here and just make that javadoc update.

private final List<ActionListener<T>> listeners = new ArrayList<>();

void add(ActionListener<T> listener) {
listeners.add(listener);
}

@Override
public void onResponse(T response) {
ActionListener.onResponse(listeners, response);
}

@Override
public void onFailure(Exception e) {
ActionListener.onFailure(listeners, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,117 +9,31 @@

package org.elasticsearch.action;

import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
*
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
* share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
* stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to
* be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to
* be used to deduplicate results from actions that produce results that change over time transparently.
* share the result from a single call to the wrapped action.
*
* @param <T> Result type
*/
public final class SingleResultDeduplicator<T> {

private final ThreadContext threadContext;

/**
* List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
* progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
* up here once done.
*/
private List<ActionListener<T>> waitingListeners;
/**
* The threadContext associated with the first listener in the waitingListeners. This context will be restored right before
* we perform the {@code executeAction}.
*/
private ThreadContext.StoredContext waitingStoredContext;
public abstract class SingleResultDeduplicator<T> {

private final Consumer<ActionListener<T>> executeAction;
protected final ThreadContext threadContext;
protected final Consumer<ActionListener<T>> executeAction;

public SingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
this.threadContext = threadContext;
this.executeAction = executeAction;
}

/**
* Execute the action for the given {@code listener}.
* Execute the action for the given {@link ActionListener}.
* @param listener listener to resolve with execution result. The listener always has its threadContext preserved, i.e.
* when the listener is invoked, it will see its original threadContext plus any response headers generated
* by performing the {@code executeAction}.
*/
public void execute(ActionListener<T> listener) {
synchronized (this) {
if (waitingListeners == null) {
// no queued up listeners, just execute this one directly without deduplication and instantiate the list so that
// subsequent executions will wait
waitingListeners = new ArrayList<>();
waitingStoredContext = null;
} else {
// already running an execution, queue this one up
if (waitingListeners.isEmpty()) {
// Only the first listener in queue needs the stored context which is used for running executeAction
assert waitingStoredContext == null;
waitingStoredContext = threadContext.newStoredContext();
}
waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
return;
}
}
doExecute(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), null);
}

private void doExecute(ActionListener<T> listener, @Nullable ThreadContext.StoredContext storedContext) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final List<ActionListener<T>> listeners;
final ThreadContext.StoredContext thisStoredContext;
synchronized (this) {
if (waitingListeners.isEmpty()) {
// no listeners were queued up while this execution ran, so we just reset the state to not having a running execution
waitingListeners = null;
waitingStoredContext = null;
return;
} else {
// we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the
// listeners currently queued up
listeners = waitingListeners;
thisStoredContext = waitingStoredContext;
// This batch of listeners will use the context of the first listener in this batch for the work execution
assert thisStoredContext != null : "stored context must not be null for the first listener in a batch";
waitingListeners = new ArrayList<>();
waitingStoredContext = null;
}
}

// Create a child threadContext so that the parent context remains unchanged when the child execution ends.
// This ensures the parent does not see response headers from the child execution.
try (var ignore = threadContext.newStoredContext()) {
doExecute(new ActionListener<>() {
@Override
public void onResponse(T response) {
ActionListener.onResponse(listeners, response);
}

@Override
public void onFailure(Exception e) {
ActionListener.onFailure(listeners, e);
}
}, thisStoredContext);
}
});
// Restore the given threadContext before proceed with the work execution.
// This ensures all executions begin execution with their own context.
if (storedContext != null) {
storedContext.restore();
}
ActionListener.run(wrappedListener, executeAction::accept);
}
public abstract void execute(ActionListener<T> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action;

import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
*
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
* share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
* stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to
* be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to
* be used to deduplicate results from actions that produce results that change over time transparently.
*
* @param <T> Result type
*/
public final class StrictSingleResultDeduplicator<T> extends SingleResultDeduplicator<T> {

/**
* List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
* progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
* up here once done.
*/
private List<ActionListener<T>> waitingListeners;
/**
* The threadContext associated with the first listener in the waitingListeners. This context will be restored right before
* we perform the {@code executeAction}.
*/
private ThreadContext.StoredContext waitingStoredContext;

public StrictSingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
super(threadContext, executeAction);
}

@Override
public void execute(ActionListener<T> listener) {
synchronized (this) {
if (waitingListeners == null) {
// no queued up listeners, just execute this one directly without deduplication and instantiate the list so that
// subsequent executions will wait
waitingListeners = new ArrayList<>();
waitingStoredContext = null;
} else {
// already running an execution, queue this one up
if (waitingListeners.isEmpty()) {
// Only the first listener in queue needs the stored context which is used for running executeAction
assert waitingStoredContext == null;
waitingStoredContext = threadContext.newStoredContext();
}
waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
return;
}
}
doExecute(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), null);
}

private void doExecute(ActionListener<T> listener, @Nullable ThreadContext.StoredContext storedContext) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final List<ActionListener<T>> listeners;
final ThreadContext.StoredContext thisStoredContext;
synchronized (this) {
if (waitingListeners.isEmpty()) {
// no listeners were queued up while this execution ran, so we just reset the state to not having a running execution
waitingListeners = null;
waitingStoredContext = null;
return;
} else {
// we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the
// listeners currently queued up
listeners = waitingListeners;
thisStoredContext = waitingStoredContext;
// This batch of listeners will use the context of the first listener in this batch for the work execution
assert thisStoredContext != null : "stored context must not be null for the first listener in a batch";
waitingListeners = new ArrayList<>();
waitingStoredContext = null;
}
}

// Create a child threadContext so that the parent context remains unchanged when the child execution ends.
// This ensures the parent does not see response headers from the child execution.
try (var ignore = threadContext.newStoredContext()) {
doExecute(new ActionListener<>() {
@Override
public void onResponse(T response) {
ActionListener.onResponse(listeners, response);
}

@Override
public void onFailure(Exception e) {
ActionListener.onFailure(listeners, e);
}
}, thisStoredContext);
}
});
// Restore the given threadContext before proceed with the work execution.
// This ensures all executions begin execution with their own context.
if (storedContext != null) {
storedContext.restore();
}
ActionListener.run(wrappedListener, executeAction::accept);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.RelaxedSingleResultDeduplicator;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -87,7 +88,7 @@ public TransportGetAllocationStatsAction(
);
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
this.allocationStatsSupplier = new RelaxedSingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
final var cachedStats = allocationStatsCache.get();
if (cachedStats != null) {
l.onResponse(cachedStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.StrictSingleResultDeduplicator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
Expand Down Expand Up @@ -506,7 +507,7 @@ protected BlobStoreRepository(
this.namedXContentRegistry = namedXContentRegistry;
this.basePath = basePath;
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
this.repoDataLoadDeduplicator = new StrictSingleResultDeduplicator<>(
threadPool.getThreadContext(),
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
Expand Down
Loading