Skip to content

Commit 9a8c083

Browse files
committed
fix(core): add a PromiseBuffer for incoming events on the client
1 parent f0d5d76 commit 9a8c083

File tree

3 files changed

+101
-10
lines changed

3 files changed

+101
-10
lines changed

packages/core/src/client.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { _INTERNAL_flushMetricsBuffer } from './metrics/internal';
1111
import type { Scope } from './scope';
1212
import { updateSession } from './session';
1313
import { getDynamicSamplingContextFromScope } from './tracing/dynamicSamplingContext';
14+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from './transports/base';
1415
import type { Breadcrumb, BreadcrumbHint, FetchBreadcrumbHint, XhrBreadcrumbHint } from './types-hoist/breadcrumb';
1516
import type { CheckIn, MonitorConfig } from './types-hoist/checkin';
1617
import type { EventDropReason, Outcome } from './types-hoist/clientreport';
@@ -43,6 +44,7 @@ import { merge } from './utils/merge';
4344
import { checkOrSetAlreadyCaught, uuid4 } from './utils/misc';
4445
import { parseSampleRate } from './utils/parseSampleRate';
4546
import { prepareEvent } from './utils/prepareEvent';
47+
import { type PromiseBuffer, makePromiseBuffer, SENTRY_BUFFER_FULL_ERROR } from './utils/promisebuffer';
4648
import { reparentChildSpans, shouldIgnoreSpan } from './utils/should-ignore-span';
4749
import { showSpanDropWarning } from './utils/spanUtils';
4850
import { rejectedSyncPromise } from './utils/syncpromise';
@@ -201,6 +203,8 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
201203
// eslint-disable-next-line @typescript-eslint/ban-types
202204
private _hooks: Record<string, Set<Function>>;
203205

