 import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
-import { RedisOptions } from "@internal/redis";
+import { type RedisOptions } from "@internal/redis";
 import {
   LogicalReplicationClient,
   type MessageDelete,
@@ -8,14 +8,13 @@ import {
   type PgoutputMessage,
 } from "@internal/replication";
 import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
-import { Logger, LogLevel } from "@trigger.dev/core/logger";
+import { Logger, type LogLevel } from "@trigger.dev/core/logger";
 import { tryCatch } from "@trigger.dev/core/utils";
 import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
-import { TaskRun } from "@trigger.dev/database";
+import { type TaskRun } from "@trigger.dev/database";
 import { nanoid } from "nanoid";
 import EventEmitter from "node:events";
 import pLimit from "p-limit";
-import { logger } from "./logger.server";
 import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";

 interface TransactionEvent<T = any> {
@@ -64,6 +63,9 @@ type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update

 export type RunsReplicationServiceEvents = {
   message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
+  batchFlushed: [
+    { flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] }
+  ];
 };

 export class RunsReplicationService {
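The events map uses tuple-typed payloads, which is the shape Node's generic `EventEmitter` expects for typed `on`/`emit`. A minimal sketch of the pattern, assuming a recent `@types/node` that ships the generic `EventEmitter` (the event map and listener below are illustrative, not from this PR):

```ts
import EventEmitter from "node:events";

// Illustrative event map: tuple values become the listener's argument list.
type SketchEvents = {
  batchFlushed: [{ flushId: string; count: number }];
};

const events = new EventEmitter<SketchEvents>();

// `flushId` and `count` are inferred from the tuple above.
events.on("batchFlushed", ({ flushId, count }) => {
  console.log(`flush ${flushId} wrote ${count} rows`);
});

events.emit("batchFlushed", { flushId: "flush_1", count: 42 });
```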
@@ -130,6 +132,29 @@ export class RunsReplicationService {
       flushInterval: options.flushIntervalMs ?? 100,
       maxConcurrency: options.maxFlushConcurrency ?? 100,
       callback: this.#flushBatch.bind(this),
+      // we can do some pre-merging to reduce the amount of data we need to send to clickhouse
+      mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => {
+        const merged = new Map<string, TaskRunInsert>();
+
+        for (const item of existingBatch) {
+          const key = `${item.event}_${item.run.id}`;
+          merged.set(key, item);
+        }
+
+        for (const item of newBatch) {
+          const key = `${item.event}_${item.run.id}`;
+          const existingItem = merged.get(key);
+
+          // Keep the run with the higher version (latest)
+          // and take the last occurrence for that version.
+          // Items originating from the same DB transaction have the same version.
+          if (!existingItem || item._version >= existingItem._version) {
+            merged.set(key, item);
+          }
+        }
+
+        return Array.from(merged.values());
+      },
       logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"),
       tracer: options.tracer,
     });
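To make the merge semantics concrete: rows are keyed by `event` plus run id, a newer `_version` replaces an older one, and on equal versions the item from the incoming batch wins because it is processed last. A small illustration with fabricated values (trimmed-down shapes, not real `TaskRunInsert` objects):

```ts
// Illustrative only: simplified items with made-up versions and ids.
const existingBatch = [
  { _version: 1n, event: "update", run: { id: "run_1" } },
  { _version: 5n, event: "insert", run: { id: "run_2" } },
];
const newBatch = [
  { _version: 2n, event: "update", run: { id: "run_1" } }, // higher version: replaces run_1's update
  { _version: 5n, event: "insert", run: { id: "run_2" } }, // equal version: last occurrence wins
];
// mergeBatch(existingBatch, newBatch) yields two items:
// run_1's update at _version 2n, and newBatch's copy of run_2's insert.
```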
@@ -467,11 +492,33 @@ export class RunsReplicationService {

       const taskRunInserts = preparedInserts
         .map(({ taskRunInsert }) => taskRunInsert)
-        .filter(Boolean);
+        .filter(Boolean)
+        // batch inserts in clickhouse are more performant if the items
+        // are pre-sorted by the primary key
+        .sort((a, b) => {
+          if (a.organization_id !== b.organization_id) {
+            return a.organization_id < b.organization_id ? -1 : 1;
+          }
+          if (a.project_id !== b.project_id) {
+            return a.project_id < b.project_id ? -1 : 1;
+          }
+          if (a.environment_id !== b.environment_id) {
+            return a.environment_id < b.environment_id ? -1 : 1;
+          }
+          if (a.created_at !== b.created_at) {
+            return a.created_at - b.created_at;
+          }
+          return a.run_id < b.run_id ? -1 : 1;
+        });

       const payloadInserts = preparedInserts
         .map(({ payloadInsert }) => payloadInsert)
-        .filter(Boolean);
+        .filter(Boolean)
+        // batch inserts in clickhouse are more performant if the items
+        // are pre-sorted by the primary key
+        .sort((a, b) => {
+          return a.run_id < b.run_id ? -1 : 1;
+        });

       span.setAttribute("task_run_inserts", taskRunInserts.length);
       span.setAttribute("payload_inserts", payloadInserts.length);
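Both comparators presumably mirror the `ORDER BY` keys of the target ClickHouse tables, so rows arrive in primary-key order and the insert block needs no server-side re-sort. The same lexicographic comparison can be expressed with a generic helper; `compareBy` below is a hypothetical utility sketch, not part of this PR:

```ts
// Hypothetical helper, not part of this PR: lexicographic compare over
// an ordered list of key extractors.
function compareBy<T>(...keys: Array<(item: T) => string | number>): (a: T, b: T) => number {
  return (a, b) => {
    for (const key of keys) {
      const ka = key(a);
      const kb = key(b);
      if (typeof ka === "number" && typeof kb === "number") {
        if (ka !== kb) return ka - kb;
      } else if (ka !== kb) {
        return String(ka) < String(kb) ? -1 : 1;
      }
    }
    return 0;
  };
}

// Equivalent ordering for the task run inserts (field names as in the diff):
// taskRunInserts.sort(
//   compareBy(
//     (r) => r.organization_id,
//     (r) => r.project_id,
//     (r) => r.environment_id,
//     (r) => r.created_at,
//     (r) => r.run_id
//   )
// );
```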
@@ -519,6 +566,8 @@ export class RunsReplicationService {
         taskRunInserts: taskRunInserts.length,
         payloadInserts: payloadInserts.length,
       });
+
+      this.events.emit("batchFlushed", { flushId, taskRunInserts, payloadInserts });
     });
   }

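The new `batchFlushed` event makes flush results observable from outside the service, which is handy for tests that need to wait for replicated rows. A hedged usage sketch, assuming `events` is a public field on the service (as the exported events type suggests) and `service` is an already-constructed `RunsReplicationService`:

```ts
// Illustrative: log what each flush wrote to ClickHouse.
service.events.on("batchFlushed", ({ flushId, taskRunInserts, payloadInserts }) => {
  console.log(
    `flush ${flushId}: ${taskRunInserts.length} task runs, ${payloadInserts.length} payloads`
  );
});
```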
@@ -825,12 +874,13 @@ export type ConcurrentFlushSchedulerConfig<T> = {
   flushInterval: number;
   maxConcurrency?: number;
   callback: (flushId: string, batch: T[]) => Promise<void>;
+  mergeBatch?: (existingBatch: T[], newBatch: T[]) => T[];
   tracer?: Tracer;
   logger?: Logger;
 };

 export class ConcurrentFlushScheduler<T> {
-  private currentBatch: T[]; // Adjust the type according to your data structure
+  private currentBatch: T[];
   private readonly BATCH_SIZE: number;
   private readonly flushInterval: number;
   private readonly MAX_CONCURRENCY: number;
@@ -855,7 +905,10 @@ export class ConcurrentFlushScheduler<T> {
   }

   addToBatch(items: T[]): void {
-    this.currentBatch = this.currentBatch.concat(items);
+    this.currentBatch = this.config.mergeBatch
+      ? this.config.mergeBatch(this.currentBatch, items)
+      : this.currentBatch.concat(items);
+
     this.#flushNextBatchIfNeeded();
   }

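Taken together with the optional `mergeBatch` hook, repeated `addToBatch` calls can collapse rows before the flush callback ever runs. A self-contained sketch of the contract, using a last-write-wins merge over a made-up row type (the config shape mirrors `ConcurrentFlushSchedulerConfig<T>`; field names not visible in this diff, such as `batchSize`, are assumptions, and the scheduler construction itself is elided):

```ts
// Illustrative row type and config; values are fabricated.
type Row = { key: string; value: number };

const config = {
  batchSize: 100, // assumed field name, not shown in this diff
  flushInterval: 100,
  callback: async (flushId: string, batch: Row[]) => {
    console.log(`flushing ${flushId} with ${batch.length} rows`);
  },
  // Last write wins per key — the same idea as the TaskRunInsert merge above,
  // minus the version comparison.
  mergeBatch: (existing: Row[], incoming: Row[]): Row[] => {
    const merged = new Map<string, Row>();
    for (const row of [...existing, ...incoming]) {
      merged.set(row.key, row);
    }
    return Array.from(merged.values());
  },
};
```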