Skip to content

Commit 7aedb96

Browse files
committed
Premerge run batch before sending it to clickhouse
1 parent 17bb0c0 commit 7aedb96

File tree

1 file changed

+26
-2
lines changed

1 file changed

+26
-2
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,26 @@ export class RunsReplicationService {
130130
flushInterval: options.flushIntervalMs ?? 100,
131131
maxConcurrency: options.maxFlushConcurrency ?? 100,
132132
callback: this.#flushBatch.bind(this),
133+
mergeBatch: (existingBatch: TaskRunInsert[], newBatch: TaskRunInsert[]) => {
134+
const merged = new Map<string, TaskRunInsert>();
135+
136+
for (const item of existingBatch) {
137+
const key = `${item.event}_${item.run.id}`;
138+
merged.set(key, item);
139+
}
140+
141+
for (const item of newBatch) {
142+
const key = `${item.event}_${item.run.id}`;
143+
const existingItem = merged.get(key);
144+
145+
// keep the run with the higher version (latest)
146+
if (!existingItem || item._version > existingItem._version) {
147+
merged.set(key, item);
148+
}
149+
}
150+
151+
return Array.from(merged.values());
152+
},
133153
logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"),
134154
tracer: options.tracer,
135155
});
@@ -825,12 +845,13 @@ export type ConcurrentFlushSchedulerConfig<T> = {
825845
flushInterval: number;
826846
maxConcurrency?: number;
827847
callback: (flushId: string, batch: T[]) => Promise<void>;
848+
mergeBatch?: (existingBatch: T[], newBatch: T[]) => T[];
828849
tracer?: Tracer;
829850
logger?: Logger;
830851
};
831852

832853
export class ConcurrentFlushScheduler<T> {
833-
private currentBatch: T[]; // Adjust the type according to your data structure
854+
private currentBatch: T[];
834855
private readonly BATCH_SIZE: number;
835856
private readonly flushInterval: number;
836857
private readonly MAX_CONCURRENCY: number;
@@ -855,7 +876,10 @@ export class ConcurrentFlushScheduler<T> {
855876
}
856877

857878
addToBatch(items: T[]): void {
858-
this.currentBatch = this.currentBatch.concat(items);
879+
this.currentBatch = this.config.mergeBatch
880+
? this.config.mergeBatch(this.currentBatch, items)
881+
: this.currentBatch.concat(items);
882+
859883
this.#flushNextBatchIfNeeded();
860884
}
861885

0 commit comments

Comments
 (0)