Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public Map<String, Object> runtimeFields() {
return this.runtimeFields;
}

Long nowInMillis() {
public Long nowInMillis() {
return nowInMillis;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
/**
* Dispatches child field-caps requests to old/new data nodes in the local cluster that have shards of the requesting indices.
*/
final class RequestDispatcher {
public final class RequestDispatcher {
static final Logger LOGGER = LogManager.getLogger(RequestDispatcher.class);

private final TransportService transportService;
Expand All @@ -74,7 +74,7 @@ final class RequestDispatcher {
private final AtomicInteger executionRound = new AtomicInteger();
private final Map<String, IndexSelector> indexSelectors;

RequestDispatcher(
public RequestDispatcher(
ClusterService clusterService,
TransportService transportService,
ProjectResolver projectResolver,
Expand Down Expand Up @@ -127,7 +127,7 @@ final class RequestDispatcher {
}
}

void execute() {
public void execute() {
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,40 +129,19 @@ public TransportFieldCapabilitiesAction(

@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
executeRequest(
task,
request,
(transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest(
conn,
REMOTE_TYPE.name(),
fieldCapabilitiesRequest,
TransportRequestOptions.EMPTY,
responseHandler
),
listener
);
executeRequest(task, request, listener);
}

public void executeRequest(
Task task,
FieldCapabilitiesRequest request,
LinkedRequestExecutor linkedRequestExecutor,
ActionListener<FieldCapabilitiesResponse> listener
) {
public void executeRequest(Task task, FieldCapabilitiesRequest request, ActionListener<FieldCapabilitiesResponse> listener) {
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l)));
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l)));
}

private void doExecuteForked(
Task task,
FieldCapabilitiesRequest request,
LinkedRequestExecutor linkedRequestExecutor,
ActionListener<FieldCapabilitiesResponse> listener
) {
private void doExecuteForked(Task task, FieldCapabilitiesRequest request, ActionListener<FieldCapabilitiesResponse> listener) {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(request);
}
final Executor singleThreadedExecutor = buildSingleThreadedExecutor();
final Executor singleThreadedExecutor = buildSingleThreadedExecutor(searchCoordinationExecutor, LOGGER);
assert task instanceof CancellableTask;
final CancellableTask fieldCapTask = (CancellableTask) task;
// retrieve the initial timestamp in case the action is a cross cluster search
Expand Down Expand Up @@ -322,10 +301,11 @@ private void doExecuteForked(
true,
ActionListener.releaseAfter(remoteListener, refs.acquire())
).delegateFailure(
(responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest(
transportService,
(responseListener, conn) -> transportService.sendRequest(
conn,
REMOTE_TYPE.name(),
remoteRequest,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor)
)
)
Expand All @@ -339,7 +319,7 @@ private void doExecuteForked(
}
}

private Executor buildSingleThreadedExecutor() {
public static Executor buildSingleThreadedExecutor(Executor searchCoordinationExecutor, Logger logger) {
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor);
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
@Override
Expand All @@ -362,16 +342,7 @@ public void onFailure(Exception e) {
});
}

public interface LinkedRequestExecutor {
void executeRemoteRequest(
TransportService transportService,
Transport.Connection conn,
FieldCapabilitiesRequest remoteRequest,
ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
);
}

private static void checkIndexBlocks(ProjectState projectState, String[] concreteIndices) {
public static void checkIndexBlocks(ProjectState projectState, String[] concreteIndices) {
var blocks = projectState.blocks();
var projectId = projectState.projectId();
if (blocks.global(projectId).isEmpty() && blocks.indices(projectId).isEmpty()) {
Expand Down Expand Up @@ -421,7 +392,7 @@ private static void mergeIndexResponses(
}
}

private static FieldCapabilitiesRequest prepareRemoteRequest(
public static FieldCapabilitiesRequest prepareRemoteRequest(
String clusterAlias,
FieldCapabilitiesRequest request,
OriginalIndices originalIndices,
Expand Down Expand Up @@ -621,10 +592,10 @@ private static void innerMerge(
* This collector can contain a failure for an index even if one of its shards was successful. When building the final
* list, these failures will be skipped because they have no affect on the final response.
*/
private static final class FailureCollector {
public static final class FailureCollector {
private final Map<String, Exception> failuresByIndex = new HashMap<>();

List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
public List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
Map<Tuple<String, String>, FieldCapabilitiesFailure> indexFailures = new HashMap<>();
for (Map.Entry<String, Exception> failure : failuresByIndex.entrySet()) {
String index = failure.getKey();
Expand Down Expand Up @@ -653,15 +624,15 @@ List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
return new ArrayList<>(indexFailures.values());
}

void collect(String index, Exception e) {
public void collect(String index, Exception e) {
failuresByIndex.putIfAbsent(index, e);
}

void clear() {
public void clear() {
failuresByIndex.clear();
}

boolean isEmpty() {
public boolean isEmpty() {
return failuresByIndex.isEmpty();
}
}
Expand Down Expand Up @@ -724,8 +695,8 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
}
}

private static class ForkingOnFailureActionListener<Response> extends AbstractThreadedActionListener<Response> {
ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener<Response> delegate) {
public static class ForkingOnFailureActionListener<Response> extends AbstractThreadedActionListener<Response> {
public ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener<Response> delegate) {
super(executor, forceExecution, delegate);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9189000,9185001
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_9.2.0,9185000
esql_resolve_fields_response_created,9185001
1 change: 1 addition & 0 deletions server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
esql_resolve_fields_response_created,9189000
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException {

protected boolean supportsNodeAssignment() throws IOException {
if (supportsNodeAssignment == null) {
for (NodeInfo i : allNodeToInfo().values()) {
logger.error("NOCOMMIT {}", i);
}
supportsNodeAssignment = allNodeToInfo().values()
.stream()
.allMatch(i -> (i.roles.contains("index") && i.roles.contains("search")) || (i.roles.contains("data")));
Expand Down Expand Up @@ -542,10 +539,8 @@ private boolean syntheticSourceByDefault() {
}

private Map<String, NodeInfo> expectedIndices() throws IOException {
logger.error("ADFADF NOCOMMIT");
Map<String, NodeInfo> result = new TreeMap<>();
if (supportsNodeAssignment()) {
logger.error("supports {}", allNodeToInfo());
for (Map.Entry<String, NodeInfo> e : allNodeToInfo().entrySet()) {
String name = indexMode + "_" + e.getKey();
if (e.getValue().cluster != null) {
Expand All @@ -554,7 +549,6 @@ private Map<String, NodeInfo> expectedIndices() throws IOException {
result.put(name, e.getValue());
}
} else {
logger.error("one per {}", allNodeToInfo());
for (Map.Entry<String, NodeInfo> e : allNodeToInfo().entrySet()) {
String name = indexMode.toString();
if (e.getValue().cluster != null) {
Expand Down
Loading