Skip to content

Commit 8e6a8d3

Browse files
committed
fix(run-queue): Scan for queues using a duplicate redis client instead of the instance version
1 parent 6f6ca01 commit 8e6a8d3

File tree

1 file changed

+27
-3
lines changed
  • internal-packages/run-engine/src/run-queue

1 file changed

+27
-3
lines changed

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,9 @@ export class RunQueue {
909909
onError?: (error: Error) => void
910910
): { stream: Readable; redis: Redis } {
911911
const pattern = this.keys.currentConcurrencySetKeyScanPattern();
912-
const stream = this.redis.scanStream({
912+
const redis = this.redis.duplicate();
913+
914+
const stream = redis.scanStream({
913915
match: pattern,
914916
count,
915917
type: "set",
@@ -925,7 +927,7 @@ export class RunQueue {
925927

926928
return {
927929
stream,
928-
redis: this.redis,
930+
redis,
929931
};
930932
}
931933

@@ -1938,11 +1940,23 @@ export class RunQueue {
19381940
);
19391941
});
19401942

1943+
const [scanError] = await tryCatch(promise);
1944+
1945+
if (scanError) {
1946+
this.logger.error("Error scanning concurrency sets", {
1947+
error: scanError,
1948+
});
1949+
}
1950+
1951+
await redis.quit();
1952+
19411953
return promise;
19421954
}
19431955

19441956
private async processConcurrencySet(concurrencyKey: string) {
1945-
const stream = this.redis.sscanStream(concurrencyKey, {
1957+
const redis = this.redis.duplicate();
1958+
1959+
const stream = redis.sscanStream(concurrencyKey, {
19461960
count: 100,
19471961
});
19481962

@@ -1991,6 +2005,16 @@ export class RunQueue {
19912005
stream.resume();
19922006
});
19932007

2008+
const [scanError] = await tryCatch(promise);
2009+
2010+
if (scanError) {
2011+
this.logger.error("Error scanning concurrency sets", {
2012+
error: scanError,
2013+
});
2014+
}
2015+
2016+
await redis.quit();
2017+
19942018
return promise;
19952019
}
19962020

0 commit comments

Comments
 (0)