Skip to content

Commit 67caefe

Browse files
committed
Do not start tracing transactions in TaskManager.
This changes TaskManager to only create new tracing spans if a trace parent (and hence a tracing transaction) already exists. Tracing transactions are automatically started in the RestController for external requests. To trace detached, internal transport actions, a transaction has to be explicitly started using Tracer.startTrace. This helps to prevent everlasting tracing transaction for internal self-rescheduling work that is using transport actions. Such transactions have caused memory issues with the APM java agent in the past. Relates to ES-10969
1 parent 74fd66c commit 67caefe

File tree

11 files changed

+288
-75
lines changed

11 files changed

+288
-75
lines changed

modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,10 @@ private Context getParentContext(TraceContext traceContext) {
235235
// You can just pass the Context object directly to another thread (it is immutable and thus thread-safe).
236236

237237
// Attempt to fetch a local parent context first, otherwise look for a remote parent
238-
Context parentContext = traceContext.getTransient("parent_" + Task.APM_TRACE_CONTEXT);
238+
Context parentContext = traceContext.getTransient(Task.PARENT_APM_TRACE_CONTEXT);
239239
if (parentContext == null) {
240-
final String traceParentHeader = traceContext.getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER);
241-
final String traceStateHeader = traceContext.getTransient("parent_" + Task.TRACE_STATE);
240+
final String traceParentHeader = traceContext.getTransient(Task.PARENT_TRACE_PARENT_HEADER);
241+
final String traceStateHeader = traceContext.getTransient(Task.PARENT_TRACE_STATE);
242242

243243
if (traceParentHeader != null) {
244244
final Map<String, String> traceContextMap = Maps.newMapWithExpectedSize(2);

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,10 @@ public void copyRequestHeadersToThreadContext(HttpPreRequest request, ThreadCont
595595
Optional<String> traceId = RestUtils.extractTraceId(traceparent);
596596
if (traceId.isPresent()) {
597597
threadContext.putHeader(Task.TRACE_ID, traceId.get());
598-
threadContext.putTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER, traceparent);
598+
threadContext.putTransient(Task.PARENT_TRACE_PARENT_HEADER, traceparent);
599599
}
600600
} else if (name.equals(Task.TRACE_STATE)) {
601-
threadContext.putTransient("parent_" + Task.TRACE_STATE, distinctHeaderValues.get(0));
601+
threadContext.putTransient(Task.PARENT_TRACE_STATE, distinctHeaderValues.get(0));
602602
} else {
603603
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
604604
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -193,33 +193,38 @@ public StoredContext newEmptySystemContext() {
193193
*/
194194
public StoredContext newTraceContext() {
195195
final ThreadContextStruct originalContext = threadLocal.get();
196-
final Map<String, String> newRequestHeaders = new HashMap<>(originalContext.requestHeaders);
197-
final Map<String, Object> newTransientHeaders = new HashMap<>(originalContext.transientHeaders);
196+
// this is the context when this method returns
197+
final ThreadContextStruct newContext;
198+
if (originalContext.hasTraceContext() == false) {
199+
newContext = originalContext;
200+
} else {
201+
final Map<String, String> newRequestHeaders = new HashMap<>(originalContext.requestHeaders);
202+
final Map<String, Object> newTransientHeaders = new HashMap<>(originalContext.transientHeaders);
198203

199-
final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER);
200-
if (previousTraceParent != null) {
201-
newTransientHeaders.put("parent_" + Task.TRACE_PARENT_HTTP_HEADER, previousTraceParent);
202-
}
204+
final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER);
205+
if (previousTraceParent != null) {
206+
newTransientHeaders.put(Task.PARENT_TRACE_PARENT_HEADER, previousTraceParent);
207+
}
203208

204-
final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE);
205-
if (previousTraceState != null) {
206-
newTransientHeaders.put("parent_" + Task.TRACE_STATE, previousTraceState);
207-
}
209+
final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE);
210+
if (previousTraceState != null) {
211+
newTransientHeaders.put(Task.PARENT_TRACE_STATE, previousTraceState);
212+
}
208213

209-
final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
210-
if (previousTraceContext != null) {
211-
newTransientHeaders.put("parent_" + Task.APM_TRACE_CONTEXT, previousTraceContext);
212-
}
214+
final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
215+
if (previousTraceContext != null) {
216+
newTransientHeaders.put(Task.PARENT_APM_TRACE_CONTEXT, previousTraceContext);
217+
}
213218

214-
// this is the context when this method returns
215-
final ThreadContextStruct newContext = new ThreadContextStruct(
216-
newRequestHeaders,
217-
originalContext.responseHeaders,
218-
newTransientHeaders,
219-
originalContext.isSystemContext,
220-
originalContext.warningHeadersSize
221-
);
222-
threadLocal.set(newContext);
219+
newContext = new ThreadContextStruct(
220+
newRequestHeaders,
221+
originalContext.responseHeaders,
222+
newTransientHeaders,
223+
originalContext.isSystemContext,
224+
originalContext.warningHeadersSize
225+
);
226+
threadLocal.set(newContext);
227+
}
223228
// Tracing shouldn't interrupt the propagation of response headers, so in the same as
224229
// #newStoredContextPreservingResponseHeaders(), pass on any potential changes to the response headers.
225230
return () -> {
@@ -233,10 +238,11 @@ public StoredContext newTraceContext() {
233238
}
234239

235240
public boolean hasTraceContext() {
236-
final ThreadContextStruct context = threadLocal.get();
237-
return context.requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER)
238-
|| context.requestHeaders.containsKey(Task.TRACE_STATE)
239-
|| context.transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
241+
return threadLocal.get().hasTraceContext();
242+
}
243+
244+
public boolean hasParentTraceContext() {
245+
return threadLocal.get().hasParentTraceContext();
240246
}
241247

