@@ -9,7 +9,7 @@ import type { PrismaClientOrTransaction } from "~/db.server";
99import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
1010import { handleMetadataPacket , MetadataTooLargeError } from "~/utils/packets" ;
1111import { ServiceValidationError } from "~/v3/services/common.server" ;
12- import { Effect , Schedule , Duration } from "effect" ;
12+ import { Effect , Schedule , Duration , Fiber } from "effect" ;
1313import { type RuntimeFiber } from "effect/Fiber" ;
1414import { setTimeout } from "timers/promises" ;
1515import { Logger , LogLevel } from "@trigger.dev/core/logger" ;
@@ -102,6 +102,12 @@ export class UpdateMetadataService {
102102 this . _flushFiber = Effect . runFork ( program as Effect . Effect < void , never , never > ) ;
103103 }
104104
105+ stopFlushing ( ) {
106+ if ( this . _flushFiber ) {
107+ Effect . runFork ( Fiber . interrupt ( this . _flushFiber ) ) ;
108+ }
109+ }
110+
105111 private _processBufferedOperations = (
106112 operations : Map < string , BufferedRunMetadataChangeOperation [ ] >
107113 ) => {
@@ -226,6 +232,11 @@ export class UpdateMetadataService {
226232 return ;
227233 }
228234
235+ // Testing hook before update
236+ if ( this . options . onBeforeUpdate ) {
237+ yield * _ ( Effect . tryPromise ( ( ) => this . options . onBeforeUpdate ! ( runId ) ) ) ;
238+ }
239+
229240 const result = yield * _ (
230241 Effect . tryPromise ( ( ) =>
231242 this . _prisma . taskRun . updateMany ( {
@@ -378,6 +389,11 @@ export class UpdateMetadataService {
378389 throw new Error ( `Run ${ runId } not found` ) ;
379390 }
380391
392+ // Testing hook after read
393+ if ( this . options . onAfterRead ) {
394+ await this . options . onAfterRead ( runId , run . metadataVersion ) ;
395+ }
396+
381397 // Parse the current metadata
382398 const currentMetadata = await ( run . metadata
383399 ? parsePacket ( { data : run . metadata , dataType : run . metadataType } )
@@ -401,6 +417,11 @@ export class UpdateMetadataService {
401417 throw new ServiceValidationError ( "Unable to update metadata" ) ;
402418 }
403419
420+ // Testing hook before update
421+ if ( this . options . onBeforeUpdate ) {
422+ await this . options . onBeforeUpdate ( runId ) ;
423+ }
424+
404425 // Update with optimistic locking
405426 const result = await this . _prisma . taskRun . updateMany ( {
406427 where : {
0 commit comments