Skip to content

Commit e2bfa09

Browse files
Merge pull request #576 from elbwalker/560-duplicate-events-in-api-destination
batch with wildcard mapping
2 parents a28cb74 + d96f6e9 commit e2bfa09

File tree

3 files changed

+252
-45
lines changed

3 files changed

+252
-45
lines changed

packages/collector/src/__tests__/destination.test.ts

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,4 +422,187 @@ describe('Destination', () => {
422422
);
423423
});
424424
});
425+
426+
describe('batch with wildcard mapping', () => {
427+
beforeEach(() => {
428+
jest.useFakeTimers();
429+
});
430+
431+
afterEach(() => {
432+
jest.useRealTimers();
433+
});
434+
435+
it('should not duplicate events when using wildcard batch config', async () => {
436+
// Capture events at call time (since batched.events gets cleared after pushBatch)
437+
const capturedBatches: { events: WalkerOS.Events; key: string }[] = [];
438+
const mockPushBatch = jest.fn((batch) => {
439+
// Clone the events array to capture at call time
440+
capturedBatches.push({
441+
events: [...batch.events],
442+
key: batch.key,
443+
});
444+
});
445+
const mockPush = jest.fn();
446+
447+
const destinationWithBatch: Destination.Instance = {
448+
push: mockPush,
449+
pushBatch: mockPushBatch,
450+
config: {
451+
init: true,
452+
mapping: {
453+
'*': {
454+
'*': { batch: 50 }, // 50ms debounce
455+
},
456+
},
457+
},
458+
};
459+
460+
const { elb } = await startFlow({
461+
destinations: { batchDest: { code: destinationWithBatch } },
462+
});
463+
464+
// Send different event types (all should match wildcard)
465+
await elb('page view');
466+
await elb('product click');
467+
await elb('button press');
468+
469+
// Advance timers to trigger debounce
470+
jest.advanceTimersByTime(100);
471+
472+
// pushBatch should be called exactly once with all 3 events
473+
expect(mockPushBatch).toHaveBeenCalledTimes(1);
474+
expect(capturedBatches[0].events).toHaveLength(3);
475+
476+
// Verify all events are present (not duplicated)
477+
const eventNames = capturedBatches[0].events.map((e) => e.name);
478+
expect(eventNames).toContain('page view');
479+
expect(eventNames).toContain('product click');
480+
expect(eventNames).toContain('button press');
481+
482+
// Individual push should NOT be called (batch handles it)
483+
expect(mockPush).not.toHaveBeenCalled();
484+
});
485+
486+
it('should batch events separately per mapping key', async () => {
487+
// Capture events at call time
488+
const capturedBatches: { events: WalkerOS.Events; key: string }[] = [];
489+
const mockPushBatch = jest.fn((batch) => {
490+
capturedBatches.push({
491+
events: [...batch.events],
492+
key: batch.key,
493+
});
494+
});
495+
const mockPush = jest.fn();
496+
497+
const destinationWithBatch: Destination.Instance = {
498+
push: mockPush,
499+
pushBatch: mockPushBatch,
500+
config: {
501+
init: true,
502+
mapping: {
503+
page: {
504+
'*': { batch: 50 }, // page events batch together
505+
},
506+
product: {
507+
'*': { batch: 50 }, // product events batch together
508+
},
509+
},
510+
},
511+
};
512+
513+
const { elb } = await startFlow({
514+
destinations: { batchDest: { code: destinationWithBatch } },
515+
});
516+
517+
// Send events that match different mapping keys
518+
await elb('page view');
519+
await elb('page scroll');
520+
await elb('product click');
521+
await elb('product view');
522+
523+
// Advance timers to trigger debounce
524+
jest.advanceTimersByTime(100);
525+
526+
// pushBatch should be called twice (once per mapping key)
527+
expect(mockPushBatch).toHaveBeenCalledTimes(2);
528+
529+
// Each batch should have 2 events
530+
expect(capturedBatches[0].events).toHaveLength(2);
531+
expect(capturedBatches[1].events).toHaveLength(2);
532+
});
533+
534+
it('should isolate batch state when multiple destinations share same mapping config', async () => {
535+
// This test reproduces the bug where shared mapping config causes duplicate events
536+
// Two destinations with the SAME mapping config object (shared reference)
537+
const sharedMapping = {
538+
'*': {
539+
'*': { batch: 50 },
540+
},
541+
};
542+
543+
const capturedBatches: {
544+
destination: string;
545+
events: WalkerOS.Events;
546+
key: string;
547+
}[] = [];
548+
549+
const mockPushBatch1 = jest.fn((batch) => {
550+
capturedBatches.push({
551+
destination: 'dest1',
552+
events: [...batch.events],
553+
key: batch.key,
554+
});
555+
});
556+
const mockPushBatch2 = jest.fn((batch) => {
557+
capturedBatches.push({
558+
destination: 'dest2',
559+
events: [...batch.events],
560+
key: batch.key,
561+
});
562+
});
563+
564+
const destination1: Destination.Instance = {
565+
push: jest.fn(),
566+
pushBatch: mockPushBatch1,
567+
config: {
568+
init: true,
569+
mapping: sharedMapping, // Shared reference!
570+
},
571+
};
572+
573+
const destination2: Destination.Instance = {
574+
push: jest.fn(),
575+
pushBatch: mockPushBatch2,
576+
config: {
577+
init: true,
578+
mapping: sharedMapping, // Same shared reference!
579+
},
580+
};
581+
582+
const { elb } = await startFlow({
583+
destinations: {
584+
dest1: { code: destination1 },
585+
dest2: { code: destination2 },
586+
},
587+
});
588+
589+
// Send events
590+
await elb('page view');
591+
await elb('product click');
592+
593+
// Advance timers to trigger debounce
594+
jest.advanceTimersByTime(100);
595+
596+
// Each destination should receive its own batch
597+
// BUG: Currently shared mapping causes only last destination to receive events
598+
// or events get duplicated across destinations
599+
const totalPushBatchCalls =
600+
mockPushBatch1.mock.calls.length + mockPushBatch2.mock.calls.length;
601+
expect(totalPushBatchCalls).toBe(2); // Should be 2 (one per destination)
602+
603+
// Each destination should receive exactly 2 events
604+
expect(mockPushBatch1).toHaveBeenCalledTimes(1);
605+
expect(mockPushBatch2).toHaveBeenCalledTimes(1);
606+
});
607+
});
425608
});