242248
/**
@@ -254,10 +260,10 @@ public StoredContext clearTraceContext() {
254260
newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER);
255261
newRequestHeaders.remove(Task.TRACE_STATE);
256262

257-
newTransientHeaders.remove("parent_" + Task.TRACE_PARENT_HTTP_HEADER);
258-
newTransientHeaders.remove("parent_" + Task.TRACE_STATE);
263+
newTransientHeaders.remove(Task.PARENT_TRACE_PARENT_HEADER);
264+
newTransientHeaders.remove(Task.PARENT_TRACE_STATE);
259265
newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
260-
newTransientHeaders.remove("parent_" + Task.APM_TRACE_CONTEXT);
266+
newTransientHeaders.remove(Task.PARENT_APM_TRACE_CONTEXT);
261267

262268
threadLocal.set(
263269
new ThreadContextStruct(
@@ -853,6 +859,18 @@ private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers)
853859
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
854860
}
855861

862+
private boolean hasTraceContext() {
863+
return requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER)
864+
|| requestHeaders.containsKey(Task.TRACE_STATE)
865+
|| transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
866+
}
867+
868+
private boolean hasParentTraceContext() {
869+
return transientHeaders.containsKey(Task.PARENT_TRACE_PARENT_HEADER)
870+
|| transientHeaders.containsKey(Task.PARENT_TRACE_STATE)
871+
|| transientHeaders.containsKey(Task.PARENT_APM_TRACE_CONTEXT);
872+
}
873+
856874
private void logWarningHeaderThresholdExceeded(long threshold, Setting<?> thresholdSetting) {
857875
// If available, log some selected headers to help identifying the source of the request.
858876
// Note: Only Task.HEADERS_TO_COPY are guaranteed to be preserved at this point.

server/src/main/java/org/elasticsearch/tasks/Task.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ public class Task implements Traceable {
3131
*/
3232
public static final String X_OPAQUE_ID_HTTP_HEADER = "X-Opaque-Id";
3333

34+
/**
35+
* A request header that indicates the origin of the request from Elastic stack. The value will stored in ThreadContext
36+
* and emitted to ES logs
37+
*/
38+
public static final String X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER = "X-elastic-product-origin";
39+
40+
public static final String X_ELASTIC_PROJECT_ID_HTTP_HEADER = "X-Elastic-Project-Id";
41+
3442
/**
3543
* The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context.
3644
* TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved
@@ -39,28 +47,27 @@ public class Task implements Traceable {
3947
*/
4048
public static final String TRACE_PARENT_HTTP_HEADER = "traceparent";
4149

50+
public static final String TRACE_STATE = "tracestate";
51+
4252
/**
43-
* A request header that indicates the origin of the request from Elastic stack. The value will stored in ThreadContext
44-
* and emitted to ES logs
53+
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
54+
* Has to be declared as a header copied over for tasks.
4555
*/
46-
public static final String X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER = "X-elastic-product-origin";
56+
public static final String TRACE_ID = "trace.id";
4757

48-
public static final String TRACE_STATE = "tracestate";
58+
public static final String TRACE_START_TIME = "trace.starttime";
4959

5060
/**
5161
* Used internally to pass the apm trace context between the nodes
5262
*/
5363
public static final String APM_TRACE_CONTEXT = "apm.local.context";
5464

