Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,37 +189,46 @@ public StoredContext newEmptySystemContext() {
* moving tracing-related fields to different names so that a new child span can be started. This child span will pick up
* the moved fields and use them to establish the parent-child relationship.
*
* Response headers will be propagated. If no parent span is in progress (meaning there's no trace context), this will behave exactly
* the same way as {@link #newStoredContextPreservingResponseHeaders}.
*
* @return a stored context, which can be restored when this context is no longer needed.
*/
public StoredContext newTraceContext() {
final ThreadContextStruct originalContext = threadLocal.get();
final Map<String, String> newRequestHeaders = new HashMap<>(originalContext.requestHeaders);
final Map<String, Object> newTransientHeaders = new HashMap<>(originalContext.transientHeaders);

final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER);
if (previousTraceParent != null) {
newTransientHeaders.put(Task.PARENT_TRACE_PARENT_HEADER, previousTraceParent);
}
// this is the context when this method returns
final ThreadContextStruct newContext;
if (originalContext.hasTraceContext() == false) {
newContext = originalContext;
} else {
final Map<String, String> newRequestHeaders = new HashMap<>(originalContext.requestHeaders);
final Map<String, Object> newTransientHeaders = new HashMap<>(originalContext.transientHeaders);

final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE);
if (previousTraceState != null) {
newTransientHeaders.put(Task.PARENT_TRACE_STATE, previousTraceState);
}
final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER);
if (previousTraceParent != null) {
newTransientHeaders.put(Task.PARENT_TRACE_PARENT_HEADER, previousTraceParent);
}

final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
if (previousTraceContext != null) {
newTransientHeaders.put(Task.PARENT_APM_TRACE_CONTEXT, previousTraceContext);
}
final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE);
if (previousTraceState != null) {
newTransientHeaders.put(Task.PARENT_TRACE_STATE, previousTraceState);
}

// this is the context when this method returns
final ThreadContextStruct newContext = new ThreadContextStruct(
newRequestHeaders,
originalContext.responseHeaders,
newTransientHeaders,
originalContext.isSystemContext,
originalContext.warningHeadersSize
);
threadLocal.set(newContext);
final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
if (previousTraceContext != null) {
newTransientHeaders.put(Task.PARENT_APM_TRACE_CONTEXT, previousTraceContext);
}

newContext = new ThreadContextStruct(
newRequestHeaders,
originalContext.responseHeaders,
newTransientHeaders,
originalContext.isSystemContext,
originalContext.warningHeadersSize
);
threadLocal.set(newContext);
}
// Tracing shouldn't interrupt the propagation of response headers, so in the same as
// #newStoredContextPreservingResponseHeaders(), pass on any potential changes to the response headers.
return () -> {
Expand All @@ -233,10 +242,11 @@ public StoredContext newTraceContext() {
}

public boolean hasTraceContext() {
final ThreadContextStruct context = threadLocal.get();
return context.requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER)
|| context.requestHeaders.containsKey(Task.TRACE_STATE)
|| context.transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
return threadLocal.get().hasTraceContext();
}

public boolean hasParentTraceContext() {
return threadLocal.get().hasParentTraceContext();
}

/**
Expand Down Expand Up @@ -853,6 +863,18 @@ private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers)
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
}

private boolean hasTraceContext() {
return requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER)
|| requestHeaders.containsKey(Task.TRACE_STATE)
|| transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
}

private boolean hasParentTraceContext() {
return transientHeaders.containsKey(Task.PARENT_TRACE_PARENT_HEADER)
|| transientHeaders.containsKey(Task.PARENT_TRACE_STATE)
|| transientHeaders.containsKey(Task.PARENT_APM_TRACE_CONTEXT);
}

private void logWarningHeaderThresholdExceeded(long threshold, Setting<?> thresholdSetting) {
// If available, log some selected headers to help identifying the source of the request.
// Note: Only Task.HEADERS_TO_COPY are guaranteed to be preserved at this point.
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class Task implements Traceable {
* TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved
* has to be declared as a header copied over from http request.
* May also be used internally when APM is enabled.
* https://www.w3.org/TR/trace-context-1/#traceparent-header
*/
public static final String TRACE_PARENT_HTTP_HEADER = "traceparent";

