@@ -3,6 +3,9 @@ const { default: Redlock } = require("redlock");
33import { AsyncLocalStorage } from "async_hooks" ;
44import { Redis } from "@internal/redis" ;
55import * as redlock from "redlock" ;
6+ import { tryCatch } from "@trigger.dev/core" ;
7+ import { Logger } from "@trigger.dev/core/logger" ;
8+ import { startSpan , Tracer } from "@internal/tracing" ;
69
710interface LockContext {
811 resources : string ;
@@ -12,8 +15,10 @@ interface LockContext {
1215export class RunLocker {
1316 private redlock : InstanceType < typeof redlock . default > ;
1417 private asyncLocalStorage : AsyncLocalStorage < LockContext > ;
18+ private logger : Logger ;
19+ private tracer : Tracer ;
1520
16- constructor ( options : { redis : Redis } ) {
21+ constructor ( options : { redis : Redis ; logger : Logger ; tracer : Tracer } ) {
1722 this . redlock = new Redlock ( [ options . redis ] , {
1823 driftFactor : 0.01 ,
1924 retryCount : 10 ,
@@ -22,30 +27,54 @@ export class RunLocker {
2227 automaticExtensionThreshold : 500 , // time in ms
2328 } ) ;
2429 this . asyncLocalStorage = new AsyncLocalStorage < LockContext > ( ) ;
30+ this . logger = options . logger ;
31+ this . tracer = options . tracer ;
2532 }
2633
2734 /** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
2835 async lock < T > (
36+ name : string ,
2937 resources : string [ ] ,
3038 duration : number ,
3139 routine : ( signal : redlock . RedlockAbortSignal ) => Promise < T >
3240 ) : Promise < T > {
3341 const currentContext = this . asyncLocalStorage . getStore ( ) ;
3442 const joinedResources = resources . sort ( ) . join ( "," ) ;
3543
36- if ( currentContext && currentContext . resources === joinedResources ) {
37- // We're already inside a lock with the same resources, just run the routine
38- return routine ( currentContext . signal ) ;
39- }
44+ return startSpan (
45+ this . tracer ,
46+ "RunLocker.lock" ,
47+ async ( span ) => {
48+ if ( currentContext && currentContext . resources === joinedResources ) {
49+ span . setAttribute ( "nested" , true ) ;
50+ // We're already inside a lock with the same resources, just run the routine
51+ return routine ( currentContext . signal ) ;
52+ }
4053
41- // Different resources or not in a lock, proceed with new lock
42- return this . redlock . using ( resources , duration , async ( signal ) => {
43- const newContext : LockContext = { resources : joinedResources , signal } ;
54+ span . setAttribute ( "nested" , false ) ;
4455
45- return this . asyncLocalStorage . run ( newContext , async ( ) => {
46- return routine ( signal ) ;
47- } ) ;
48- } ) ;
56+ // Different resources or not in a lock, proceed with new lock
57+ const [ error , result ] = await tryCatch (
58+ this . redlock . using ( resources , duration , async ( signal ) => {
59+ const newContext : LockContext = { resources : joinedResources , signal } ;
60+
61+ return this . asyncLocalStorage . run ( newContext , async ( ) => {
62+ return routine ( signal ) ;
63+ } ) ;
64+ } )
65+ ) ;
66+
67+ if ( error ) {
68+ this . logger . error ( "[RunLocker] Error locking resources" , { error, resources, duration } ) ;
69+ throw error ;
70+ }
71+
72+ return result ;
73+ } ,
74+ {
75+ attributes : { name, resources, timeout : duration } ,
76+ }
77+ ) ;
4978 }
5079
5180 isInsideLock ( ) : boolean {
0 commit comments