55-
/**
56-
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
57-
* Has to be declared as a header copied over for tasks.
58-
*/
59-
public static final String TRACE_ID = "trace.id";
65+
public static final String PARENT_TRACE_PARENT_HEADER = "parent_" + Task.TRACE_PARENT_HTTP_HEADER;
66+
67+
public static final String PARENT_TRACE_STATE = "parent_" + Task.TRACE_STATE;
68+
69+
public static final String PARENT_APM_TRACE_CONTEXT = "parent_" + Task.APM_TRACE_CONTEXT;
6070

61-
public static final String TRACE_START_TIME = "trace.starttime";
62-
public static final String TRACE_PARENT = "traceparent";
63-
public static final String X_ELASTIC_PROJECT_ID_HTTP_HEADER = "X-Elastic-Project-Id";
6471

6572
public static final Set<String> HEADERS_TO_COPY = Set.of(
6673
X_OPAQUE_ID_HTTP_HEADER,

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,24 +161,32 @@ public Task register(String type, String action, TaskAwareRequest request, boole
161161
Task previousTask = tasks.put(task.getId(), task);
162162
assert previousTask == null;
163163
if (traceRequest) {
164-
startTrace(threadContext, task);
164+
maybeStartTrace(threadContext, task);
165165
}
166166
}
167167
return task;
168168
}
169169

