-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Change queries ID to be the same as the async #127472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
0383960
a463ba5
0948424
25fee06
9f2e297
ec7f125
79a9669
e3958c0
de60192
9011486
6dfa323
24b654e
6791493
b1eedc6
c6a717a
5a017be
62e421e
de56665
fcdb7a8
398ba4c
f46d1fa
45b4e8e
e61f293
bbd87ac
7adbc29
2abb63d
84dcd7c
46f7494
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 127472 | ||
| summary: Change queries ID to be the same as the async | ||
| area: ES|QL | ||
| type: feature | ||
| issues: | ||
| - 127187 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,14 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId | |
| return new Task(id, type, action, getDescription(), parentTaskId, headers); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID. | ||
| */ | ||
| // TODO remove the above overload, use only this one. | ||
| default Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { | ||
| return createTask(id, type, action, parentTaskId, headers); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps localNodeId and id should be combined into a TaskId as a parameter to this like parentTaskId?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is assuming that
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P.S. This would make it slightly more involved to remove that above overload, since all the overriders would need to extract the node ID from the |
||
| } | ||
|
|
||
| /** | ||
| * Returns optional description of the request to be displayed by the task manager | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,6 +87,7 @@ public class TaskManager implements ClusterStateApplier { | |
| private final ByteSizeValue maxHeaderSize; | ||
| private final Map<TcpChannel, ChannelPendingTaskTracker> channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); | ||
| private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>(); | ||
| private final SetOnce<String> localNodeId = new SetOnce<>(); | ||
|
||
|
|
||
| private final List<RemovedTaskListener> removedTaskListeners = new CopyOnWriteArrayList<>(); | ||
|
|
||
|
|
@@ -111,6 +112,10 @@ public void setTaskCancellationService(TaskCancellationService taskCancellationS | |
| this.cancellationService.set(taskCancellationService); | ||
| } | ||
|
|
||
| public void setLocalNodeId(String localNodeId) { | ||
| this.localNodeId.set(localNodeId); | ||
| } | ||
|
|
||
| /** | ||
| * Registers a task without parent task | ||
| */ | ||
|
|
@@ -141,7 +146,10 @@ public Task register(String type, String action, TaskAwareRequest request, boole | |
| headers.put(key, httpHeader); | ||
| } | ||
| } | ||
| Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); | ||
| var localNodeId = this.localNodeId.get(); | ||
| Task task = localNodeId != null | ||
| ? request.createTask(localNodeId, taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers) | ||
GalLalouche marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| : request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); | ||
| Objects.requireNonNull(task); | ||
| assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId"; | ||
| if (logger.isTraceEnabled()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,8 @@ | |
| import org.elasticsearch.common.io.stream.ByteBufferStreamInput; | ||
| import org.elasticsearch.common.io.stream.BytesStreamOutput; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.tasks.TaskId; | ||
|
|
||
| import java.io.IOException; | ||
|
|
@@ -20,7 +22,7 @@ | |
| /** | ||
| * A class that contains all information related to a submitted async execution. | ||
| */ | ||
| public final class AsyncExecutionId { | ||
| public final class AsyncExecutionId implements Writeable { | ||
| public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id"; | ||
| public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running"; | ||
|
|
||
|
|
@@ -115,4 +117,13 @@ public static AsyncExecutionId decode(String id) { | |
| } | ||
| return new AsyncExecutionId(docId, new TaskId(taskId), id); | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeString(getEncoded()); | ||
| } | ||
|
|
||
| public static AsyncExecutionId readFrom(StreamInput input) throws IOException { | ||
| return decode(input.readString()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth adding an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.core.async; | ||
|
|
||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.tasks.TaskId; | ||
| import org.elasticsearch.test.AbstractWireSerializingTestCase; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| public class AsyncExecutionIdWireTests extends AbstractWireSerializingTestCase<AsyncExecutionId> { | ||
| @Override | ||
| protected Writeable.Reader<AsyncExecutionId> instanceReader() { | ||
| return AsyncExecutionId::readFrom; | ||
| } | ||
|
|
||
| @Override | ||
| protected AsyncExecutionId createTestInstance() { | ||
| return new AsyncExecutionId(randomAlphaOfLength(15), new TaskId(randomAlphaOfLength(10), randomLong())); | ||
| } | ||
|
|
||
| @Override | ||
| protected AsyncExecutionId mutateInstance(AsyncExecutionId instance) throws IOException { | ||
| return new AsyncExecutionId( | ||
| instance.getDocId(), | ||
| new TaskId(instance.getTaskId().getNodeId(), instance.getTaskId().getId() * 12345) | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a TODO you are planning to followup with?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I wanted to keep this PR cleaner.