@@ -236,18 +236,13 @@ function toErrorResponse(
236236// ---------------------------------------------------------------------------
237237
238238export abstract class RouterIntegration < T = Record < string , unknown > > {
239- /**
240- * Optional bulk lookup before transformation — runs once for the entire batch.
241- * Use when the integration needs external data (e.g. resolve 30 distinct users once
242- * instead of one lookup per event). Cache results for use in batchTransform.
243- */
244- preTransform ?( inputs : RouterTransformationRequestData [ ] ) : Promise < void > ; // need to be merged with batchTransform
245-
246239 /**
247240 * Integration transforms ALL events and groups them into endpoint buckets.
241+ * Async to allow bulk lookups (e.g. deduplication, identity resolution) before transformation.
242+ * Cache lookup results on `this` for use during transformation.
248243 * Must be implemented by each destination.
249244 */
250- abstract batchTransform ( inputs : RouterTransformationRequestData [ ] ) : BatchTransformResult < T > ;
245+ abstract batchTransform ( inputs : RouterTransformationRequestData [ ] ) : Promise < BatchTransformResult < T > > ;
251246
252247 /**
253248 * Chunks the group and builds one BatchRequest per chunk.
@@ -345,17 +340,14 @@ export async function processBatchedDestination<T>(
345340 // 2. Validate — invalid events are pushed to results, valid inputs returned
346341 const validInputs = integration . validate ( inputs , results ) ;
347342
348- // 3. Optional bulk lookup before transformation
349- if ( integration . preTransform ) {
350- await integration . preTransform ( validInputs ) ;
351- }
352-
353- // 4. Split valid inputs on dontBatch flag
343+ // 3. Split valid inputs on dontBatch flag
354344 const { batchable, nonBatchable } = groupByDontBatchDirective ( validInputs ) ;
355345
356- // 5. NonBatchable — each event processed as a batch of 1
357- for ( const event of nonBatchable ) {
358- const { groupedEvents, errorEvents } = integration . batchTransform ( [ event ] ) ;
346+ // 4. NonBatchable — each event processed as a batch of 1 (parallel)
347+ const nonBatchableResults = await Promise . all (
348+ nonBatchable . map ( async ( event ) => ( { event, result : await integration . batchTransform ( [ event ] ) } ) ) ,
349+ ) ;
350+ for ( const { event, result : { groupedEvents, errorEvents } } of nonBatchableResults ) {
359351
360352 for ( const e of errorEvents ) {
361353 results . push ( toErrorResponse ( e , metadataMap , event . destination ) ) ;
@@ -373,9 +365,9 @@ export async function processBatchedDestination<T>(
373365 }
374366 }
375367
376- // 6 . Batchable — integration transforms + groups all events at once
368+ // 5 . Batchable — integration transforms + groups all events at once
377369 if ( batchable . length > 0 ) {
378- const { groupedEvents, errorEvents } = integration . batchTransform ( batchable ) ;
370+ const { groupedEvents, errorEvents } = await integration . batchTransform ( batchable ) ;
379371 const { destination } = batchable [ 0 ] ;
380372
381373 for ( const e of errorEvents ) {
0 commit comments