170+
// Start a new trace span if a parent trace context already exists.
171+
// For REST actions this will be the case, otherwise Tracer#startTrace can be used.
170172
// package private for testing
171-
void startTrace(ThreadContext threadContext, Task task) {
173+
void maybeStartTrace(ThreadContext threadContext, Task task) {
174+
if (threadContext.hasParentTraceContext() == false) {
175+
return;
176+
}
172177
TaskId parentTask = task.getParentTaskId();
173-
Map<String, Object> attributes = Map.of(
174-
Tracer.AttributeKeys.TASK_ID,
175-
task.getId(),
176-
Tracer.AttributeKeys.PARENT_TASK_ID,
177-
parentTask.toString()
178-
);
178+
Map<String, Object> attributes = parentTask.isSet()
179+
? Map.of(Tracer.AttributeKeys.TASK_ID, task.getId(), Tracer.AttributeKeys.PARENT_TASK_ID, parentTask.toString())
180+
: Map.of(Tracer.AttributeKeys.TASK_ID, task.getId());
179181
tracer.startTrace(threadContext, task, task.getAction(), attributes);
180182
}
181183

184+
void maybeStopTrace(ThreadContext threadContext, Task task) {
185+
if (threadContext.hasTraceContext()) {
186+
tracer.stopTrace(task);
187+
}
188+
}
189+
182190
public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(
183191
String type,
184192
TransportAction<Request, Response> action,
@@ -241,7 +249,7 @@ private void registerCancellableTask(Task task, long requestId, boolean traceReq
241249
CancellableTaskHolder holder = new CancellableTaskHolder(cancellableTask);
242250
cancellableTasks.put(task, requestId, holder);
243251
if (traceRequest) {
244-
startTrace(threadPool.getThreadContext(), task);
252+
maybeStartTrace(threadPool.getThreadContext(), task);
245253
}
246254
// Check if this task was banned before we start it.
247255
if (task.getParentTaskId().isSet()) {
@@ -340,7 +348,7 @@ public Task unregister(Task task) {
340348
return removedTask;
341349
}
342350
} finally {
343-
tracer.stopTrace(task);
351+
maybeStopTrace(threadPool.getThreadContext(), task);
344352
for (RemovedTaskListener listener : removedTaskListeners) {
345353
listener.onRemoved(task);
346354
}

server/src/main/java/org/elasticsearch/transport/Transports.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public enum Transports {
1919
;
2020
private static final Set<String> REQUEST_HEADERS_ALLOWED_ON_DEFAULT_THREAD_CONTEXT = Set.of(
2121
Task.TRACE_ID,
22-
Task.TRACE_PARENT,
22+
Task.TRACE_PARENT_HTTP_HEADER,
2323
Task.X_OPAQUE_ID_HTTP_HEADER,
2424
Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER,
2525
Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER

server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@
3434

3535
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween;
3636
import static org.elasticsearch.tasks.Task.HEADERS_TO_COPY;
37+
import static org.hamcrest.Matchers.anEmptyMap;
3738
import static org.hamcrest.Matchers.contains;
3839
import static org.hamcrest.Matchers.containsInAnyOrder;
3940
import static org.hamcrest.Matchers.equalTo;
4041
import static org.hamcrest.Matchers.hasItem;
4142
import static org.hamcrest.Matchers.hasSize;
4243
import static org.hamcrest.Matchers.instanceOf;
44+
import static org.hamcrest.Matchers.is;
4345
import static org.hamcrest.Matchers.not;
4446
import static org.hamcrest.Matchers.nullValue;
4547
import static org.hamcrest.Matchers.sameInstance;
@@ -1161,6 +1163,75 @@ public void testNewEmptySystemContext() {
11611163
assertNotNull(threadContext.getHeader(header));
11621164
}
11631165

1166+
public void testNewTraceContext() {
1167+
final var threadContext = new ThreadContext(Settings.EMPTY);
1168+
1169+
var rootTraceContext = Map.of(Task.TRACE_PARENT_HTTP_HEADER, randomIdentifier(), Task.TRACE_STATE, randomIdentifier());
1170+
var apmTraceContext = new Object();
1171+
var responseKey = randomIdentifier();
1172+
var responseValue = randomAlphaOfLength(10);
1173+
1174+
threadContext.putHeader(rootTraceContext);
1175+
threadContext.putTransient(Task.APM_TRACE_CONTEXT, apmTraceContext);
1176+
1177+
assertThat(threadContext.hasTraceContext(), equalTo(true));
1178+
assertThat(threadContext.hasParentTraceContext(), equalTo(false));
1179+
1180+
try (var ignored = threadContext.newTraceContext()) {
1181+
assertThat(threadContext.hasTraceContext(), equalTo(false)); // no trace started yet
1182+
assertThat(threadContext.hasParentTraceContext(), equalTo(true));
1183+
1184+
assertThat(threadContext.getHeaders(), is(anEmptyMap()));
1185+
assertThat(
1186+
threadContext.getTransientHeaders(),
1187+
equalTo(
1188+
Map.of(
1189+
Task.PARENT_TRACE_PARENT_HEADER,
1190+
rootTraceContext.get(Task.TRACE_PARENT_HTTP_HEADER),
1191+
Task.PARENT_TRACE_STATE,
1192+
rootTraceContext.get(Task.TRACE_STATE),
1193+
Task.PARENT_APM_TRACE_CONTEXT,
1194+
apmTraceContext
1195+
)
1196+
)
1197+
);
1198+
// response headers shall be propagated
1199+
threadContext.addResponseHeader(responseKey, responseValue);
1200+
}
1201+
1202+
assertThat(threadContext.hasTraceContext(), equalTo(true));
1203+
assertThat(threadContext.hasParentTraceContext(), equalTo(false));
1204+
1205+
assertThat(threadContext.getHeaders(), equalTo(rootTraceContext));
1206+
assertThat(threadContext.getTransientHeaders(), equalTo(Map.of(Task.APM_TRACE_CONTEXT, apmTraceContext)));
1207+
assertThat(threadContext.getResponseHeaders(), equalTo(Map.of(responseKey, List.of(responseValue))));
1208+
}
1209+
1210+
public void testNewTraceContextWithoutParentTrace() {
1211+
final var threadContext = new ThreadContext(Settings.EMPTY);
1212+
1213+
var responseKey = randomIdentifier();
1214+
var responseValue = randomAlphaOfLength(10);
1215+
1216+
assertThat(threadContext.hasTraceContext(), equalTo(false));
1217+
assertThat(threadContext.hasParentTraceContext(), equalTo(false));
1218+
1219+
try (var ignored = threadContext.newTraceContext()) {
1220+
assertTrue(threadContext.isDefaultContext());
1221+
assertThat(threadContext.hasTraceContext(), equalTo(false));
1222+
assertThat(threadContext.hasParentTraceContext(), equalTo(false));
1223+
1224+
// discared, just making sure the context is isolated
1225+
threadContext.putTransient(randomIdentifier(), randomAlphaOfLength(10));
1226+
// response headers shall be propagated
1227+
threadContext.addResponseHeader(responseKey, responseValue);
1228+
}
1229+
1230+
assertThat(threadContext.getHeaders(), is(anEmptyMap()));
1231+
assertThat(threadContext.getTransientHeaders(), is(anEmptyMap()));
1232+
assertThat(threadContext.getResponseHeaders(), equalTo(Map.of(responseKey, List.of(responseValue))));
1233+
}
1234+
11641235
public void testRestoreExistingContext() {
11651236
final var threadContext = new ThreadContext(Settings.EMPTY);
11661237
final var header = randomIdentifier();

0 commit comments

Comments
 (0)