packages/collector/src/destination.ts

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -290,48 +290,62 @@ export async function destinationPush<Destination extends Destination.Instance>(
290290
};
291291

292292
const eventMapping = processed.mapping;
293+
const mappingKey = processed.mappingKey || '* *';
294+
293295
if (eventMapping?.batch && destination.pushBatch) {
294-
const batched = eventMapping.batched || {
295-
key: processed.mappingKey || '',
296-
events: [],
297-
data: [],
298-
};
299-
batched.events.push(processed.event);
300-
if (isDefined(processed.data)) batched.data.push(processed.data);
301-
302-
eventMapping.batchFn =
303-
eventMapping.batchFn ||
304-
debounce((destination, collector) => {
305-
const batchDestType = destination.type || 'unknown';
306-
const batchLogger = collector.logger.scope(batchDestType);
307-
308-
const batchContext: Destination.PushBatchContext = {
309-
collector,
310-
config,
311-
data: processed.data,
312-
mapping: eventMapping,
313-
env: mergeEnvironments(destination.env, config.env),
314-
logger: batchLogger,
315-
};
316-
317-
batchLogger.debug('push batch', {
318-
events: batched.events.length,
319-
});
320-
321-
useHooks(
322-
destination.pushBatch!,
323-
'DestinationPushBatch',
324-
(collector as Collector.Instance).hooks,
325-
)(batched, batchContext);
326-
327-
batchLogger.debug('push batch done');
328-
329-
batched.events = [];
330-
batched.data = [];
331-
}, eventMapping.batch);
332-
333-
eventMapping.batched = batched;
334-
eventMapping.batchFn?.(destination, collector);
296+
// Initialize batch registry on destination (not on shared mapping config)
297+
destination.batches = destination.batches || {};
298+
299+
// Get or create batch state for this mapping key
300+
if (!destination.batches[mappingKey]) {
301+
const batched: Destination.Batch<unknown> = {
302+
key: mappingKey,
303+
events: [],
304+
data: [],
305+
};
306+
307+
destination.batches[mappingKey] = {
308+
batched,
309+
batchFn: debounce(() => {
310+
const batchState = destination.batches![mappingKey];
311+
const currentBatched = batchState.batched;
312+
313+
const batchContext: Destination.PushBatchContext = {
314+
collector,
315+
config,
316+
// Note: batch.data contains all transformed data; context.data is for single events
317+
data: undefined,
318+
mapping: eventMapping,
319+
env: mergeEnvironments(destination.env, config.env),
320+
logger: destLogger,
321+
};
322+
323+
destLogger.debug('push batch', {
324+
events: currentBatched.events.length,
325+
});
326+
327+
useHooks(
328+
destination.pushBatch!,
329+
'DestinationPushBatch',
330+
collector.hooks,
331+
)(currentBatched, batchContext);
332+
333+
destLogger.debug('push batch done');
334+
335+
// Reset batch
336+
currentBatched.events = [];
337+
currentBatched.data = [];
338+
}, eventMapping.batch),
339+
};
340+
}
341+
342+
// Add event to batch
343+
const batchState = destination.batches[mappingKey];
344+
batchState.batched.events.push(processed.event);
345+
if (isDefined(processed.data)) batchState.batched.data.push(processed.data);
346+
347+
// Trigger debounced batch
348+
batchState.batchFn();
335349
} else {
336350
destLogger.debug('push', { event: processed.event.name });
337351

packages/core/src/types/destination.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ export interface Instance<T extends TypesGeneric = Types> {
6060
config: Config<T>;
6161
queue?: WalkerOS.Events;
6262
dlq?: DLQ;
63+
batches?: BatchRegistry<Mapping<T>>;
6364
type?: string;
6465
env?: Env<T>;
6566
init?: InitFn<T>;
@@ -126,13 +127,15 @@ export interface InitContext<T extends TypesGeneric = Types> {
126127
logger: Logger.Instance;
127128
}
128129

129-
export interface PushContext<T extends TypesGeneric = Types>
130-
extends Context<T> {
130+
export interface PushContext<
131+
T extends TypesGeneric = Types,
132+
> extends Context<T> {
131133
mapping?: WalkerOSMapping.Rule<Mapping<T>>;
132134
}
133135

134-
export interface PushBatchContext<T extends TypesGeneric = Types>
135-
extends Context<T> {
136+
export interface PushBatchContext<
137+
T extends TypesGeneric = Types,
138+
> extends Context<T> {
136139
mapping?: WalkerOSMapping.Rule<Mapping<T>>;
137140
}
138141

@@ -163,6 +166,13 @@ export interface Batch<Mapping> {
163166
mapping?: WalkerOSMapping.Rule<Mapping>;
164167
}
165168

169+
export interface BatchRegistry<Mapping> {
170+
[mappingKey: string]: {
171+
batched: Batch<Mapping>;
172+
batchFn: () => void;
173+
};
174+
}
175+
166176
export type Data =
167177
| WalkerOS.Property
168178
| undefined

0 commit comments

Comments
 (0)