Skip to content

Commit afc8025

Browse files
committed
fix(core): add a PromiseBuffer for incoming events on the client
1 parent 1a7189d commit afc8025

File tree

3 files changed

+101
-10
lines changed

3 files changed

+101
-10
lines changed

packages/core/src/client.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { _INTERNAL_flushMetricsBuffer } from './metrics/internal';
1111
import type { Scope } from './scope';
1212
import { updateSession } from './session';
1313
import { getDynamicSamplingContextFromScope } from './tracing/dynamicSamplingContext';
14+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from './transports/base';
1415
import type { Breadcrumb, BreadcrumbHint, FetchBreadcrumbHint, XhrBreadcrumbHint } from './types-hoist/breadcrumb';
1516
import type { CheckIn, MonitorConfig } from './types-hoist/checkin';
1617
import type { EventDropReason, Outcome } from './types-hoist/clientreport';
@@ -43,6 +44,7 @@ import { merge } from './utils/merge';
4344
import { checkOrSetAlreadyCaught, uuid4 } from './utils/misc';
4445
import { parseSampleRate } from './utils/parseSampleRate';
4546
import { prepareEvent } from './utils/prepareEvent';
47+
import { type PromiseBuffer, makePromiseBuffer, SENTRY_BUFFER_FULL_ERROR } from './utils/promisebuffer';
4648
import { reparentChildSpans, shouldIgnoreSpan } from './utils/should-ignore-span';
4749
import { showSpanDropWarning } from './utils/spanUtils';
4850
import { rejectedSyncPromise } from './utils/syncpromise';
@@ -194,6 +196,8 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
194196
// eslint-disable-next-line @typescript-eslint/ban-types
195197
private _hooks: Record<string, Set<Function>>;
196198

