@@ -5,17 +5,14 @@ import {
55 RunMetadataChangeOperation ,
66 UpdateMetadataRequestBody ,
77} from "@trigger.dev/core/v3" ;
8- import { prisma , PrismaClientOrTransaction } from "~/db.server" ;
9- import { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
10- import { handleMetadataPacket } from "~/utils/packets" ;
11- import { BaseService , ServiceValidationError } from "~/v3/services/baseService.server" ;
12-
8+ import type { PrismaClientOrTransaction } from "~/db.server" ;
9+ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
10+ import { handleMetadataPacket , MetadataTooLargeError } from "~/utils/packets" ;
11+ import { ServiceValidationError } from "~/v3/services/common.server" ;
1312import { Effect , Schedule , Duration } from "effect" ;
1413import { type RuntimeFiber } from "effect/Fiber" ;
15- import { logger } from "../logger.server" ;
16- import { singleton } from "~/utils/singleton" ;
17- import { env } from "~/env.server" ;
1814import { setTimeout } from "timers/promises" ;
15+ import { Logger , LogLevel } from "@trigger.dev/core/logger" ;
1916
2017const RUN_UPDATABLE_WINDOW_MS = 60 * 60 * 1000 ; // 1 hour
2118
@@ -25,30 +22,49 @@ type BufferedRunMetadataChangeOperation = {
2522 operation : RunMetadataChangeOperation ;
2623} ;
2724
28- export class UpdateMetadataService extends BaseService {
25+ export type UpdateMetadataServiceOptions = {
26+ prisma : PrismaClientOrTransaction ;
27+ flushIntervalMs ?: number ;
28+ flushEnabled ?: boolean ;
29+ flushLoggingEnabled ?: boolean ;
30+ maximumSize ?: number ;
31+ logger ?: Logger ;
32+ logLevel ?: LogLevel ;
33+ // Testing hooks
34+ onBeforeUpdate ?: ( runId : string ) => Promise < void > ;
35+ onAfterRead ?: ( runId : string , metadataVersion : number ) => Promise < void > ;
36+ } ;
37+
38+ export class UpdateMetadataService {
2939 private _bufferedOperations : Map < string , BufferedRunMetadataChangeOperation [ ] > = new Map ( ) ;
3040 private _flushFiber : RuntimeFiber < void > | null = null ;
31-
32- constructor (
33- protected readonly _prisma : PrismaClientOrTransaction = prisma ,
34- private readonly flushIntervalMs : number = 5000 ,
35- private readonly flushEnabled : boolean = true ,
36- private readonly flushLoggingEnabled : boolean = true
37- ) {
38- super ( ) ;
41+ private readonly _prisma : PrismaClientOrTransaction ;
42+ private readonly flushIntervalMs : number ;
43+ private readonly flushEnabled : boolean ;
44+ private readonly flushLoggingEnabled : boolean ;
45+ private readonly maximumSize : number ;
46+ private readonly logger : Logger ;
47+
48+ constructor ( private readonly options : UpdateMetadataServiceOptions ) {
49+ this . _prisma = options . prisma ;
50+ this . flushIntervalMs = options . flushIntervalMs ?? 5000 ;
51+ this . flushEnabled = options . flushEnabled ?? true ;
52+ this . flushLoggingEnabled = options . flushLoggingEnabled ?? true ;
53+ this . maximumSize = options . maximumSize ?? 1024 * 1024 * 1 ; // 1MB
54+ this . logger = options . logger ?? new Logger ( "UpdateMetadataService" , options . logLevel ?? "info" ) ;
3955
4056 this . _startFlushing ( ) ;
4157 }
4258
4359 // Start a loop that periodically flushes buffered operations
4460 private _startFlushing ( ) {
4561 if ( ! this . flushEnabled ) {
46- logger . info ( "[UpdateMetadataService] 🚽 Flushing disabled" ) ;
62+ this . logger . info ( "[UpdateMetadataService] 🚽 Flushing disabled" ) ;
4763
4864 return ;
4965 }
5066
51- logger . info ( "[UpdateMetadataService] 🚽 Flushing started" ) ;
67+ this . logger . info ( "[UpdateMetadataService] 🚽 Flushing started" ) ;
5268
5369 // Create a program that sleeps, then processes buffered ops
5470 const program = Effect . gen ( this , function * ( _ ) {
@@ -62,7 +78,7 @@ export class UpdateMetadataService extends BaseService {
6278
6379 yield * Effect . sync ( ( ) => {
6480 if ( this . flushLoggingEnabled ) {
65- logger . debug ( `[UpdateMetadataService] Flushing operations` , {
81+ this . logger . debug ( `[UpdateMetadataService] Flushing operations` , {
6682 operations : Object . fromEntries ( currentOperations ) ,
6783 } ) ;
6884 }
@@ -77,7 +93,7 @@ export class UpdateMetadataService extends BaseService {
7793 // Handle any unexpected errors, ensuring program does not fail
7894 Effect . catchAll ( ( error ) =>
7995 Effect . sync ( ( ) => {
80- logger . error ( "Error in flushing program:" , { error } ) ;
96+ this . logger . error ( "Error in flushing program:" , { error } ) ;
8197 } )
8298 )
8399 ) ;
@@ -101,7 +117,7 @@ export class UpdateMetadataService extends BaseService {
101117
102118 yield * Effect . sync ( ( ) => {
103119 if ( this . flushLoggingEnabled ) {
104- logger . debug ( `[UpdateMetadataService] Processing operations for run` , {
120+ this . logger . debug ( `[UpdateMetadataService] Processing operations for run` , {
105121 runId,
106122 operationsCount : processedOps . length ,
107123 } ) ;
@@ -111,6 +127,25 @@ export class UpdateMetadataService extends BaseService {
111127 // Update run with retry
112128 yield * _ (
113129 this . _updateRunWithOperations ( runId , processedOps ) . pipe (
130+ // Catch MetadataTooLargeError before retry logic
131+ Effect . catchIf (
132+ ( error ) => error instanceof MetadataTooLargeError ,
133+ ( error ) =>
134+ Effect . sync ( ( ) => {
135+ // Log the error but don't return operations to buffer
136+ console . error (
137+ `[UpdateMetadataService] Dropping operations for run ${ runId } due to metadata size limit:` ,
138+ error . message
139+ ) ;
140+ if ( this . flushLoggingEnabled ) {
141+ this . logger . warn ( `[UpdateMetadataService] Metadata too large for run` , {
142+ runId,
143+ operationsCount : processedOps . length ,
144+ error : error . message ,
145+ } ) ;
146+ }
147+ } )
148+ ) ,
114149 Effect . retry ( Schedule . exponential ( Duration . millis ( 100 ) , 1.4 ) ) ,
115150 Effect . catchAll ( ( error ) =>
116151 Effect . sync ( ( ) => {
@@ -145,6 +180,11 @@ export class UpdateMetadataService extends BaseService {
145180 return yield * _ ( Effect . fail ( new Error ( `Run ${ runId } not found` ) ) ) ;
146181 }
147182
183+ // Testing hook after read
184+ if ( this . options . onAfterRead ) {
185+ yield * _ ( Effect . tryPromise ( ( ) => this . options . onAfterRead ! ( runId , run . metadataVersion ) ) ) ;
186+ }
187+
148188 const metadata = yield * _ (
149189 Effect . tryPromise ( ( ) =>
150190 run . metadata
@@ -160,19 +200,29 @@ export class UpdateMetadataService extends BaseService {
160200 ) ;
161201
162202 if ( applyResult . unappliedOperations . length === operations . length ) {
163- logger . warn ( `No operations applied for run ${ runId } ` ) ;
203+ this . logger . warn ( `No operations applied for run ${ runId } ` ) ;
164204 // If no operations were applied, return
165205 return ;
166206 }
167207
168208 // Stringify the metadata
169209 const newMetadataPacket = yield * _ (
170- Effect . try ( ( ) => handleMetadataPacket ( applyResult . newMetadata , run . metadataType ) )
210+ Effect . try ( ( ) =>
211+ handleMetadataPacket ( applyResult . newMetadata , run . metadataType , this . maximumSize )
212+ ) . pipe (
213+ Effect . mapError ( ( error ) => {
214+ // Preserve the original error if it's MetadataTooLargeError
215+ if ( "cause" in error && error . cause instanceof MetadataTooLargeError ) {
216+ return error . cause ;
217+ }
218+ return error ;
219+ } )
220+ )
171221 ) ;
172222
173223 if ( ! newMetadataPacket ) {
174224 // Log and skip if metadata is invalid
175- logger . warn ( `Invalid metadata after operations, skipping update` ) ;
225+ this . logger . warn ( `Invalid metadata after operations, skipping update` ) ;
176226 return ;
177227 }
178228
@@ -193,7 +243,7 @@ export class UpdateMetadataService extends BaseService {
193243
194244 if ( result . count === 0 ) {
195245 yield * Effect . sync ( ( ) => {
196- logger . warn ( `Optimistic lock failed for run ${ runId } ` , {
246+ this . logger . warn ( `Optimistic lock failed for run ${ runId } ` , {
197247 metadataVersion : run . metadataVersion ,
198248 } ) ;
199249 } ) ;
@@ -275,12 +325,12 @@ export class UpdateMetadataService extends BaseService {
275325 throw new ServiceValidationError ( "Cannot update metadata for a completed run" ) ;
276326 }
277327
278- if ( body . parentOperations && body . parentOperations . length > 0 && taskRun . parentTaskRun ) {
279- this . #ingestRunOperations( taskRun . parentTaskRun . id , body . parentOperations ) ;
328+ if ( body . parentOperations && body . parentOperations . length > 0 ) {
329+ this . #ingestRunOperations( taskRun . parentTaskRun ?. id ?? taskRun . id , body . parentOperations ) ;
280330 }
281331
282- if ( body . rootOperations && body . rootOperations . length > 0 && taskRun . rootTaskRun ) {
283- this . #ingestRunOperations( taskRun . rootTaskRun . id , body . rootOperations ) ;
332+ if ( body . rootOperations && body . rootOperations . length > 0 ) {
333+ this . #ingestRunOperations( taskRun . rootTaskRun ?. id ?? taskRun . id , body . rootOperations ) ;
284334 }
285335
286336 const newMetadata = await this . #updateRunMetadata( {
@@ -341,15 +391,25 @@ export class UpdateMetadataService extends BaseService {
341391 return currentMetadata ;
342392 }
343393
394+ const newMetadataPacket = handleMetadataPacket (
395+ applyResults . newMetadata ,
396+ run . metadataType ,
397+ this . maximumSize
398+ ) ;
399+
400+ if ( ! newMetadataPacket ) {
401+ throw new ServiceValidationError ( "Unable to update metadata" ) ;
402+ }
403+
344404 // Update with optimistic locking
345405 const result = await this . _prisma . taskRun . updateMany ( {
346406 where : {
347407 id : runId ,
348408 metadataVersion : run . metadataVersion ,
349409 } ,
350410 data : {
351- metadata : JSON . stringify ( applyResults . newMetadata ) ,
352- metadataType : run . metadataType ,
411+ metadata : newMetadataPacket . data ,
412+ metadataType : newMetadataPacket . dataType ,
353413 metadataVersion : {
354414 increment : 1 ,
355415 } ,
@@ -358,8 +418,8 @@ export class UpdateMetadataService extends BaseService {
358418
359419 if ( result . count === 0 ) {
360420 if ( this . flushLoggingEnabled ) {
361- logger . debug (
362- `[UpdateMetadataService][ updateRunMetadataWithOperations] Optimistic lock failed for run ${ runId } ` ,
421+ this . logger . debug (
422+ `[updateRunMetadataWithOperations] Optimistic lock failed for run ${ runId } ` ,
363423 {
364424 metadataVersion : run . metadataVersion ,
365425 }
@@ -379,13 +439,11 @@ export class UpdateMetadataService extends BaseService {
379439 }
380440
381441 if ( this . flushLoggingEnabled ) {
382- logger . debug (
383- `[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${ runId } ` ,
384- {
385- metadata : applyResults . newMetadata ,
386- operations : operations ,
387- }
388- ) ;
442+ this . logger . debug ( `[updateRunMetadataWithOperations] Updated metadata for run` , {
443+ metadata : applyResults . newMetadata ,
444+ operations : operations ,
445+ runId,
446+ } ) ;
389447 }
390448
391449 // Success! Return the new metadata
@@ -409,24 +467,25 @@ export class UpdateMetadataService extends BaseService {
409467 body : UpdateMetadataRequestBody ,
410468 existingMetadata : IOPacket
411469 ) {
412- const metadataPacket = handleMetadataPacket ( body . metadata , "application/json" ) ;
470+ const metadataPacket = handleMetadataPacket (
471+ body . metadata ,
472+ "application/json" ,
473+ this . maximumSize
474+ ) ;
413475
414476 if ( ! metadataPacket ) {
415- throw new ServiceValidationError ( "Invalid metadata" ) ;
477+ return { } ;
416478 }
417479
418480 if (
419481 metadataPacket . data !== "{}" ||
420482 ( existingMetadata . data && metadataPacket . data !== existingMetadata . data )
421483 ) {
422484 if ( this . flushLoggingEnabled ) {
423- logger . debug (
424- `[UpdateMetadataService][updateRunMetadataDirectly] Updating metadata directly for run` ,
425- {
426- metadata : metadataPacket . data ,
427- runId,
428- }
429- ) ;
485+ this . logger . debug ( `[updateRunMetadataDirectly] Updating metadata directly for run` , {
486+ metadata : metadataPacket . data ,
487+ runId,
488+ } ) ;
430489 }
431490
432491 // Update the metadata without version check
@@ -458,7 +517,7 @@ export class UpdateMetadataService extends BaseService {
458517 } ) ;
459518
460519 if ( this . flushLoggingEnabled ) {
461- logger . debug ( `[UpdateMetadataService ] Ingesting operations for run` , {
520+ this . logger . debug ( `[ingestRunOperations ] Ingesting operations for run` , {
462521 runId,
463522 bufferedOperations,
464523 } ) ;
@@ -468,15 +527,14 @@ export class UpdateMetadataService extends BaseService {
468527
469528 this . _bufferedOperations . set ( runId , [ ...existingBufferedOperations , ...bufferedOperations ] ) ;
470529 }
471- }
472530
473- export const updateMetadataService = singleton (
474- "update-metadata-service" ,
475- ( ) =>
476- new UpdateMetadataService (
477- prisma ,
478- env . BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS ,
479- env . BATCH_METADATA_OPERATIONS_FLUSH_ENABLED === "1" ,
480- env . BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1"
481- )
482- ) ;
531+ // Testing method to manually trigger flush
532+ async flushOperations ( ) {
533+ const currentOperations = new Map ( this . _bufferedOperations ) ;
534+ this . _bufferedOperations . clear ( ) ;
535+
536+ if ( currentOperations . size > 0 ) {
537+ await Effect . runPromise ( this . _processBufferedOperations ( currentOperations ) ) ;
538+ }
539+ }
540+ }
0 commit comments