206+
private _promiseBuffer: PromiseBuffer<unknown>;
207+
204208
/**
205209
* Initializes this client instance.
206210
*
@@ -213,6 +217,7 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
213217
this._outcomes = {};
214218
this._hooks = {};
215219
this._eventProcessors = [];
220+
this._promiseBuffer = makePromiseBuffer(options.transportOptions?.bufferSize ?? DEFAULT_TRANSPORT_BUFFER_SIZE);
216221

217222
if (options.dsn) {
218223
this._dsn = makeDsn(options.dsn);
@@ -275,9 +280,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
275280
};
276281

277282
this._process(
278-
this.eventFromException(exception, hintWithEventId).then(event =>
279-
this._captureEvent(event, hintWithEventId, scope),
280-
),
283+
() =>
284+
this.eventFromException(exception, hintWithEventId)
285+
.then(event => this._captureEvent(event, hintWithEventId, scope))
286+
.then(res => res),
287+
'error',
281288
);
282289

283290
return hintWithEventId.event_id;
@@ -300,12 +307,15 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
300307
};
301308

302309
const eventMessage = isParameterizedString(message) ? message : String(message);
303-
304-
const promisedEvent = isPrimitive(message)
310+
const isMessage = isPrimitive(message);
311+
const promisedEvent = isMessage
305312
? this.eventFromMessage(eventMessage, level, hintWithEventId)
306313
: this.eventFromException(message, hintWithEventId);
307314

308-
this._process(promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)));
315+
this._process(
316+
() => promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)),
317+
isMessage ? 'unknown' : 'error',
318+
);
309319

310320
return hintWithEventId.event_id;
311321
}
@@ -332,9 +342,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
332342
const sdkProcessingMetadata = event.sdkProcessingMetadata || {};
333343
const capturedSpanScope: Scope | undefined = sdkProcessingMetadata.capturedSpanScope;
334344
const capturedSpanIsolationScope: Scope | undefined = sdkProcessingMetadata.capturedSpanIsolationScope;
345+
const dataCategory = event.type === 'replay_event' ? 'replay' : (event.type ?? 'unknown');
335346

336347
this._process(
337-
this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
348+
() => this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
349+
dataCategory,
338350
);
339351

340352
return hintWithEventId.event_id;
@@ -1335,15 +1347,21 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
13351347
/**
13361348
* Occupies the client with processing and event
13371349
*/
1338-
protected _process<T>(promise: PromiseLike<T>): void {
1350+
protected _process<T>(taskProducer: () => PromiseLike<T>, dataCategory: DataCategory): void {
13391351
this._numProcessing++;
1340-
void promise.then(
1352+
1353+
void this._promiseBuffer.add(taskProducer).then(
13411354
value => {
13421355
this._numProcessing--;
13431356
return value;
13441357
},
13451358
reason => {
13461359
this._numProcessing--;
1360+
1361+
if (reason === SENTRY_BUFFER_FULL_ERROR) {
1362+
this.recordDroppedEvent('queue_overflow', dataCategory);
1363+
}
1364+
13471365
return reason;
13481366
},
13491367
);

packages/core/test/lib/client.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
import * as integrationModule from '../../src/integration';
1616
import { _INTERNAL_captureLog } from '../../src/logs/internal';
1717
import { _INTERNAL_captureMetric } from '../../src/metrics/internal';
18+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from '../../src/transports/base';
1819
import type { Envelope } from '../../src/types-hoist/envelope';
1920
import type { ErrorEvent, Event, TransactionEvent } from '../../src/types-hoist/event';
2021
import type { SpanJSON } from '../../src/types-hoist/span';
@@ -23,7 +24,7 @@ import * as miscModule from '../../src/utils/misc';
2324
import * as stringModule from '../../src/utils/string';
2425
import * as timeModule from '../../src/utils/time';
2526
import { getDefaultTestClientOptions, TestClient } from '../mocks/client';
26-
import { AdHocIntegration, TestIntegration } from '../mocks/integration';
27+
import { AdHocIntegration, AsyncTestIntegration, TestIntegration } from '../mocks/integration';
2728
import { makeFakeTransport } from '../mocks/transport';
2829
import { clearGlobalScope } from '../testutils';
2930

@@ -2935,4 +2936,66 @@ describe('Client', () => {
29352936
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
29362937
});
29372938
});
2939+
2940+
describe('promise buffer usage', () => {
2941+
it('respects the default value of the buffer size', async () => {
2942+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN });
2943+
const client = new TestClient(options);
2944+
2945+
client.addIntegration(new AsyncTestIntegration());
2946+
2947+
Array.from({ length: DEFAULT_TRANSPORT_BUFFER_SIZE + 1 }).forEach(() => {
2948+
client.captureException(new Error('ʕノ•ᴥ•ʔノ ︵ ┻━┻'));
2949+
});
2950+
2951+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 1 }]);
2952+
});
2953+
2954+
it('records queue_overflow when promise buffer is full', async () => {
2955+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2956+
const client = new TestClient(options);
2957+
2958+
client.addIntegration(new AsyncTestIntegration());
2959+
2960+
client.captureException(new Error('first'));
2961+
client.captureException(new Error('second'));
2962+
client.captureException(new Error('third'));
2963+
2964+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 2 }]);
2965+
});
2966+
2967+
it('records different types of dropped events', async () => {
2968+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2969+
const client = new TestClient(options);
2970+
2971+
client.addIntegration(new AsyncTestIntegration());
2972+
2973+
client.captureException(new Error('first')); // error
2974+
client.captureException(new Error('second')); // error
2975+
client.captureMessage('third'); // unknown
2976+
client.captureEvent({ message: 'fourth' }); // unknown
2977+
client.captureEvent({ message: 'fifth', type: 'replay_event' }); // replay
2978+
client.captureEvent({ message: 'sixth', type: 'transaction' }); // transaction
2979+
2980+
expect(client._clearOutcomes()).toEqual([
2981+
{ reason: 'queue_overflow', category: 'error', quantity: 1 },
2982+
{ reason: 'queue_overflow', category: 'unknown', quantity: 2 },
2983+
{ reason: 'queue_overflow', category: 'replay', quantity: 1 },
2984+
{ reason: 'queue_overflow', category: 'transaction', quantity: 1 },
2985+
]);
2986+
});
2987+
2988+
it('should skip the promise buffer with sync integrations', async () => {
2989+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2990+
const client = new TestClient(options);
2991+
2992+
client.addIntegration(new TestIntegration());
2993+
2994+
client.captureException(new Error('first'));
2995+
client.captureException(new Error('second'));
2996+
client.captureException(new Error('third'));
2997+
2998+
expect(client._clearOutcomes()).toEqual([]);
2999+
});
3000+
});
29383001
});

packages/core/test/mocks/integration.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ export class TestIntegration implements Integration {
2424
}
2525
}
2626

27+
export class AsyncTestIntegration implements Integration {
28+
public static id: string = 'AsyncTestIntegration';
29+
30+
public name: string = 'AsyncTestIntegration';
31+
32+
processEvent(event: Event): Event | null | PromiseLike<Event | null> {
33+
return new Promise(resolve => setTimeout(() => resolve(event), 1));
34+
}
35+
}
36+
2737
export class AddAttachmentTestIntegration implements Integration {
2838
public static id: string = 'AddAttachmentTestIntegration';
2939

0 commit comments

Comments
 (0)