Expand All @@ -53,6 +54,10 @@ public class Task implements Traceable {
*/
public static final String TRACE_ID = "trace.id";

/**
* Optional request header carrying vendor-specific trace information.
* https://www.w3.org/TR/trace-context-1/#tracestate-header
*/
public static final String TRACE_STATE = "tracestate";

public static final String TRACE_START_TIME = "trace.starttime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween;
import static org.elasticsearch.tasks.Task.HEADERS_TO_COPY;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
Expand Down Expand Up @@ -1161,6 +1163,75 @@ public void testNewEmptySystemContext() {
assertNotNull(threadContext.getHeader(header));
}

public void testNewTraceContext() {
final var threadContext = new ThreadContext(Settings.EMPTY);

var rootTraceContext = Map.of(Task.TRACE_PARENT_HTTP_HEADER, randomIdentifier(), Task.TRACE_STATE, randomIdentifier());
var apmTraceContext = new Object();
var responseKey = randomIdentifier();
var responseValue = randomAlphaOfLength(10);

threadContext.putHeader(rootTraceContext);
threadContext.putTransient(Task.APM_TRACE_CONTEXT, apmTraceContext);

assertThat(threadContext.hasTraceContext(), equalTo(true));
assertThat(threadContext.hasParentTraceContext(), equalTo(false));

try (var ignored = threadContext.newTraceContext()) {
assertThat(threadContext.hasTraceContext(), equalTo(false)); // no trace started yet
assertThat(threadContext.hasParentTraceContext(), equalTo(true));

assertThat(threadContext.getHeaders(), is(anEmptyMap()));
assertThat(
threadContext.getTransientHeaders(),
equalTo(
Map.of(
Task.PARENT_TRACE_PARENT_HEADER,
rootTraceContext.get(Task.TRACE_PARENT_HTTP_HEADER),
Task.PARENT_TRACE_STATE,
rootTraceContext.get(Task.TRACE_STATE),
Task.PARENT_APM_TRACE_CONTEXT,
apmTraceContext
)
)
);
// response headers shall be propagated
threadContext.addResponseHeader(responseKey, responseValue);
}

assertThat(threadContext.hasTraceContext(), equalTo(true));
assertThat(threadContext.hasParentTraceContext(), equalTo(false));

assertThat(threadContext.getHeaders(), equalTo(rootTraceContext));
assertThat(threadContext.getTransientHeaders(), equalTo(Map.of(Task.APM_TRACE_CONTEXT, apmTraceContext)));
assertThat(threadContext.getResponseHeaders(), equalTo(Map.of(responseKey, List.of(responseValue))));
}

public void testNewTraceContextWithoutParentTrace() {
final var threadContext = new ThreadContext(Settings.EMPTY);

var responseKey = randomIdentifier();
var responseValue = randomAlphaOfLength(10);

assertThat(threadContext.hasTraceContext(), equalTo(false));
assertThat(threadContext.hasParentTraceContext(), equalTo(false));

try (var ignored = threadContext.newTraceContext()) {
assertTrue(threadContext.isDefaultContext());
assertThat(threadContext.hasTraceContext(), equalTo(false));
assertThat(threadContext.hasParentTraceContext(), equalTo(false));

// discared, just making sure the context is isolated
threadContext.putTransient(randomIdentifier(), randomAlphaOfLength(10));
// response headers shall be propagated
threadContext.addResponseHeader(responseKey, responseValue);
}

assertThat(threadContext.getHeaders(), is(anEmptyMap()));
assertThat(threadContext.getTransientHeaders(), is(anEmptyMap()));
assertThat(threadContext.getResponseHeaders(), equalTo(Map.of(responseKey, List.of(responseValue))));
}

public void testRestoreExistingContext() {
final var threadContext = new ThreadContext(Settings.EMPTY);
final var header = randomIdentifier();
Expand Down