From 67caefec5a148e7e36be14283fa8ae9fd91be093 Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 4 Jul 2025 09:10:14 +0200 Subject: [PATCH 1/3] Do not start tracing transactions in TaskManager. This changes TaskManager to only create new tracing spans if a trace parent (and hence a tracing transaction) already exists. Tracing transactions are automatically started in the RestController for external requests. To trace detached, internal transport actions, a transaction has to be explicitly started using Tracer.startTrace. This helps to prevent everlasting tracing transaction for internal self-rescheduling work that is using transport actions. Such transactions have caused memory issues with the APM java agent in the past. Relates to ES-10969 --- .../apm/internal/tracing/APMTracer.java | 6 +- .../elasticsearch/action/ActionModule.java | 4 +- .../common/util/concurrent/ThreadContext.java | 78 +++++++---- .../java/org/elasticsearch/tasks/Task.java | 31 +++-- .../org/elasticsearch/tasks/TaskManager.java | 28 ++-- .../elasticsearch/transport/Transports.java | 2 +- .../util/concurrent/ThreadContextTests.java | 71 ++++++++++ .../AbstractHttpServerTransportTests.java | 8 +- .../elasticsearch/tasks/TaskManagerTests.java | 131 ++++++++++++++++-- .../elastic/ElasticInferenceService.java | 2 +- ...nceServiceAuthorizationRequestHandler.java | 2 +- 11 files changed, 288 insertions(+), 75 deletions(-) diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java index 4c2f1f740693a..f0eced2c56b21 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java @@ -235,10 +235,10 @@ private Context getParentContext(TraceContext traceContext) { // You can just pass the Context object directly to another thread (it is immutable and thus thread-safe). // Attempt to fetch a local parent context first, otherwise look for a remote parent - Context parentContext = traceContext.getTransient("parent_" + Task.APM_TRACE_CONTEXT); + Context parentContext = traceContext.getTransient(Task.PARENT_APM_TRACE_CONTEXT); if (parentContext == null) { - final String traceParentHeader = traceContext.getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER); - final String traceStateHeader = traceContext.getTransient("parent_" + Task.TRACE_STATE); + final String traceParentHeader = traceContext.getTransient(Task.PARENT_TRACE_PARENT_HEADER); + final String traceStateHeader = traceContext.getTransient(Task.PARENT_TRACE_STATE); if (traceParentHeader != null) { final Map traceContextMap = Maps.newMapWithExpectedSize(2); diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index b17288e222d43..e7a91aa17d1df 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -595,10 +595,10 @@ public void copyRequestHeadersToThreadContext(HttpPreRequest request, ThreadCont Optional traceId = RestUtils.extractTraceId(traceparent); if (traceId.isPresent()) { threadContext.putHeader(Task.TRACE_ID, traceId.get()); - threadContext.putTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER, traceparent); + threadContext.putTransient(Task.PARENT_TRACE_PARENT_HEADER, traceparent); } } else if (name.equals(Task.TRACE_STATE)) { - threadContext.putTransient("parent_" + Task.TRACE_STATE, distinctHeaderValues.get(0)); + threadContext.putTransient(Task.PARENT_TRACE_STATE, distinctHeaderValues.get(0)); } else { threadContext.putHeader(name, String.join(",", distinctHeaderValues)); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index bf5ff9e7c5144..9a8a27ba7cd3f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -193,33 +193,38 @@ public StoredContext newEmptySystemContext() { */ public StoredContext newTraceContext() { final ThreadContextStruct originalContext = threadLocal.get(); - final Map newRequestHeaders = new HashMap<>(originalContext.requestHeaders); - final Map newTransientHeaders = new HashMap<>(originalContext.transientHeaders); + // this is the context when this method returns + final ThreadContextStruct newContext; + if (originalContext.hasTraceContext() == false) { + newContext = originalContext; + } else { + final Map newRequestHeaders = new HashMap<>(originalContext.requestHeaders); + final Map newTransientHeaders = new HashMap<>(originalContext.transientHeaders); - final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER); - if (previousTraceParent != null) { - newTransientHeaders.put("parent_" + Task.TRACE_PARENT_HTTP_HEADER, previousTraceParent); - } + final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER); + if (previousTraceParent != null) { + newTransientHeaders.put(Task.PARENT_TRACE_PARENT_HEADER, previousTraceParent); + } - final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE); - if (previousTraceState != null) { - newTransientHeaders.put("parent_" + Task.TRACE_STATE, previousTraceState); - } + final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE); + if (previousTraceState != null) { + newTransientHeaders.put(Task.PARENT_TRACE_STATE, previousTraceState); + } - final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT); - if (previousTraceContext != null) { - newTransientHeaders.put("parent_" + Task.APM_TRACE_CONTEXT, previousTraceContext); - } + final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT); + if (previousTraceContext != null) { + newTransientHeaders.put(Task.PARENT_APM_TRACE_CONTEXT, previousTraceContext); + } - // this is the context when this method returns - final ThreadContextStruct newContext = new ThreadContextStruct( - newRequestHeaders, - originalContext.responseHeaders, - newTransientHeaders, - originalContext.isSystemContext, - originalContext.warningHeadersSize - ); - threadLocal.set(newContext); + newContext = new ThreadContextStruct( + newRequestHeaders, + originalContext.responseHeaders, + newTransientHeaders, + originalContext.isSystemContext, + originalContext.warningHeadersSize + ); + threadLocal.set(newContext); + } // Tracing shouldn't interrupt the propagation of response headers, so in the same as // #newStoredContextPreservingResponseHeaders(), pass on any potential changes to the response headers. return () -> { @@ -233,10 +238,11 @@ public StoredContext newTraceContext() { } public boolean hasTraceContext() { - final ThreadContextStruct context = threadLocal.get(); - return context.requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER) - || context.requestHeaders.containsKey(Task.TRACE_STATE) - || context.transientHeaders.containsKey(Task.APM_TRACE_CONTEXT); + return threadLocal.get().hasTraceContext(); + } + + public boolean hasParentTraceContext() { + return threadLocal.get().hasParentTraceContext(); } /** @@ -254,10 +260,10 @@ public StoredContext clearTraceContext() { newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER); newRequestHeaders.remove(Task.TRACE_STATE); - newTransientHeaders.remove("parent_" + Task.TRACE_PARENT_HTTP_HEADER); - newTransientHeaders.remove("parent_" + Task.TRACE_STATE); + newTransientHeaders.remove(Task.PARENT_TRACE_PARENT_HEADER); + newTransientHeaders.remove(Task.PARENT_TRACE_STATE); newTransientHeaders.remove(Task.APM_TRACE_CONTEXT); - newTransientHeaders.remove("parent_" + Task.APM_TRACE_CONTEXT); + newTransientHeaders.remove(Task.PARENT_APM_TRACE_CONTEXT); threadLocal.set( new ThreadContextStruct( @@ -853,6 +859,18 @@ private ThreadContextStruct putResponseHeaders(Map> headers) return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext); } + private boolean hasTraceContext() { + return requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER) + || requestHeaders.containsKey(Task.TRACE_STATE) + || transientHeaders.containsKey(Task.APM_TRACE_CONTEXT); + } + + private boolean hasParentTraceContext() { + return transientHeaders.containsKey(Task.PARENT_TRACE_PARENT_HEADER) + || transientHeaders.containsKey(Task.PARENT_TRACE_STATE) + || transientHeaders.containsKey(Task.PARENT_APM_TRACE_CONTEXT); + } + private void logWarningHeaderThresholdExceeded(long threshold, Setting thresholdSetting) { // If available, log some selected headers to help identifying the source of the request. // Note: Only Task.HEADERS_TO_COPY are guaranteed to be preserved at this point. diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index b422b2854e7c9..d2cd330307b6c 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -31,6 +31,14 @@ public class Task implements Traceable { */ public static final String X_OPAQUE_ID_HTTP_HEADER = "X-Opaque-Id"; + /** + * A request header that indicates the origin of the request from Elastic stack. The value will stored in ThreadContext + * and emitted to ES logs + */ + public static final String X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER = "X-elastic-product-origin"; + + public static final String X_ELASTIC_PROJECT_ID_HTTP_HEADER = "X-Elastic-Project-Id"; + /** * The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context. * TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved @@ -39,28 +47,27 @@ public class Task implements Traceable { */ public static final String TRACE_PARENT_HTTP_HEADER = "traceparent"; + public static final String TRACE_STATE = "tracestate"; + /** - * A request header that indicates the origin of the request from Elastic stack. The value will stored in ThreadContext - * and emitted to ES logs + * Parsed part of traceparent. It is stored in thread context and emitted in logs. + * Has to be declared as a header copied over for tasks. */ - public static final String X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER = "X-elastic-product-origin"; + public static final String TRACE_ID = "trace.id"; - public static final String TRACE_STATE = "tracestate"; + public static final String TRACE_START_TIME = "trace.starttime"; /** * Used internally to pass the apm trace context between the nodes */ public static final String APM_TRACE_CONTEXT = "apm.local.context"; - /** - * Parsed part of traceparent. It is stored in thread context and emitted in logs. - * Has to be declared as a header copied over for tasks. - */ - public static final String TRACE_ID = "trace.id"; + public static final String PARENT_TRACE_PARENT_HEADER = "parent_" + Task.TRACE_PARENT_HTTP_HEADER; + + public static final String PARENT_TRACE_STATE = "parent_" + Task.TRACE_STATE; + + public static final String PARENT_APM_TRACE_CONTEXT = "parent_" + Task.APM_TRACE_CONTEXT; - public static final String TRACE_START_TIME = "trace.starttime"; - public static final String TRACE_PARENT = "traceparent"; - public static final String X_ELASTIC_PROJECT_ID_HTTP_HEADER = "X-Elastic-Project-Id"; public static final Set HEADERS_TO_COPY = Set.of( X_OPAQUE_ID_HTTP_HEADER, diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 249bb1d43119c..be886a6845b75 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -161,24 +161,32 @@ public Task register(String type, String action, TaskAwareRequest request, boole Task previousTask = tasks.put(task.getId(), task); assert previousTask == null; if (traceRequest) { - startTrace(threadContext, task); + maybeStartTrace(threadContext, task); } } return task; } + // Start a new trace span if a parent trace context already exists. + // For REST actions this will be the case, otherwise Tracer#startTrace can be used. // package private for testing - void startTrace(ThreadContext threadContext, Task task) { + void maybeStartTrace(ThreadContext threadContext, Task task) { + if (threadContext.hasParentTraceContext() == false) { + return; + } TaskId parentTask = task.getParentTaskId(); - Map attributes = Map.of( - Tracer.AttributeKeys.TASK_ID, - task.getId(), - Tracer.AttributeKeys.PARENT_TASK_ID, - parentTask.toString() - ); + Map attributes = parentTask.isSet() + ? Map.of(Tracer.AttributeKeys.TASK_ID, task.getId(), Tracer.AttributeKeys.PARENT_TASK_ID, parentTask.toString()) + : Map.of(Tracer.AttributeKeys.TASK_ID, task.getId()); tracer.startTrace(threadContext, task, task.getAction(), attributes); } + void maybeStopTrace(ThreadContext threadContext, Task task) { + if (threadContext.hasTraceContext()) { + tracer.stopTrace(task); + } + } + public Task registerAndExecute( String type, TransportAction action, @@ -241,7 +249,7 @@ private void registerCancellableTask(Task task, long requestId, boolean traceReq CancellableTaskHolder holder = new CancellableTaskHolder(cancellableTask); cancellableTasks.put(task, requestId, holder); if (traceRequest) { - startTrace(threadPool.getThreadContext(), task); + maybeStartTrace(threadPool.getThreadContext(), task); } // Check if this task was banned before we start it. if (task.getParentTaskId().isSet()) { @@ -340,7 +348,7 @@ public Task unregister(Task task) { return removedTask; } } finally { - tracer.stopTrace(task); + maybeStopTrace(threadPool.getThreadContext(), task); for (RemovedTaskListener listener : removedTaskListeners) { listener.onRemoved(task); } diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index 679d37b8af035..889f079a4e918 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -19,7 +19,7 @@ public enum Transports { ; private static final Set REQUEST_HEADERS_ALLOWED_ON_DEFAULT_THREAD_CONTEXT = Set.of( Task.TRACE_ID, - Task.TRACE_PARENT, + Task.TRACE_PARENT_HTTP_HEADER, Task.X_OPAQUE_ID_HTTP_HEADER, Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java index 289e1a730db90..1ddc316e6c10e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java @@ -34,12 +34,14 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween; import static org.elasticsearch.tasks.Task.HEADERS_TO_COPY; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -1161,6 +1163,75 @@ public void testNewEmptySystemContext() { assertNotNull(threadContext.getHeader(header)); } + public void testNewTraceContext() { + final var threadContext = new ThreadContext(Settings.EMPTY); + + var rootTraceContext = Map.of(Task.TRACE_PARENT_HTTP_HEADER, randomIdentifier(), Task.TRACE_STATE, randomIdentifier()); + var apmTraceContext = new Object(); + var responseKey = randomIdentifier(); + var responseValue = randomAlphaOfLength(10); + + threadContext.putHeader(rootTraceContext); + threadContext.putTransient(Task.APM_TRACE_CONTEXT, apmTraceContext); + + assertThat(threadContext.hasTraceContext(), equalTo(true)); + assertThat(threadContext.hasParentTraceContext(), equalTo(false)); + + try (var ignored = threadContext.newTraceContext()) { + assertThat(threadContext.hasTraceContext(), equalTo(false)); // no trace started yet + assertThat(threadContext.hasParentTraceContext(), equalTo(true)); + + assertThat(threadContext.getHeaders(), is(anEmptyMap())); + assertThat( + threadContext.getTransientHeaders(), + equalTo( + Map.of( + Task.PARENT_TRACE_PARENT_HEADER, + rootTraceContext.get(Task.TRACE_PARENT_HTTP_HEADER), + Task.PARENT_TRACE_STATE, + rootTraceContext.get(Task.TRACE_STATE), + Task.PARENT_APM_TRACE_CONTEXT, + apmTraceContext + ) + ) + ); + // response headers shall be propagated + threadContext.addResponseHeader(responseKey, responseValue); + } + + assertThat(threadContext.hasTraceContext(), equalTo(true)); + assertThat(threadContext.hasParentTraceContext(), equalTo(false)); + + assertThat(threadContext.getHeaders(), equalTo(rootTraceContext)); + assertThat(threadContext.getTransientHeaders(), equalTo(Map.of(Task.APM_TRACE_CONTEXT, apmTraceContext))); + assertThat(threadContext.getResponseHeaders(), equalTo(Map.of(responseKey, List.of(responseValue)))); + } + + public void testNewTraceContextWithoutParentTrace() { + final var threadContext = new ThreadContext(Settings.EMPTY); + + var responseKey = randomIdentifier(); + var responseValue = randomAlphaOfLength(10); + + assertThat(threadContext.hasTraceContext(), equalTo(false)); + assertThat(threadContext.hasParentTraceContext(), equalTo(false)); + + try (var ignored = threadContext.newTraceContext()) { + assertTrue(threadContext.isDefaultContext()); + assertThat(threadContext.hasTraceContext(), equalTo(false)); + assertThat(threadContext.hasParentTraceContext(), equalTo(false)); + + // discared, just making sure the context is isolated + threadContext.putTransient(randomIdentifier(), randomAlphaOfLength(10)); + // response headers shall be propagated + threadContext.addResponseHeader(responseKey, responseValue); + } + + assertThat(threadContext.getHeaders(), is(anEmptyMap())); + assertThat(threadContext.getTransientHeaders(), is(anEmptyMap())); + assertThat(threadContext.getResponseHeaders(), equalTo(Map.of(responseKey, List.of(responseValue)))); + } + public void testRestoreExistingContext() { final var threadContext = new ThreadContext(Settings.EMPTY); final var header = randomIdentifier(); diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index d8b6f0fc96ae8..14e380309d750 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -341,7 +341,7 @@ public void testTraceParentAndTraceId() { public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { assertThat(threadContext.getHeader(Task.TRACE_ID), equalTo("0af7651916cd43dd8448eb211c80319c")); assertThat(threadContext.getHeader(Task.TRACE_PARENT_HTTP_HEADER), nullValue()); - assertThat(threadContext.getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER), equalTo(traceParentValue)); + assertThat(threadContext.getTransient(Task.PARENT_TRACE_PARENT_HEADER), equalTo(traceParentValue)); // request trace start time is also set assertTrue(traceStartTimeRef.compareAndSet(null, threadContext.getTransient(Task.TRACE_START_TIME))); assertNotNull(traceStartTimeRef.get()); @@ -352,7 +352,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th // but they're not copied in for bad requests assertThat(threadContext.getHeader(Task.TRACE_ID), nullValue()); assertThat(threadContext.getHeader(Task.TRACE_PARENT_HTTP_HEADER), nullValue()); - assertThat(threadContext.getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER), nullValue()); + assertThat(threadContext.getTransient(Task.PARENT_TRACE_PARENT_HEADER), nullValue()); assertThat(threadContext.getTransient(Task.TRACE_START_TIME), nullValue()); } @@ -407,7 +407,7 @@ protected void populatePerRequestThreadContext(RestRequest restRequest, ThreadCo // headers are "null" here, aka not present, because the thread context changes containing them is to be confined to the request assertThat(threadPool.getThreadContext().getHeader(Task.TRACE_ID), nullValue()); assertThat(threadPool.getThreadContext().getHeader(Task.TRACE_PARENT_HTTP_HEADER), nullValue()); - assertThat(threadPool.getThreadContext().getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER), nullValue()); + assertThat(threadPool.getThreadContext().getTransient(Task.PARENT_TRACE_PARENT_HEADER), nullValue()); // system clock is not _technically_ monotonic but in practice it's very unlikely to see a discontinuity here assertThat( @@ -419,7 +419,7 @@ protected void populatePerRequestThreadContext(RestRequest restRequest, ThreadCo // headers are "null" here, aka not present, because the thread context changes containing them is to be confined to the request assertThat(threadPool.getThreadContext().getHeader(Task.TRACE_ID), nullValue()); assertThat(threadPool.getThreadContext().getHeader(Task.TRACE_PARENT_HTTP_HEADER), nullValue()); - assertThat(threadPool.getThreadContext().getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER), nullValue()); + assertThat(threadPool.getThreadContext().getTransient(Task.PARENT_TRACE_PARENT_HEADER), nullValue()); } } diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java index be7ffdc60d2ea..94784b94a2691 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java @@ -68,6 +68,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class TaskManagerTests extends ESTestCase { @@ -281,7 +282,64 @@ public void testTaskAccounting() { /** * Check that registering a task also causes tracing to be started on that task. */ - public void testRegisterTaskStartsTracing() { + public void testRegisterTaskStartsTracingIfTraceParentExists() { + final Tracer mockTracer = mock(Tracer.class); + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer); + + // fake a trace parent + threadPool.getThreadContext().putHeader(Task.TRACE_PARENT_HTTP_HEADER, "traceparent"); + + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + + final Task task = taskManager.register("testType", "testAction", new TaskAwareRequest() { + + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public void setRequestId(long requestId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + }); + + verify(mockTracer).startTrace(any(), eq(task), eq("testAction"), anyMap()); + } + } + + /** + * Check that registering a task also causes tracing to be started on that task. + */ + public void testRegisterTaskSkipsTracingIfTraceParentMissing() { + final Tracer mockTracer = mock(Tracer.class); + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer); + + // no trace parent + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + final Task task = taskManager.register("testType", "testAction", new TaskAwareRequest() { + + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public void setRequestId(long requestId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + }); + } + + verifyNoInteractions(mockTracer); + } + + /** + * Check that unregistering a task also causes tracing to be stopped on that task. + */ + public void testUnregisterTaskStopsTracingIfTraceContextExists() { final Tracer mockTracer = mock(Tracer.class); final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer); @@ -299,13 +357,17 @@ public TaskId getParentTask() { } }); - verify(mockTracer).startTrace(any(), eq(task), eq("testAction"), anyMap()); + // fake a trace context (trace parent) + threadPool.getThreadContext().putHeader(Task.TRACE_PARENT_HTTP_HEADER, "traceparent"); + + taskManager.unregister(task); + verify(mockTracer).stopTrace(task); } /** * Check that unregistering a task also causes tracing to be stopped on that task. */ - public void testUnregisterTaskStopsTracing() { + public void testUnregisterTaskStopsTracingIfTraceContextMissing() { final Tracer mockTracer = mock(Tracer.class); final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer); @@ -323,18 +385,22 @@ public TaskId getParentTask() { } }); - taskManager.unregister(task); + // no trace context - verify(mockTracer).stopTrace(task); + taskManager.unregister(task); + verifyNoInteractions(mockTracer); } /** - * Check that registering and executing a task also causes tracing to be started and stopped on that task. + * Check that registering and executing a task also causes tracing to be started if a trace parent exists. */ - public void testRegisterAndExecuteStartsAndStopsTracing() { + public void testRegisterAndExecuteStartsTracingIfTraceParentExists() { final Tracer mockTracer = mock(Tracer.class); final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer); + // fake a trace parent + threadPool.getThreadContext().putHeader(Task.TRACE_PARENT_HTTP_HEADER, "traceparent"); + final Task task = taskManager.registerAndExecute( "testType", new TransportAction( @@ -369,25 +435,68 @@ public TaskId getParentTask() { verify(mockTracer).startTrace(any(), eq(task), eq("actionName"), anyMap()); } + /** + * Check that registering and executing a task skips tracing if trace parent does not exists. + */ + public void testRegisterAndExecuteSkipsTracingIfTraceParentMissing() { + final Tracer mockTracer = mock(Tracer.class); + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer); + + // clean thread context without trace parent + + final Task task = taskManager.registerAndExecute( + "testType", + new TransportAction( + "actionName", + new ActionFilters(Set.of()), + taskManager, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ) { + @Override + protected void doExecute(Task task, ActionRequest request, ActionListener listener) { + listener.onResponse(new ActionResponse() { + @Override + public void writeTo(StreamOutput out) {} + }); + } + }, + new ActionRequest() { + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + }, + null, + ActionTestUtils.assertNoFailureListener(r -> {}) + ); + + verifyNoInteractions(mockTracer); + } + public void testRegisterWithEnabledDisabledTracing() { final Tracer mockTracer = mock(Tracer.class); final TaskManager taskManager = spy(new TaskManager(Settings.EMPTY, threadPool, Set.of(), mockTracer)); taskManager.register("type", "action", makeTaskRequest(true, 123), false); - verify(taskManager, times(0)).startTrace(any(), any()); + verify(taskManager, times(0)).maybeStartTrace(any(), any()); taskManager.register("type", "action", makeTaskRequest(false, 234), false); - verify(taskManager, times(0)).startTrace(any(), any()); + verify(taskManager, times(0)).maybeStartTrace(any(), any()); clearInvocations(taskManager); taskManager.register("type", "action", makeTaskRequest(true, 345), true); - verify(taskManager, times(1)).startTrace(any(), any()); + verify(taskManager, times(1)).maybeStartTrace(any(), any()); clearInvocations(taskManager); taskManager.register("type", "action", makeTaskRequest(false, 456), true); - verify(taskManager, times(1)).startTrace(any(), any()); + verify(taskManager, times(1)).maybeStartTrace(any(), any()); } static class CancellableRequest extends AbstractTransportRequest { diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java index 640929b058760..f00db6ca30a18 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java @@ -627,7 +627,7 @@ private static List translateToChunkedResults(InferenceInputs private TraceContext getCurrentTraceInfo() { var threadPool = getServiceComponents().threadPool(); - var traceParent = threadPool.getThreadContext().getHeader(Task.TRACE_PARENT); + var traceParent = threadPool.getThreadContext().getHeader(Task.TRACE_PARENT_HTTP_HEADER); var traceState = threadPool.getThreadContext().getHeader(Task.TRACE_STATE); return new TraceContext(traceParent, traceState); diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/ElasticInferenceServiceAuthorizationRequestHandler.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/ElasticInferenceServiceAuthorizationRequestHandler.java index 46c56b80e3bec..20da7710d6d5a 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/ElasticInferenceServiceAuthorizationRequestHandler.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/ElasticInferenceServiceAuthorizationRequestHandler.java @@ -119,7 +119,7 @@ public void getAuthorization(ActionListener Date: Fri, 4 Jul 2025 07:27:32 +0000 Subject: [PATCH 2/3] [CI] Auto commit changes from spotless --- server/src/main/java/org/elasticsearch/tasks/Task.java | 1 - .../src/main/java/org/elasticsearch/transport/Transports.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index d2cd330307b6c..7e72c7c0b3927 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -68,7 +68,6 @@ public class Task implements Traceable { public static final String PARENT_APM_TRACE_CONTEXT = "parent_" + Task.APM_TRACE_CONTEXT; - public static final Set HEADERS_TO_COPY = Set.of( X_OPAQUE_ID_HTTP_HEADER, TRACE_PARENT_HTTP_HEADER, diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index 889f079a4e918..908d4f0669bee 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -19,7 +19,7 @@ public enum Transports { ; private static final Set REQUEST_HEADERS_ALLOWED_ON_DEFAULT_THREAD_CONTEXT = Set.of( Task.TRACE_ID, - Task.TRACE_PARENT_HTTP_HEADER, + Task.TRACE_PARENT_HTTP_HEADER, Task.X_OPAQUE_ID_HTTP_HEADER, Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER From ff9e4d2c8c7d340a39878960c1a9b43baaa29dfa Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 10 Jul 2025 14:48:36 +0200 Subject: [PATCH 3/3] javadocs --- .../common/util/concurrent/ThreadContext.java | 1 + .../src/main/java/org/elasticsearch/tasks/TaskManager.java | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 32b66a5252af6..de21750a97082 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -196,6 +196,7 @@ public StoredContext newEmptySystemContext() { */ public StoredContext newTraceContext() { final ThreadContextStruct originalContext = threadLocal.get(); + // this is the context when this method returns final ThreadContextStruct newContext; if (originalContext.hasTraceContext() == false) { diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index da46e469b030f..2ed347c226870 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -176,9 +176,10 @@ public Task register(String type, String action, TaskAwareRequest request, boole return task; } - // Start a new trace span if a parent trace context already exists. - // For REST actions this will be the case, otherwise Tracer#startTrace can be used. - // package private for testing + /** + * Start a new trace span if a parent trace context already exists. + * For REST actions this will be the case, otherwise {@link Tracer#startTrace} can be used. + */ void maybeStartTrace(ThreadContext threadContext, Task task) { if (threadContext.hasParentTraceContext() == false) { return;