|
2 | 2 | // SPDX-License-Identifier: Apache-2.0 |
3 | 3 | // Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License, Version 2.0. |
4 | 4 |
|
5 | | -import { context, Context, diag, TraceFlags } from '@opentelemetry/api'; |
6 | | -import { |
7 | | - BindOnceFuture, |
8 | | - ExportResultCode, |
9 | | - getEnv, |
10 | | - globalErrorHandler, |
11 | | - suppressTracing, |
12 | | - unrefTimer, |
13 | | -} from '@opentelemetry/core'; |
14 | | -import { BufferConfig, ReadableSpan, Span, SpanExporter, SpanProcessor } from '@opentelemetry/sdk-trace-base'; |
| 5 | +import { Context, TraceFlags } from '@opentelemetry/api'; |
| 6 | +import { ReadableSpan, BufferConfig, Span } from '@opentelemetry/sdk-trace-base'; |
15 | 7 | import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys'; |
| 8 | +import { BatchSpanProcessorBase } from '@opentelemetry/sdk-trace-base/build/src/export/BatchSpanProcessorBase'; |
16 | 9 |
|
17 | 10 | /** |
18 | 11 | * This class is a customized version of the `BatchSpanProcessorBase` from the |
@@ -41,211 +34,24 @@ import { AWS_ATTRIBUTE_KEYS } from './aws-attribute-keys'; |
41 | 34 | * The rest of the behavior—batch processing, queuing, and exporting spans in |
42 | 35 | * batches—is inherited from the base class and remains largely the same. |
43 | 36 | */ |
44 | | -export class AwsBatchUnsampledSpanProcessor implements SpanProcessor { |
45 | | - private readonly _maxExportBatchSize: number; |
46 | | - private readonly _maxQueueSize: number; |
47 | | - private readonly _scheduledDelayMillis: number; |
48 | | - private readonly _exportTimeoutMillis: number; |
49 | | - |
50 | | - private _isExporting: boolean = false; |
51 | | - private _finishedSpans: ReadableSpan[] = []; |
52 | | - private _timer: NodeJS.Timeout | undefined; |
53 | | - private _shutdownOnce: BindOnceFuture<void>; |
54 | | - private _droppedSpansCount: number = 0; |
55 | | - |
56 | | - constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) { |
57 | | - const env = getEnv(); |
58 | | - this._maxExportBatchSize = |
59 | | - typeof config?.maxExportBatchSize === 'number' ? config.maxExportBatchSize : env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE; |
60 | | - this._maxQueueSize = typeof config?.maxQueueSize === 'number' ? config.maxQueueSize : env.OTEL_BSP_MAX_QUEUE_SIZE; |
61 | | - this._scheduledDelayMillis = |
62 | | - typeof config?.scheduledDelayMillis === 'number' ? config.scheduledDelayMillis : env.OTEL_BSP_SCHEDULE_DELAY; |
63 | | - this._exportTimeoutMillis = |
64 | | - typeof config?.exportTimeoutMillis === 'number' ? config.exportTimeoutMillis : env.OTEL_BSP_EXPORT_TIMEOUT; |
65 | | - |
66 | | - this._shutdownOnce = new BindOnceFuture(this._shutdown, this); |
67 | | - |
68 | | - if (this._maxExportBatchSize > this._maxQueueSize) { |
69 | | - diag.warn( |
70 | | - 'BatchSpanProcessor: maxExportBatchSize must be smaller or equal to maxQueueSize, setting maxExportBatchSize to match maxQueueSize' |
71 | | - ); |
72 | | - this._maxExportBatchSize = this._maxQueueSize; |
73 | | - } |
74 | | - } |
75 | | - |
76 | | - forceFlush(): Promise<void> { |
77 | | - if (this._shutdownOnce.isCalled) { |
78 | | - return this._shutdownOnce.promise; |
79 | | - } |
80 | | - return this._flushAll(); |
81 | | - } |
82 | | - |
83 | | - onStart(span: Span, _parentContext: Context): void { |
| 37 | +export class AwsBatchUnsampledSpanProcessor extends BatchSpanProcessorBase<BufferConfig> { |
| 38 | + override onStart(span: Span, _parentContext: Context): void { |
84 | 39 | if ((span.spanContext().traceFlags & TraceFlags.SAMPLED) === 0) { |
85 | 40 | span.setAttribute(AWS_ATTRIBUTE_KEYS.AWS_TRACE_FLAG_UNSAMPLED, true); |
86 | 41 | return; |
87 | 42 | } |
88 | 43 | } |
89 | 44 |
|
90 | | - onEnd(span: ReadableSpan): void { |
91 | | - if (this._shutdownOnce.isCalled) { |
| 45 | + override onEnd(span: ReadableSpan): void { |
| 46 | + if ((this as any)._shutdownOnce.isCalled) { |
92 | 47 | return; |
93 | 48 | } |
94 | 49 |
|
95 | 50 | if ((span.spanContext().traceFlags & TraceFlags.SAMPLED) === 1) { |
96 | 51 | return; |
97 | 52 | } |
98 | 53 |
|
99 | | - this._addToBuffer(span); |
100 | | - } |
101 | | - |
102 | | - shutdown(): Promise<void> { |
103 | | - return this._shutdownOnce.call(); |
104 | | - } |
105 | | - |
106 | | - private _shutdown() { |
107 | | - return Promise.resolve() |
108 | | - .then(() => { |
109 | | - return this.onShutdown(); |
110 | | - }) |
111 | | - .then(() => { |
112 | | - return this._flushAll(); |
113 | | - }) |
114 | | - .then(() => { |
115 | | - return this._exporter.shutdown(); |
116 | | - }); |
117 | | - } |
118 | | - |
119 | | - /** Add a span in the buffer. */ |
120 | | - private _addToBuffer(span: ReadableSpan) { |
121 | | - if (this._finishedSpans.length >= this._maxQueueSize) { |
122 | | - // limit reached, drop span |
123 | | - |
124 | | - if (this._droppedSpansCount === 0) { |
125 | | - diag.debug('maxQueueSize reached, dropping spans'); |
126 | | - } |
127 | | - this._droppedSpansCount++; |
128 | | - |
129 | | - return; |
130 | | - } |
131 | | - |
132 | | - if (this._droppedSpansCount > 0) { |
133 | | - // some spans were dropped, log once with count of spans dropped |
134 | | - diag.warn(`Dropped ${this._droppedSpansCount} spans because maxQueueSize reached`); |
135 | | - this._droppedSpansCount = 0; |
136 | | - } |
137 | | - |
138 | | - this._finishedSpans.push(span); |
139 | | - this._maybeStartTimer(); |
140 | | - } |
141 | | - |
142 | | - /** |
143 | | - * Send all spans to the exporter respecting the batch size limit |
144 | | - * This function is used only on forceFlush or shutdown, |
145 | | - * for all other cases _flush should be used |
146 | | - * */ |
147 | | - private _flushAll(): Promise<void> { |
148 | | - return new Promise((resolve, reject) => { |
149 | | - const promises = []; |
150 | | - // calculate number of batches |
151 | | - const count = Math.ceil(this._finishedSpans.length / this._maxExportBatchSize); |
152 | | - for (let i = 0, j = count; i < j; i++) { |
153 | | - promises.push(this._flushOneBatch()); |
154 | | - } |
155 | | - Promise.all(promises) |
156 | | - .then(() => { |
157 | | - resolve(); |
158 | | - }) |
159 | | - .catch(reject); |
160 | | - }); |
161 | | - } |
162 | | - |
163 | | - private _flushOneBatch(): Promise<void> { |
164 | | - this._clearTimer(); |
165 | | - if (this._finishedSpans.length === 0) { |
166 | | - return Promise.resolve(); |
167 | | - } |
168 | | - return new Promise((resolve, reject) => { |
169 | | - const timer = setTimeout(() => { |
170 | | - // don't wait anymore for export, this way the next batch can start |
171 | | - reject(new Error('Timeout')); |
172 | | - }, this._exportTimeoutMillis); |
173 | | - // prevent downstream exporter calls from generating spans |
174 | | - context.with(suppressTracing(context.active()), () => { |
175 | | - // Reset the finished spans buffer here because the next invocations of the _flush method |
176 | | - // could pass the same finished spans to the exporter if the buffer is cleared |
177 | | - // outside the execution of this callback. |
178 | | - let spans: ReadableSpan[]; |
179 | | - if (this._finishedSpans.length <= this._maxExportBatchSize) { |
180 | | - spans = this._finishedSpans; |
181 | | - this._finishedSpans = []; |
182 | | - } else { |
183 | | - spans = this._finishedSpans.splice(0, this._maxExportBatchSize); |
184 | | - } |
185 | | - |
186 | | - const doExport = () => |
187 | | - this._exporter.export(spans, result => { |
188 | | - clearTimeout(timer); |
189 | | - if (result.code === ExportResultCode.SUCCESS) { |
190 | | - resolve(); |
191 | | - } else { |
192 | | - reject(result.error ?? new Error('BatchSpanProcessor: span export failed')); |
193 | | - } |
194 | | - }); |
195 | | - |
196 | | - let pendingResources: Array<Promise<void>> | null = null; |
197 | | - for (let i = 0, len = spans.length; i < len; i++) { |
198 | | - const span = spans[i]; |
199 | | - if (span.resource.asyncAttributesPending && span.resource.waitForAsyncAttributes) { |
200 | | - pendingResources ??= []; |
201 | | - pendingResources.push(span.resource.waitForAsyncAttributes()); |
202 | | - } |
203 | | - } |
204 | | - |
205 | | - // Avoid scheduling a promise to make the behavior more predictable and easier to test |
206 | | - if (pendingResources === null) { |
207 | | - doExport(); |
208 | | - } else { |
209 | | - Promise.all(pendingResources).then(doExport, err => { |
210 | | - globalErrorHandler(err); |
211 | | - reject(err); |
212 | | - }); |
213 | | - } |
214 | | - }); |
215 | | - }); |
216 | | - } |
217 | | - |
218 | | - private _maybeStartTimer() { |
219 | | - if (this._isExporting) return; |
220 | | - const flush = () => { |
221 | | - this._isExporting = true; |
222 | | - this._flushOneBatch() |
223 | | - .finally(() => { |
224 | | - this._isExporting = false; |
225 | | - if (this._finishedSpans.length > 0) { |
226 | | - this._clearTimer(); |
227 | | - this._maybeStartTimer(); |
228 | | - } |
229 | | - }) |
230 | | - .catch(e => { |
231 | | - this._isExporting = false; |
232 | | - globalErrorHandler(e); |
233 | | - }); |
234 | | - }; |
235 | | - // we only wait if the queue doesn't have enough elements yet |
236 | | - if (this._finishedSpans.length >= this._maxExportBatchSize) { |
237 | | - return flush(); |
238 | | - } |
239 | | - if (this._timer !== undefined) return; |
240 | | - this._timer = setTimeout(() => flush(), this._scheduledDelayMillis); |
241 | | - unrefTimer(this._timer); |
242 | | - } |
243 | | - |
244 | | - private _clearTimer() { |
245 | | - if (this._timer !== undefined) { |
246 | | - clearTimeout(this._timer); |
247 | | - this._timer = undefined; |
248 | | - } |
| 54 | + (this as any)._addToBuffer(span); |
249 | 55 | } |
250 | 56 |
|
251 | 57 | onShutdown(): void {} |
|
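A minimal wiring sketch, not part of this diff: how the refactored `AwsBatchUnsampledSpanProcessor` could be registered next to the regular sampled-span pipeline on an SDK 1.x `NodeTracerProvider` (which still exposes `addSpanProcessor`). The exporter choice, the local module path, and the `maxQueueSize` value are illustrative assumptions; the `(exporter, config?)` constructor is inherited from `BatchSpanProcessorBase`.

```typescript
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
// Assumed local path to the file changed in this diff.
import { AwsBatchUnsampledSpanProcessor } from './aws-batch-unsampled-span-processor';

const provider = new NodeTracerProvider();

// Regular pipeline: the stock BatchSpanProcessor drops unsampled spans in onEnd,
// so it only ever exports sampled spans.
provider.addSpanProcessor(new BatchSpanProcessor(new OTLPTraceExporter()));

// Customized pipeline: onStart tags unsampled spans with
// AWS_ATTRIBUTE_KEYS.AWS_TRACE_FLAG_UNSAMPLED, and onEnd buffers only spans
// whose SAMPLED flag is unset so they are still exported in batches.
provider.addSpanProcessor(new AwsBatchUnsampledSpanProcessor(new OTLPTraceExporter(), { maxQueueSize: 2048 }));

provider.register();
```

Design note on the change itself: `_shutdownOnce` and `_addToBuffer` are private members of `BatchSpanProcessorBase`, so the subclass reaches them through `(this as any)` casts. This keeps the base class's shutdown check and enqueue path while deleting the duplicated batching, timer, and export logic that the removed lines carried.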