|
6 | 6 | import io.opentelemetry.context.Scope;
|
7 | 7 | import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
|
8 | 8 | import java.io.Closeable;
|
| 9 | +import java.time.Duration; |
9 | 10 | import java.util.concurrent.atomic.AtomicBoolean;
|
10 | 11 | import java.util.function.Function;
|
11 | 12 | import org.slf4j.Logger;
|
| 13 | +import reactor.core.Disposable; |
| 14 | +import reactor.core.publisher.Mono; |
12 | 15 | import reactor.util.context.ContextView;
|
13 | 16 |
|
14 | 17 | public abstract class SemanticKernelTelemetrySpan implements Closeable {
|
15 | 18 |
|
16 | 19 | private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(
|
17 | 20 | SemanticKernelTelemetrySpan.class);
|
18 | 21 |
|
| 22 | + private static final long SPAN_TIMEOUT_MS = Long.parseLong((String) System.getProperties() |
| 23 | + .getOrDefault("semantickernel.telemetry.span_timeout", "120000")); |
| 24 | + |
19 | 25 | private final Span span;
|
20 | 26 | private final Function<reactor.util.context.Context, reactor.util.context.Context> reactorContextModifier;
|
21 | 27 | private final Scope spanScope;
|
22 | 28 | private final Scope contextScope;
|
23 | 29 | private final AtomicBoolean closed = new AtomicBoolean(false);
|
24 | 30 |
|
| 31 | + // Timeout to close the span if it was not closed within the specified time to avoid memory leaks |
| 32 | + private final Disposable watchdog; |
| 33 | + |
| 34 | + // This is a finalizer guardian to ensure that the span is closed if it was not closed explicitly |
| 35 | + @SuppressWarnings("unused") |
| 36 | + private final Object finalizerGuardian = new Object() { |
| 37 | + @Override |
| 38 | + protected void finalize() { |
| 39 | + if (closed.get() == false) { |
| 40 | + LOGGER.warn("Span was not closed"); |
| 41 | + close(); |
| 42 | + } |
| 43 | + } |
| 44 | + }; |
| 45 | + |
25 | 46 | public SemanticKernelTelemetrySpan(Span span,
|
26 | 47 | Function<reactor.util.context.Context, reactor.util.context.Context> reactorContextModifier,
|
27 | 48 | Scope spanScope, Scope contextScope) {
|
28 | 49 | this.span = span;
|
29 | 50 | this.reactorContextModifier = reactorContextModifier;
|
30 | 51 | this.spanScope = spanScope;
|
31 | 52 | this.contextScope = contextScope;
|
| 53 | + |
| 54 | + watchdog = Mono.just(1) |
| 55 | + .delay(Duration.ofMillis(SPAN_TIMEOUT_MS)) |
| 56 | + .subscribe(i -> { |
| 57 | + if (closed.get() == false) { |
| 58 | + LOGGER.warn("Span was not closed, timing out"); |
| 59 | + close(); |
| 60 | + } |
| 61 | + }); |
32 | 62 | }
|
33 | 63 |
|
34 | 64 | public interface SpanConstructor<T extends SemanticKernelTelemetrySpan> {
|
@@ -71,14 +101,27 @@ public void close() {
|
71 | 101 | if (closed.compareAndSet(false, true)) {
|
72 | 102 | LOGGER.trace("Closing span: {}", span);
|
73 | 103 | if (span.isRecording()) {
|
74 |
| - span.end(); |
| 104 | + try { |
| 105 | + span.end(); |
| 106 | + } catch (Exception e) { |
| 107 | + LOGGER.error("Error closing span", e); |
| 108 | + } |
75 | 109 | }
|
76 | 110 | if (contextScope != null) {
|
77 |
| - contextScope.close(); |
| 111 | + try { |
| 112 | + contextScope.close(); |
| 113 | + } catch (Exception e) { |
| 114 | + LOGGER.error("Error closing context scope", e); |
| 115 | + } |
78 | 116 | }
|
79 | 117 | if (spanScope != null) {
|
80 |
| - spanScope.close(); |
| 118 | + try { |
| 119 | + spanScope.close(); |
| 120 | + } catch (Exception e) { |
| 121 | + LOGGER.error("Error closing span scope", e); |
| 122 | + } |
81 | 123 | }
|
| 124 | + watchdog.dispose(); |
82 | 125 | }
|
83 | 126 | }
|
84 | 127 |
|
|
0 commit comments