Skip to content

Commit 918cba8

Browse files
committed
Remove the per lock duration (duration same for all locks in a single RunLock instance)
1 parent f066447 commit 918cba8

File tree

11 files changed

+86
-233
lines changed

11 files changed

+86
-233
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export class RunEngine {
9898
logger: this.logger,
9999
tracer: trace.getTracer("RunLocker"),
100100
meter: options.meter,
101-
defaultDuration: options.runLock.duration ?? 5000,
101+
duration: options.runLock.duration ?? 5000,
102102
automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ?? 1000,
103103
retryConfig: {
104104
maxRetries: 10,
@@ -1162,7 +1162,7 @@ export class RunEngine {
11621162
tx?: PrismaClientOrTransaction;
11631163
}) {
11641164
const prisma = tx ?? this.prisma;
1165-
return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => {
1165+
return await this.runLock.lock("handleStalledSnapshot", [runId], async () => {
11661166
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
11671167
if (latestSnapshot.id !== snapshotId) {
11681168
this.logger.log(

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 15 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,6 @@ export interface LockRetryConfig {
6666
maxTotalWaitTime?: number;
6767
}
6868

69-
interface LockOptions {
70-
/** Default lock duration in milliseconds (default: 5000) */
71-
defaultDuration?: number;
72-
/** Automatic extension threshold in milliseconds - how early to extend locks before expiration (default: 500) */
73-
automaticExtensionThreshold?: number;
74-
/** Retry configuration for lock acquisition */
75-
retryConfig?: LockRetryConfig;
76-
}
77-
7869
export class RunLocker {
7970
private redlock: InstanceType<typeof redlock.default>;
8071
private asyncLocalStorage: AsyncLocalStorage<LockContext>;
@@ -85,20 +76,20 @@ export class RunLocker {
8576
private activeManualContexts: Map<string, ManualLockContext> = new Map();
8677
private lockDurationHistogram: Histogram;
8778
private retryConfig: Required<LockRetryConfig>;
88-
private defaultDuration: number;
79+
private duration: number;
8980
private automaticExtensionThreshold: number;
9081

9182
constructor(options: {
9283
redis: Redis;
9384
logger: Logger;
9485
tracer: Tracer;
9586
meter?: Meter;
96-
defaultDuration?: number;
87+
duration?: number;
9788
automaticExtensionThreshold?: number;
9889
retryConfig?: LockRetryConfig;
9990
}) {
10091
// Initialize configuration values
101-
this.defaultDuration = options.defaultDuration ?? 5000;
92+
this.duration = options.duration ?? 5000;
10293
this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 500;
10394

10495
this.redlock = new Redlock([options.redis], {
@@ -157,48 +148,18 @@ export class RunLocker {
157148
}
158149

159150
/** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
160-
async lock<T>(
161-
name: string,
162-
resources: string[],
163-
duration: number | undefined,
164-
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
165-
): Promise<T>;
166-
async lock<T>(
167-
name: string,
168-
resources: string[],
169-
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
170-
): Promise<T>;
171-
async lock<T>(
172-
name: string,
173-
resources: string[],
174-
durationOrRoutine: number | undefined | ((signal: redlock.RedlockAbortSignal) => Promise<T>),
175-
routine?: (signal: redlock.RedlockAbortSignal) => Promise<T>
176-
): Promise<T> {
151+
async lock<T>(name: string, resources: string[], routine: () => Promise<T>): Promise<T> {
177152
const currentContext = this.asyncLocalStorage.getStore();
178153
const joinedResources = [...resources].sort().join(",");
179154

180-
// Handle overloaded parameters
181-
let actualDuration: number;
182-
let actualRoutine: (signal: redlock.RedlockAbortSignal) => Promise<T>;
183-
184-
if (typeof durationOrRoutine === "function") {
185-
// Called as lock(name, resources, routine) - use default duration
186-
actualDuration = this.defaultDuration;
187-
actualRoutine = durationOrRoutine;
188-
} else {
189-
// Called as lock(name, resources, duration, routine) - use provided duration
190-
actualDuration = durationOrRoutine ?? this.defaultDuration;
191-
actualRoutine = routine!;
192-
}
193-
194155
return startSpan(
195156
this.tracer,
196157
"RunLocker.lock",
197158
async (span) => {
198159
if (currentContext && currentContext.resources === joinedResources) {
199160
span.setAttribute("nested", true);
200161
// We're already inside a lock with the same resources, just run the routine
201-
return actualRoutine(currentContext.signal);
162+
return routine();
202163
}
203164

204165
span.setAttribute("nested", false);
@@ -208,14 +169,7 @@ export class RunLocker {
208169
const lockStartTime = performance.now();
209170

210171
const [error, result] = await tryCatch(
211-
this.#acquireAndExecute(
212-
name,
213-
resources,
214-
actualDuration,
215-
actualRoutine,
216-
lockId,
217-
lockStartTime
218-
)
172+
this.#acquireAndExecute(name, resources, this.duration, routine, lockId, lockStartTime)
219173
);
220174

221175
if (error) {
@@ -229,15 +183,15 @@ export class RunLocker {
229183
this.logger.error("[RunLocker] Error locking resources", {
230184
error,
231185
resources,
232-
duration: actualDuration,
186+
duration: this.duration,
233187
});
234188
throw error;
235189
}
236190

237191
return result;
238192
},
239193
{
240-
attributes: { name, resources, timeout: actualDuration },
194+
attributes: { name, resources, timeout: this.duration },
241195
}
242196
);
243197
}
@@ -247,7 +201,7 @@ export class RunLocker {
247201
name: string,
248202
resources: string[],
249203
duration: number,
250-
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>,
204+
routine: () => Promise<T>,
251205
lockId: string,
252206
lockStartTime: number
253207
): Promise<T> {
@@ -260,7 +214,6 @@ export class RunLocker {
260214
this.retryConfig;
261215

262216
// Track timing for total wait time limit
263-
const retryStartTime = performance.now();
264217
let totalWaitTime = 0;
265218

266219
// Retry the lock acquisition with exponential backoff
@@ -398,7 +351,7 @@ export class RunLocker {
398351
let lockSuccess = true;
399352
try {
400353
const result = await this.asyncLocalStorage.run(newContext, async () => {
401-
return routine(signal);
354+
return routine();
402355
});
403356

404357
return result;
@@ -529,47 +482,12 @@ export class RunLocker {
529482
condition: boolean,
530483
name: string,
531484
resources: string[],
532-
duration: number | undefined,
533-
routine: (signal?: redlock.RedlockAbortSignal) => Promise<T>
534-
): Promise<T>;
535-
async lockIf<T>(
536-
condition: boolean,
537-
name: string,
538-
resources: string[],
539-
routine: (signal?: redlock.RedlockAbortSignal) => Promise<T>
540-
): Promise<T>;
541-
async lockIf<T>(
542-
condition: boolean,
543-
name: string,
544-
resources: string[],
545-
durationOrRoutine: number | undefined | ((signal?: redlock.RedlockAbortSignal) => Promise<T>),
546-
routine?: (signal?: redlock.RedlockAbortSignal) => Promise<T>
485+
routine: () => Promise<T>
547486
): Promise<T> {
548487
if (condition) {
549-
// Handle overloaded parameters
550-
if (typeof durationOrRoutine === "function") {
551-
// Called as lockIf(condition, name, resources, routine) - use default duration
552-
return this.lock(
553-
name,
554-
resources,
555-
durationOrRoutine as (signal: redlock.RedlockAbortSignal) => Promise<T>
556-
);
557-
} else {
558-
// Called as lockIf(condition, name, resources, duration, routine) - use provided duration
559-
return this.lock(
560-
name,
561-
resources,
562-
durationOrRoutine,
563-
routine! as (signal: redlock.RedlockAbortSignal) => Promise<T>
564-
);
565-
}
488+
return this.lock(name, resources, routine);
566489
} else {
567-
// Handle overloaded parameters for non-lock case
568-
if (typeof durationOrRoutine === "function") {
569-
return durationOrRoutine();
570-
} else {
571-
return routine!();
572-
}
490+
return routine();
573491
}
574492
}
575493

@@ -585,8 +503,8 @@ export class RunLocker {
585503
return { ...this.retryConfig };
586504
}
587505

588-
getDefaultDuration(): number {
589-
return this.defaultDuration;
506+
getDuration(): number {
507+
return this.duration;
590508
}
591509

592510
getAutomaticExtensionThreshold(): number {

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ export class CheckpointSystem {
5353
}): Promise<CreateCheckpointResult> {
5454
const prisma = tx ?? this.$.prisma;
5555

56-
return await this.$.runLock.lock("createCheckpoint", [runId], 5_000, async () => {
56+
return await this.$.runLock.lock("createCheckpoint", [runId], async () => {
5757
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
5858

5959
const isValidSnapshot =
@@ -267,7 +267,7 @@ export class CheckpointSystem {
267267
}): Promise<ExecutionResult> {
268268
const prisma = tx ?? this.$.prisma;
269269

270-
return await this.$.runLock.lock("continueRunExecution", [runId], 5_000, async () => {
270+
return await this.$.runLock.lock("continueRunExecution", [runId], async () => {
271271
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
272272

273273
if (snapshot.id !== snapshotId) {

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export class DelayedRunSystem {
3737
this.$.tracer,
3838
"rescheduleDelayedRun",
3939
async () => {
40-
return await this.$.runLock.lock("rescheduleDelayedRun", [runId], 5_000, async () => {
40+
return await this.$.runLock.lock("rescheduleDelayedRun", [runId], async () => {
4141
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
4242

4343
//if the run isn't just created then we can't reschedule it

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ export class DequeueSystem {
8080
const dequeuedRun = await this.$.runLock.lock(
8181
"dequeueFromWorkerQueue",
8282
[runId],
83-
5000,
84-
async (signal) => {
83+
async () => {
8584
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
8685

8786
if (!isDequeueableExecutionStatus(snapshot.executionStatus)) {
@@ -548,7 +547,7 @@ export class DequeueSystem {
548547
statusReason,
549548
});
550549

551-
return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => {
550+
return this.$.runLock.lock("pendingVersion", [runId], async () => {
552551
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version lock acquired", {
553552
runId,
554553
reason,

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class EnqueueSystem {
5656
}) {
5757
const prisma = tx ?? this.$.prisma;
5858

59-
return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], 5000, async () => {
59+
return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], async () => {
6060
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
6161
run: run,
6262
snapshot: {

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ export class ReleaseConcurrencySystem {
263263
return await this.$.runLock.lock(
264264
"executeReleaseConcurrencyForSnapshot",
265265
[snapshot.runId],
266-
5_000,
267266
async () => {
268267
const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId);
269268

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export class RunAttemptSystem {
8585
this.$.tracer,
8686
"startRunAttempt",
8787
async (span) => {
88-
return this.$.runLock.lock("startRunAttempt", [runId], 5000, async () => {
88+
return this.$.runLock.lock("startRunAttempt", [runId], async () => {
8989
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
9090

9191
if (latestSnapshot.id !== snapshotId) {
@@ -441,7 +441,7 @@ export class RunAttemptSystem {
441441
this.$.tracer,
442442
"#completeRunAttemptSuccess",
443443
async (span) => {
444-
return this.$.runLock.lock("attemptSucceeded", [runId], 5_000, async (signal) => {
444+
return this.$.runLock.lock("attemptSucceeded", [runId], async () => {
445445
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
446446

447447
if (latestSnapshot.id !== snapshotId) {
@@ -594,7 +594,7 @@ export class RunAttemptSystem {
594594
this.$.tracer,
595595
"completeRunAttemptFailure",
596596
async (span) => {
597-
return this.$.runLock.lock("attemptFailed", [runId], 5_000, async (signal) => {
597+
return this.$.runLock.lock("attemptFailed", [runId], async () => {
598598
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
599599

600600
if (latestSnapshot.id !== snapshotId) {
@@ -905,7 +905,7 @@ export class RunAttemptSystem {
905905
}): Promise<{ wasRequeued: boolean } & ExecutionResult> {
906906
const prisma = tx ?? this.$.prisma;
907907

908-
return await this.$.runLock.lock("tryNackAndRequeue", [run.id], 5000, async (signal) => {
908+
return await this.$.runLock.lock("tryNackAndRequeue", [run.id], async () => {
909909
//we nack the message, this allows another work to pick up the run
910910
const gotRequeued = await this.$.runQueue.nackMessage({
911911
orgId,
@@ -982,7 +982,7 @@ export class RunAttemptSystem {
982982
reason = reason ?? "Cancelled by user";
983983

984984
return startSpan(this.$.tracer, "cancelRun", async (span) => {
985-
return this.$.runLock.lock("cancelRun", [runId], 5_000, async (signal) => {
985+
return this.$.runLock.lock("cancelRun", [runId], async () => {
986986
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
987987

988988
//already finished, do nothing

internal-packages/run-engine/src/engine/systems/ttlSystem.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export class TtlSystem {
2323

2424
async expireRun({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) {
2525
const prisma = tx ?? this.$.prisma;
26-
await this.$.runLock.lock("expireRun", [runId], 5_000, async () => {
26+
await this.$.runLock.lock("expireRun", [runId], async () => {
2727
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
2828

2929
//if we're executing then we won't expire the run

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ export class WaitpointSystem {
380380

381381
let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints;
382382

383-
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], 5000, async () => {
383+
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
384384
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);
385385

386386
//block the run with the waitpoints, returning how many waitpoints are pending
@@ -549,7 +549,7 @@ export class WaitpointSystem {
549549
}
550550

551551
//4. Continue the run whether it's executing or not
552-
await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
552+
await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
553553
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
554554

555555
if (isFinishedOrPendingFinished(snapshot.executionStatus)) {

0 commit comments

Comments
 (0)