199+
private _promiseBuffer: PromiseBuffer<unknown>;
200+
197201
/**
198202
* Initializes this client instance.
199203
*
@@ -206,6 +210,7 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
206210
this._outcomes = {};
207211
this._hooks = {};
208212
this._eventProcessors = [];
213+
this._promiseBuffer = makePromiseBuffer(options.transportOptions?.bufferSize ?? DEFAULT_TRANSPORT_BUFFER_SIZE);
209214

210215
if (options.dsn) {
211216
this._dsn = makeDsn(options.dsn);
@@ -264,9 +269,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
264269
};
265270

266271
this._process(
267-
this.eventFromException(exception, hintWithEventId).then(event =>
268-
this._captureEvent(event, hintWithEventId, scope),
269-
),
272+
() =>
273+
this.eventFromException(exception, hintWithEventId)
274+
.then(event => this._captureEvent(event, hintWithEventId, scope))
275+
.then(res => res),
276+
'error',
270277
);
271278

272279
return hintWithEventId.event_id;
@@ -289,12 +296,15 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
289296
};
290297

291298
const eventMessage = isParameterizedString(message) ? message : String(message);
292-
293-
const promisedEvent = isPrimitive(message)
299+
const isMessage = isPrimitive(message);
300+
const promisedEvent = isMessage
294301
? this.eventFromMessage(eventMessage, level, hintWithEventId)
295302
: this.eventFromException(message, hintWithEventId);
296303

297-
this._process(promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)));
304+
this._process(
305+
() => promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)),
306+
isMessage ? 'unknown' : 'error',
307+
);
298308

299309
return hintWithEventId.event_id;
300310
}
@@ -321,9 +331,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
321331
const sdkProcessingMetadata = event.sdkProcessingMetadata || {};
322332
const capturedSpanScope: Scope | undefined = sdkProcessingMetadata.capturedSpanScope;
323333
const capturedSpanIsolationScope: Scope | undefined = sdkProcessingMetadata.capturedSpanIsolationScope;
334+
const dataCategory = event.type === 'replay_event' ? 'replay' : (event.type ?? 'unknown');
324335

325336
this._process(
326-
this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
337+
() => this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
338+
dataCategory,
327339
);
328340

329341
return hintWithEventId.event_id;
@@ -1308,15 +1320,21 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
13081320
/**
13091321
* Occupies the client with processing and event
13101322
*/
1311-
protected _process<T>(promise: PromiseLike<T>): void {
1323+
protected _process<T>(taskProducer: () => PromiseLike<T>, dataCategory: DataCategory): void {
13121324
this._numProcessing++;
1313-
void promise.then(
1325+
1326+
void this._promiseBuffer.add(taskProducer).then(
13141327
value => {
13151328
this._numProcessing--;
13161329
return value;
13171330
},
13181331
reason => {
13191332
this._numProcessing--;
1333+
1334+
if (reason === SENTRY_BUFFER_FULL_ERROR) {
1335+
this.recordDroppedEvent('queue_overflow', dataCategory);
1336+
}
1337+
13201338
return reason;
13211339
},
13221340
);

packages/core/test/lib/client.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ import * as miscModule from '../../src/utils/misc';
2222
import * as stringModule from '../../src/utils/string';
2323
import * as timeModule from '../../src/utils/time';
2424
import { getDefaultTestClientOptions, TestClient } from '../mocks/client';
25-
import { AdHocIntegration, TestIntegration } from '../mocks/integration';
25+
import { AdHocIntegration, AsyncTestIntegration, TestIntegration } from '../mocks/integration';
2626
import { makeFakeTransport } from '../mocks/transport';
2727
import { clearGlobalScope } from '../testutils';
28+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from '../../src/transports/base';
2829

2930
const PUBLIC_DSN = 'https://username@domain/123';
3031
// eslint-disable-next-line no-var
@@ -2806,4 +2807,66 @@ describe('Client', () => {
28062807
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
28072808
});
28082809
});
2810+
2811+
describe('promise buffer usage', () => {
2812+
it('respects the default value of the buffer size', async () => {
2813+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN });
2814+
const client = new TestClient(options);
2815+
2816+
client.addIntegration(new AsyncTestIntegration());
2817+
2818+
Array.from({ length: DEFAULT_TRANSPORT_BUFFER_SIZE + 1 }).forEach(() => {
2819+
client.captureException(new Error('ʕノ•ᴥ•ʔノ ︵ ┻━┻'));
2820+
});
2821+
2822+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 1 }]);
2823+
});
2824+
2825+
it('records queue_overflow when promise buffer is full', async () => {
2826+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2827+
const client = new TestClient(options);
2828+
2829+
client.addIntegration(new AsyncTestIntegration());
2830+
2831+
client.captureException(new Error('first'));
2832+
client.captureException(new Error('second'));
2833+
client.captureException(new Error('third'));
2834+
2835+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 2 }]);
2836+
});
2837+
2838+
it('records different types of dropped events', async () => {
2839+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2840+
const client = new TestClient(options);
2841+
2842+
client.addIntegration(new AsyncTestIntegration());
2843+
2844+
client.captureException(new Error('first')); // error
2845+
client.captureException(new Error('second')); // error
2846+
client.captureMessage('third'); // unknown
2847+
client.captureEvent({ message: 'fourth' }); // unknown
2848+
client.captureEvent({ message: 'fifth', type: 'replay_event' }); // replay
2849+
client.captureEvent({ message: 'sixth', type: 'transaction' }); // transaction
2850+
2851+
expect(client._clearOutcomes()).toEqual([
2852+
{ reason: 'queue_overflow', category: 'error', quantity: 1 },
2853+
{ reason: 'queue_overflow', category: 'unknown', quantity: 2 },
2854+
{ reason: 'queue_overflow', category: 'replay', quantity: 1 },
2855+
{ reason: 'queue_overflow', category: 'transaction', quantity: 1 },
2856+
]);
2857+
});
2858+
2859+
it('should skip the promise buffer with sync integrations', async () => {
2860+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2861+
const client = new TestClient(options);
2862+
2863+
client.addIntegration(new TestIntegration());
2864+
2865+
client.captureException(new Error('first'));
2866+
client.captureException(new Error('second'));
2867+
client.captureException(new Error('third'));
2868+
2869+
expect(client._clearOutcomes()).toEqual([]);
2870+
});
2871+
});
28092872
});

packages/core/test/mocks/integration.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ export class TestIntegration implements Integration {
2424
}
2525
}
2626

27+
export class AsyncTestIntegration implements Integration {
28+
public static id: string = 'AsyncTestIntegration';
29+
30+
public name: string = 'AsyncTestIntegration';
31+
32+
processEvent(event: Event): Event | null | PromiseLike<Event | null> {
33+
return new Promise((resolve) => setTimeout(() => resolve(event), 1));
34+
}
35+
}
36+
2737
export class AddAttachmentTestIntegration implements Integration {
2838
public static id: string = 'AddAttachmentTestIntegration';
2939

0 commit comments

Comments
 (0)