Skip to content

Commit 02f7e62

Browse files
authored
Handle profiler registration messages (#204)
1 parent 61fbcf5 commit 02f7e62

File tree

8 files changed

+570
-74
lines changed

8 files changed

+570
-74
lines changed

universal-profiling-integration/src/main/java/co/elastic/otel/SpanProfilingSamplesCorrelator.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,18 @@ public class SpanProfilingSamplesCorrelator {
5353
private final RingBuffer<DelayedSpan> delayedSpans;
5454
private final PeekingPoller<DelayedSpan> delayedSpansPoller;
5555

56-
private volatile long spanDelayNanos;
56+
private volatile long spanBufferDurationNanos;
57+
private volatile boolean shuttingDown = false;
5758

5859
private final WriterReaderPhaser shutdownPhaser = new WriterReaderPhaser();
5960

6061
public SpanProfilingSamplesCorrelator(
6162
int bufferCapacity,
6263
LongSupplier nanoClock,
63-
long spanDelayNanos,
64+
long initialSpanDelayNanos,
6465
Consumer<ReadableSpan> sendSpan) {
6566
this.nanoClock = nanoClock;
66-
this.spanDelayNanos = spanDelayNanos;
67+
this.spanBufferDurationNanos = initialSpanDelayNanos;
6768
this.sendSpan = sendSpan;
6869

6970
bufferCapacity = nextPowerOf2(bufferCapacity);
@@ -78,6 +79,13 @@ public SpanProfilingSamplesCorrelator(
7879
delayedSpansPoller = new PeekingPoller<>(nonPeekingPoller, DelayedSpan::new);
7980
}
8081

82+
public void setSpanBufferDurationNanos(long nanos) {
83+
if (nanos < 0) {
84+
throw new IllegalArgumentException("nanos must be positive but was " + nanos);
85+
}
86+
spanBufferDurationNanos = nanos;
87+
}
88+
8189
public void onSpanStart(ReadableSpan span, Context parentCtx) {
8290
boolean sampled = span.getSpanContext().getTraceFlags().isSampled();
8391
boolean isLocalRoot = LocalRootSpan.getFor(span) == span;
@@ -95,7 +103,7 @@ public void sendOrBufferSpan(ReadableSpan span) {
95103

96104
long criticalPhaseVal = shutdownPhaser.writerCriticalSectionEnter();
97105
try {
98-
if (spanDelayNanos == 0) {
106+
if (spanBufferDurationNanos == 0 || shuttingDown) {
99107
correlateAndSendSpan(span);
100108
return;
101109
}
@@ -133,14 +141,14 @@ public void correlate(
133141
}
134142
}
135143

136-
public synchronized void flushPendingDelayedSpans() {
144+
public synchronized void flushPendingBufferedSpans() {
137145
try {
138146
delayedSpansPoller.poll(
139-
delayedSpan -> {
140-
long elapsed = nanoClock.getAsLong() - delayedSpan.endNanoTimestamp;
141-
if (elapsed >= spanDelayNanos) {
142-
correlateAndSendSpan(delayedSpan.span);
143-
delayedSpan.clear();
147+
bufferedSpan -> {
148+
long elapsed = nanoClock.getAsLong() - bufferedSpan.endNanoTimestamp;
149+
if (elapsed >= spanBufferDurationNanos || shuttingDown) {
150+
correlateAndSendSpan(bufferedSpan.span);
151+
bufferedSpan.clear();
144152
return true;
145153
}
146154
return false; // span is not yet ready to be sent
@@ -154,16 +162,16 @@ public synchronized void flushPendingDelayedSpans() {
154162
public synchronized void shutdownAndFlushAll() {
155163
shutdownPhaser.readerLock();
156164
try {
157-
spanDelayNanos = 0L; // This will cause new ended spans to not be buffered anymore
165+
shuttingDown = true; // This will cause new ended spans to not be buffered anymore
158166

159167
// avoid race condition: we wait until we are
160168
// sure that no more spans will be added to the ringbuffer
161169
shutdownPhaser.flipPhase();
162170
} finally {
163171
shutdownPhaser.readerUnlock();
164172
}
165-
// every span is now pending because the desired delay is zero
166-
flushPendingDelayedSpans();
173+
// This will flush all pending spans because shuttingDown=true
174+
flushPendingBufferedSpans();
167175
}
168176

169177
private void correlateAndSendSpan(ReadableSpan span) {

universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessor.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import co.elastic.otel.common.LocalRootSpan;
2323
import co.elastic.otel.common.util.ExecutorUtils;
2424
import co.elastic.otel.common.util.HexUtils;
25+
import co.elastic.otel.hostid.ProfilerProvidedHostId;
2526
import co.elastic.otel.profiler.DecodeException;
2627
import co.elastic.otel.profiler.ProfilerMessage;
28+
import co.elastic.otel.profiler.ProfilerRegistrationMessage;
2729
import co.elastic.otel.profiler.TraceCorrelationMessage;
2830
import io.opentelemetry.api.trace.Span;
2931
import io.opentelemetry.context.Context;
@@ -66,20 +68,24 @@ public class UniversalProfilingProcessor extends AbstractChainingSpanProcessor {
6668

6769
private static final Logger log = Logger.getLogger(UniversalProfilingProcessor.class.getName());
6870

71+
private static final long INITIAL_SPAN_DELAY_NANOS = Duration.ofSeconds(1).toNanos();
72+
6973
/**
7074
* The frequency at which the processor polls the unix domain socket for new messages from the
7175
* profiler.
7276
*/
73-
private static final long POLL_FREQUENCY_MS = 20;
77+
static final long POLL_FREQUENCY_MS = 20;
7478

7579
private static boolean anyInstanceActive = false;
7680

7781
private final SpanProfilingSamplesCorrelator correlator;
7882
private final ScheduledExecutorService messagePollAndSpanFlushExecutor;
7983

80-
// Visibile for testing
84+
// Visible for testing
8185
String socketPath;
8286

87+
private volatile boolean tlsPropagationActive = false;
88+
8389
public static UniversalProfilingProcessorBuilder builder(SpanProcessor next, Resource resource) {
8490
return new UniversalProfilingProcessorBuilder(next, resource);
8591
}
@@ -88,7 +94,7 @@ public static UniversalProfilingProcessorBuilder builder(SpanProcessor next, Res
8894
SpanProcessor next,
8995
Resource serviceResource,
9096
int bufferSize,
91-
Duration spanDelay,
97+
boolean activeOnlyAfterProfilerRegistration,
9298
String socketDir,
9399
LongSupplier nanoClock) {
94100
super(next);
@@ -99,9 +105,18 @@ public static UniversalProfilingProcessorBuilder builder(SpanProcessor next, Res
99105
+ " There must be at most one processor of this type active at a time!");
100106
}
101107

108+
long initialSpanDelay;
109+
if (activeOnlyAfterProfilerRegistration) {
110+
initialSpanDelay = 0; // do not buffer spans until we know that a profiler is running
111+
tlsPropagationActive = false;
112+
} else {
113+
initialSpanDelay = INITIAL_SPAN_DELAY_NANOS; // delay conservatively to not miss any data
114+
tlsPropagationActive = true;
115+
}
116+
102117
correlator =
103118
new SpanProfilingSamplesCorrelator(
104-
bufferSize, nanoClock, spanDelay.toNanos(), this.next::onEnd);
119+
bufferSize, nanoClock, initialSpanDelay, this.next::onEnd);
105120

106121
socketPath = openProfilerSocket(socketDir);
107122
try {
@@ -199,6 +214,9 @@ public boolean isEndRequired() {
199214

200215
@Nullable
201216
private void onContextChange(@Nullable Context previous, @Nullable Context next) {
217+
if (!tlsPropagationActive) {
218+
return;
219+
}
202220
try {
203221
Span oldSpan = safeSpanFromContext(previous);
204222
Span newSpan = safeSpanFromContext(next);
@@ -222,7 +240,7 @@ synchronized void pollMessagesAndFlushPendingSpans() {
222240
// Order is important: we only want to flush spans after we have consumed all pending messages
223241
// otherwise the data for the spans to be flushed might be incomplete
224242
consumeProfilerMessages();
225-
correlator.flushPendingDelayedSpans();
243+
correlator.flushPendingBufferedSpans();
226244
}
227245

228246
private void consumeProfilerMessages() {
@@ -236,6 +254,8 @@ private void consumeProfilerMessages() {
236254
break;
237255
} else if (message instanceof TraceCorrelationMessage) {
238256
handleMessage((TraceCorrelationMessage) message, tempBuffer);
257+
} else if (message instanceof ProfilerRegistrationMessage) {
258+
handleMessage((ProfilerRegistrationMessage) message);
239259
} else {
240260
log.log(Level.FINE, "Received unknown message type from profiler: {0}", message);
241261
}
@@ -249,6 +269,20 @@ private void consumeProfilerMessages() {
249269
}
250270
}
251271

272+
private void handleMessage(ProfilerRegistrationMessage message) {
273+
log.log(
274+
Level.FINE,
275+
"Received profiler registration message! host.id is {0} and the span delay is {1} ms",
276+
new Object[] {message.getHostId(), message.getSamplesDelayMillis()});
277+
278+
tlsPropagationActive = true;
279+
long spanDelayNanos =
280+
Duration.ofMillis(message.getSamplesDelayMillis() + POLL_FREQUENCY_MS).toNanos();
281+
correlator.setSpanBufferDurationNanos(spanDelayNanos);
282+
283+
ProfilerProvidedHostId.set(message.getHostId());
284+
}
285+
252286
private void handleMessage(TraceCorrelationMessage message, StringBuilder tempBuffer) {
253287
tempBuffer.setLength(0);
254288
HexUtils.appendAsHex(message.getTraceId(), tempBuffer);

universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorBuilder.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020

2121
import io.opentelemetry.sdk.resources.Resource;
2222
import io.opentelemetry.sdk.trace.SpanProcessor;
23-
import java.time.Duration;
2423
import java.util.function.LongSupplier;
2524

2625
public class UniversalProfilingProcessorBuilder {
2726

2827
private final Resource resource;
2928
private final SpanProcessor nextProcessor;
30-
private Duration spanDelay = Duration.ofSeconds(10);
29+
private boolean delayActivationAfterProfilerRegistration = true;
3130

3231
private LongSupplier nanoClock = System::nanoTime;
3332

@@ -42,16 +41,32 @@ public class UniversalProfilingProcessorBuilder {
4241

4342
public UniversalProfilingProcessor build() {
4443
return new UniversalProfilingProcessor(
45-
nextProcessor, resource, bufferSize, spanDelay, socketDir, nanoClock);
44+
nextProcessor,
45+
resource,
46+
bufferSize,
47+
delayActivationAfterProfilerRegistration,
48+
socketDir,
49+
nanoClock);
4650
}
4751

4852
UniversalProfilingProcessorBuilder clock(LongSupplier nanoClock) {
4953
this.nanoClock = nanoClock;
5054
return this;
5155
}
5256

53-
public UniversalProfilingProcessorBuilder spanDelay(Duration delay) {
54-
this.spanDelay = delay;
57+
/**
58+
* If enabled, the profiling integration will remain inactive until the presence of a profiler is
59+
* actually detected. This safes a bit of overhead in the case no profiler is there.
60+
*
61+
* <p>The downside is if the application starts a span immediately after startup, the profiler
62+
* might not be detected in time and therefore this first span might not be correlated correctly.
63+
* This can be avoided by setting this option to {@code false}. In this case the {@link
64+
* UniversalProfilingProcessor} will assume a profiler will be eventually running and start the
65+
* correlation eagerly.
66+
*/
67+
public UniversalProfilingProcessorBuilder delayActivationAfterProfilerRegistration(
68+
boolean value) {
69+
this.delayActivationAfterProfilerRegistration = value;
5570
return this;
5671
}
5772

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package co.elastic.otel.hostid;
20+
21+
import io.opentelemetry.sdk.common.CompletableResultCode;
22+
import io.opentelemetry.sdk.resources.Resource;
23+
import io.opentelemetry.sdk.trace.data.DelegatingSpanData;
24+
import io.opentelemetry.sdk.trace.data.SpanData;
25+
import io.opentelemetry.sdk.trace.export.SpanExporter;
26+
import java.util.Collection;
27+
import java.util.stream.Collectors;
28+
29+
public class ProfilerHostIdApplyingSpanExporter implements SpanExporter {
30+
31+
private final SpanExporter delegate;
32+
33+
public ProfilerHostIdApplyingSpanExporter(SpanExporter delegate) {
34+
this.delegate = delegate;
35+
}
36+
37+
@Override
38+
public CompletableResultCode export(Collection<SpanData> collection) {
39+
// do a pre-check to avoid unnecessary allocations
40+
boolean anyUpdatesRequired =
41+
collection.stream()
42+
.anyMatch(
43+
data ->
44+
data.getResource()
45+
!= ProfilerHostIdResourceUpdater.applyHostId(data.getResource()));
46+
Collection<SpanData> updatedSpanData = collection;
47+
if (anyUpdatesRequired) {
48+
updatedSpanData =
49+
collection.stream()
50+
.map(
51+
data -> {
52+
Resource original = data.getResource();
53+
Resource updated = ProfilerHostIdResourceUpdater.applyHostId(original);
54+
if (original != updated) {
55+
return new DelegatingSpanData(data) {
56+
@Override
57+
public Resource getResource() {
58+
return updated;
59+
}
60+
};
61+
} else {
62+
return data;
63+
}
64+
})
65+
.collect(Collectors.toList());
66+
}
67+
return delegate.export(updatedSpanData);
68+
}
69+
70+
@Override
71+
public CompletableResultCode flush() {
72+
return delegate.flush();
73+
}
74+
75+
@Override
76+
public CompletableResultCode shutdown() {
77+
return delegate.flush();
78+
}
79+
}

0 commit comments

Comments
 (0)