Skip to content

Commit c7e88d4

Browse files
authored
fix(core): Add a PromiseBuffer for incoming events on the client (#18120)
## Problem Previously, the client would process all incoming events without any limit, which could lead to unbounded growth of pending events/promises in memory. This could cause performance issues and memory pressure in high-throughput scenarios. This occurs when two conditions are met: - when an integration with an async `processEvent` are added (e.g. `ContextLines`, which is a defaultIntegration) - events, e.g. `Sentry.captureException`, are called synchronously ```js Sentry.init({ ... }); // ... for (let i = 0; i < 5000; i++) { Sentry.captureException(new Error()); } ``` ## Solution This PR adds a `PromiseBuffer` to the `Client` class to limit the number of concurrent event processing operations. - Introduced a `_promiseBuffer` in the `Client` class that limits concurrent event processing - The buffer size defaults to `DEFAULT_TRANSPORT_BUFFER_SIZE` (64) but can be configured via `transportOptions.bufferSize` - When the buffer is full, events are rejected and properly tracked as dropped events with the `queue_overflow` reason - Please tak - Modified the `_process()` method to: - Accept a task producer function instead of a promise directly (lazy evaluation) - Use the promise buffer to manage concurrent operations - Track the data category for proper dropped event categorization ## Special 👀 on - About reusing `transportOptions.bufferSize`: Not sure if this is the best technique, but IMO both should have the same size - because if it wouldn't it would be capped at a later stage (asking myself if the transport still needs the promise buffer - as we have it now way earlier in place) - The `_process` takes now a `DataCategory`. At the time of the process the event type is almost unknown. Not sure if I assumed the categories correctly there, or if there is another technique of getting the type (**edit:** a [comment by Cursor](https://github.com/getsentry/sentry-javascript/pull/18120/files/2ee14b484d00432145d4f9a6773fbd31f92921d7#r2504259236) helped a little and I added [a helper function](7381a49)) - `recordDroppedEvent` is now printing it one after each other - theoretically we can count all occurences and print the count on it. I decided against this one, since it would delay the user feedback - this can be challenged though
1 parent 28e9cc6 commit c7e88d4

File tree

4 files changed

+112
-17
lines changed

4 files changed

+112
-17
lines changed

.size-limit.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ module.exports = [
3838
path: 'packages/browser/build/npm/esm/prod/index.js',
3939
import: createImport('init', 'browserTracingIntegration'),
4040
gzip: true,
41-
limit: '41.38 KB',
41+
limit: '41.5 KB',
4242
},
4343
{
4444
name: '@sentry/browser (incl. Tracing, Profiling)',
@@ -127,7 +127,7 @@ module.exports = [
127127
import: createImport('init', 'ErrorBoundary', 'reactRouterV6BrowserTracingIntegration'),
128128
ignore: ['react/jsx-runtime'],
129129
gzip: true,
130-
limit: '43.33 KB',
130+
limit: '43.5 KB',
131131
},
132132
// Vue SDK (ESM)
133133
{
@@ -142,7 +142,7 @@ module.exports = [
142142
path: 'packages/vue/build/esm/index.js',
143143
import: createImport('init', 'browserTracingIntegration'),
144144
gzip: true,
145-
limit: '43.2 KB',
145+
limit: '43.3 KB',
146146
},
147147
// Svelte SDK (ESM)
148148
{
@@ -163,7 +163,7 @@ module.exports = [
163163
name: 'CDN Bundle (incl. Tracing)',
164164
path: createCDNPath('bundle.tracing.min.js'),
165165
gzip: true,
166-
limit: '42 KB',
166+
limit: '42.1 KB',
167167
},
168168
{
169169
name: 'CDN Bundle (incl. Tracing, Replay)',
@@ -231,7 +231,7 @@ module.exports = [
231231
import: createImport('init'),
232232
ignore: [...builtinModules, ...nodePrefixedBuiltinModules],
233233
gzip: true,
234-
limit: '51 KB',
234+
limit: '51.1 KB',
235235
},
236236
// Node SDK (ESM)
237237
{

packages/core/src/client.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ import { _INTERNAL_flushMetricsBuffer } from './metrics/internal';
1111
import type { Scope } from './scope';
1212
import { updateSession } from './session';
1313
import { getDynamicSamplingContextFromScope } from './tracing/dynamicSamplingContext';
14+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from './transports/base';
1415
import type { Breadcrumb, BreadcrumbHint, FetchBreadcrumbHint, XhrBreadcrumbHint } from './types-hoist/breadcrumb';
1516
import type { CheckIn, MonitorConfig } from './types-hoist/checkin';
1617
import type { EventDropReason, Outcome } from './types-hoist/clientreport';
1718
import type { DataCategory } from './types-hoist/datacategory';
1819
import type { DsnComponents } from './types-hoist/dsn';
1920
import type { DynamicSamplingContext, Envelope } from './types-hoist/envelope';
20-
import type { ErrorEvent, Event, EventHint, TransactionEvent } from './types-hoist/event';
21+
import type { ErrorEvent, Event, EventHint, EventType, TransactionEvent } from './types-hoist/event';
2122
import type { EventProcessor } from './types-hoist/eventprocessor';
2223
import type { FeedbackEvent } from './types-hoist/feedback';
2324
import type { Integration } from './types-hoist/integration';
@@ -43,6 +44,7 @@ import { merge } from './utils/merge';
4344
import { checkOrSetAlreadyCaught, uuid4 } from './utils/misc';
4445
import { parseSampleRate } from './utils/parseSampleRate';
4546
import { prepareEvent } from './utils/prepareEvent';
47+
import { type PromiseBuffer, makePromiseBuffer, SENTRY_BUFFER_FULL_ERROR } from './utils/promisebuffer';
4648
import { reparentChildSpans, shouldIgnoreSpan } from './utils/should-ignore-span';
4749
import { showSpanDropWarning } from './utils/spanUtils';
4850
import { rejectedSyncPromise } from './utils/syncpromise';
@@ -201,6 +203,8 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
201203
// eslint-disable-next-line @typescript-eslint/ban-types
202204
private _hooks: Record<string, Set<Function>>;
203205

206+
private _promiseBuffer: PromiseBuffer<unknown>;
207+
204208
/**
205209
* Initializes this client instance.
206210
*
@@ -213,6 +217,7 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
213217
this._outcomes = {};
214218
this._hooks = {};
215219
this._eventProcessors = [];
220+
this._promiseBuffer = makePromiseBuffer(options.transportOptions?.bufferSize ?? DEFAULT_TRANSPORT_BUFFER_SIZE);
216221

217222
if (options.dsn) {
218223
this._dsn = makeDsn(options.dsn);
@@ -275,9 +280,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
275280
};
276281

277282
this._process(
278-
this.eventFromException(exception, hintWithEventId).then(event =>
279-
this._captureEvent(event, hintWithEventId, scope),
280-
),
283+
() =>
284+
this.eventFromException(exception, hintWithEventId)
285+
.then(event => this._captureEvent(event, hintWithEventId, scope))
286+
.then(res => res),
287+
'error',
281288
);
282289

283290
return hintWithEventId.event_id;
@@ -300,12 +307,15 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
300307
};
301308

302309
const eventMessage = isParameterizedString(message) ? message : String(message);
303-
304-
const promisedEvent = isPrimitive(message)
310+
const isMessage = isPrimitive(message);
311+
const promisedEvent = isMessage
305312
? this.eventFromMessage(eventMessage, level, hintWithEventId)
306313
: this.eventFromException(message, hintWithEventId);
307314

308-
this._process(promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)));
315+
this._process(
316+
() => promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)),
317+
isMessage ? 'unknown' : 'error',
318+
);
309319

310320
return hintWithEventId.event_id;
311321
}
@@ -332,9 +342,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
332342
const sdkProcessingMetadata = event.sdkProcessingMetadata || {};
333343
const capturedSpanScope: Scope | undefined = sdkProcessingMetadata.capturedSpanScope;
334344
const capturedSpanIsolationScope: Scope | undefined = sdkProcessingMetadata.capturedSpanIsolationScope;
345+
const dataCategory = getDataCategoryByType(event.type);
335346

336347
this._process(
337-
this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
348+
() => this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
349+
dataCategory,
338350
);
339351

340352
return hintWithEventId.event_id;
@@ -1252,7 +1264,7 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
12521264
);
12531265
}
12541266

1255-
const dataCategory = (eventType === 'replay_event' ? 'replay' : eventType) satisfies DataCategory;
1267+
const dataCategory = getDataCategoryByType(event.type);
12561268

12571269
return this._prepareEvent(event, hint, currentScope, isolationScope)
12581270
.then(prepared => {
@@ -1335,15 +1347,21 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
13351347
/**
13361348
* Occupies the client with processing and event
13371349
*/
1338-
protected _process<T>(promise: PromiseLike<T>): void {
1350+
protected _process<T>(taskProducer: () => PromiseLike<T>, dataCategory: DataCategory): void {
13391351
this._numProcessing++;
1340-
void promise.then(
1352+
1353+
void this._promiseBuffer.add(taskProducer).then(
13411354
value => {
13421355
this._numProcessing--;
13431356
return value;
13441357
},
13451358
reason => {
13461359
this._numProcessing--;
1360+
1361+
if (reason === SENTRY_BUFFER_FULL_ERROR) {
1362+
this.recordDroppedEvent('queue_overflow', dataCategory);
1363+
}
1364+
13471365
return reason;
13481366
},
13491367
);
@@ -1408,6 +1426,10 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
14081426
): PromiseLike<Event>;
14091427
}
14101428

1429+
function getDataCategoryByType(type: EventType | 'replay_event' | undefined): DataCategory {
1430+
return type === 'replay_event' ? 'replay' : type || 'error';
1431+
}
1432+
14111433
/**
14121434
* Verifies that return value of configured `beforeSend` or `beforeSendTransaction` is of expected type, and returns the value if so.
14131435
*/

packages/core/test/lib/client.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
import * as integrationModule from '../../src/integration';
1616
import { _INTERNAL_captureLog } from '../../src/logs/internal';
1717
import { _INTERNAL_captureMetric } from '../../src/metrics/internal';
18+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from '../../src/transports/base';
1819
import type { Envelope } from '../../src/types-hoist/envelope';
1920
import type { ErrorEvent, Event, TransactionEvent } from '../../src/types-hoist/event';
2021
import type { SpanJSON } from '../../src/types-hoist/span';
@@ -23,7 +24,7 @@ import * as miscModule from '../../src/utils/misc';
2324
import * as stringModule from '../../src/utils/string';
2425
import * as timeModule from '../../src/utils/time';
2526
import { getDefaultTestClientOptions, TestClient } from '../mocks/client';
26-
import { AdHocIntegration, TestIntegration } from '../mocks/integration';
27+
import { AdHocIntegration, AsyncTestIntegration, TestIntegration } from '../mocks/integration';
2728
import { makeFakeTransport } from '../mocks/transport';
2829
import { clearGlobalScope } from '../testutils';
2930

@@ -2935,4 +2936,66 @@ describe('Client', () => {
29352936
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
29362937
});
29372938
});
2939+
2940+
describe('promise buffer usage', () => {
2941+
it('respects the default value of the buffer size', async () => {
2942+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN });
2943+
const client = new TestClient(options);
2944+
2945+
client.addIntegration(new AsyncTestIntegration());
2946+
2947+
Array.from({ length: DEFAULT_TRANSPORT_BUFFER_SIZE + 1 }).forEach(() => {
2948+
client.captureException(new Error('ʕノ•ᴥ•ʔノ ︵ ┻━┻'));
2949+
});
2950+
2951+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 1 }]);
2952+
});
2953+
2954+
it('records queue_overflow when promise buffer is full', async () => {
2955+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2956+
const client = new TestClient(options);
2957+
2958+
client.addIntegration(new AsyncTestIntegration());
2959+
2960+
client.captureException(new Error('first'));
2961+
client.captureException(new Error('second'));
2962+
client.captureException(new Error('third'));
2963+
2964+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 2 }]);
2965+
});
2966+
2967+
it('records different types of dropped events', async () => {
2968+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2969+
const client = new TestClient(options);
2970+
2971+
client.addIntegration(new AsyncTestIntegration());
2972+
2973+
client.captureException(new Error('first')); // error
2974+
client.captureException(new Error('second')); // error
2975+
client.captureMessage('third'); // unknown
2976+
client.captureEvent({ message: 'fourth' }); // error
2977+
client.captureEvent({ message: 'fifth', type: 'replay_event' }); // replay
2978+
client.captureEvent({ message: 'sixth', type: 'transaction' }); // transaction
2979+
2980+
expect(client._clearOutcomes()).toEqual([
2981+
{ reason: 'queue_overflow', category: 'error', quantity: 2 },
2982+
{ reason: 'queue_overflow', category: 'unknown', quantity: 1 },
2983+
{ reason: 'queue_overflow', category: 'replay', quantity: 1 },
2984+
{ reason: 'queue_overflow', category: 'transaction', quantity: 1 },
2985+
]);
2986+
});
2987+
2988+
it('should skip the promise buffer with sync integrations', async () => {
2989+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2990+
const client = new TestClient(options);
2991+
2992+
client.addIntegration(new TestIntegration());
2993+
2994+
client.captureException(new Error('first'));
2995+
client.captureException(new Error('second'));
2996+
client.captureException(new Error('third'));
2997+
2998+
expect(client._clearOutcomes()).toEqual([]);
2999+
});
3000+
});
29383001
});

packages/core/test/mocks/integration.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ export class TestIntegration implements Integration {
2424
}
2525
}
2626

27+
export class AsyncTestIntegration implements Integration {
28+
public static id: string = 'AsyncTestIntegration';
29+
30+
public name: string = 'AsyncTestIntegration';
31+
32+
processEvent(event: Event): Event | null | PromiseLike<Event | null> {
33+
return new Promise(resolve => setTimeout(() => resolve(event), 1));
34+
}
35+
}
36+
2737
export class AddAttachmentTestIntegration implements Integration {
2838
public static id: string = 'AddAttachmentTestIntegration';
2939

0 commit comments

Comments
 (0)