Skip to content

Commit 3f96ae0

Browse files
committed
Add env vars and additional spans
1 parent 3478194 commit 3f96ae0

File tree

6 files changed: 181 additions (+181) and 98 deletions (-98).

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -496,7 +496,10 @@ const EnvironmentSchema = z.object({
496496
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
497497
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
498498
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
499-
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
499+
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
500+
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
501+
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
502+
RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10),
500503
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(),
501504
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(),
502505
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,9 @@ function createRunEngine() {
6767
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
6868
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
6969
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
70+
masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
71+
masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
72+
masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
7073
concurrencySweeper: {
7174
scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE,
7275
processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE,

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -193,8 +193,10 @@ export class WorkerGroupTokenService extends WithRunEngine {
193193
return;
194194
}
195195

196+
const cacheKey = ["worker-group-token", token, instanceName];
197+
196198
const result = await authenticatedWorkerInstanceCache.authenticatedWorkerInstance.swr(
197-
`worker-group-token-${token}`,
199+
cacheKey.join("-"),
198200
async () => {
199201
const workerGroup = await this.findWorkerGroup({ token });
200202

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 19 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -217,7 +217,16 @@ export class RunLocker {
217217
let lastError: Error | undefined;
218218

219219
for (let attempt = 0; attempt <= maxAttempts; attempt++) {
220-
const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration));
220+
const [error, acquiredLock] = await tryCatch(
221+
startSpan(this.tracer, "RunLocker.acquireLock", async (span) => {
222+
span.setAttributes({
223+
resources: joinedResources,
224+
attempt,
225+
totalWaitTime,
226+
});
227+
return await this.redlock.acquire(sortedResources, duration);
228+
})
229+
);
221230

222231
if (!error && acquiredLock) {
223232
lock = acquiredLock;
@@ -390,7 +399,15 @@ export class RunLocker {
390399
this.#cleanupExtension(manualContext);
391400

392401
// Release the lock using tryCatch
393-
const [releaseError] = await tryCatch(lock.release());
402+
const [releaseError] = await tryCatch(
403+
startSpan(this.tracer, "RunLocker.releaseLock", async (span) => {
404+
span.setAttributes({
405+
resources: joinedResources,
406+
lockValue: lock.value,
407+
});
408+
return await lock.release();
409+
})
410+
);
394411
if (releaseError) {
395412
this.logger.warn("[RunLocker] Error releasing lock", {
396413
error: releaseError,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,9 @@ export type RunEngineOptions = {
3939
masterQueueConsumersDisabled?: boolean;
4040
processWorkerQueueDebounceMs?: number;
4141
masterQueueConsumersIntervalMs?: number;
42+
masterQueueCooloffPeriodMs?: number;
43+
masterQueueCooloffCountThreshold?: number;
44+
masterQueueConsumerDequeueCount?: number;
4245
workerOptions?: WorkerConcurrencyOptions;
4346
retryOptions?: RetryOptions;
4447
defaultEnvConcurrency?: number;

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 149 additions & 94 deletions
Original file line number | Diff line number | Diff line change
@@ -1451,92 +1451,107 @@ export class RunQueue {
14511451
shard: number;
14521452
maxCount: number;
14531453
}): Promise<DequeuedMessage[]> {
1454-
const queueConcurrencyLimitKey = this.keys.queueConcurrencyLimitKeyFromQueue(messageQueue);
1455-
const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(messageQueue);
1456-
const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(messageQueue);
1457-
const envConcurrencyLimitBurstFactorKey =
1458-
this.keys.envConcurrencyLimitBurstFactorKeyFromQueue(messageQueue);
1459-
const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue);
1460-
const messageKeyPrefix = this.keys.messageKeyPrefixFromQueue(messageQueue);
1461-
const envQueueKey = this.keys.envQueueKeyFromQueue(messageQueue);
1462-
const masterQueueKey = this.keys.masterQueueKeyForShard(shard);
1463-
1464-
this.logger.debug("#callDequeueMessagesFromQueue", {
1465-
messageQueue,
1466-
queueConcurrencyLimitKey,
1467-
envConcurrencyLimitKey,
1468-
envConcurrencyLimitBurstFactorKey,
1469-
queueCurrentConcurrencyKey,
1470-
envCurrentConcurrencyKey,
1471-
messageKeyPrefix,
1472-
envQueueKey,
1473-
masterQueueKey,
1474-
shard,
1475-
maxCount,
1476-
});
1454+
return this.#trace("callDequeueMessagesFromQueue", async (span) => {
1455+
span.setAttributes({
1456+
messageQueue,
1457+
shard,
1458+
maxCount,
1459+
});
14771460

1478-
const result = await this.redis.dequeueMessagesFromQueue(
1479-
//keys
1480-
messageQueue,
1481-
queueConcurrencyLimitKey,
1482-
envConcurrencyLimitKey,
1483-
envConcurrencyLimitBurstFactorKey,
1484-
queueCurrentConcurrencyKey,
1485-
envCurrentConcurrencyKey,
1486-
messageKeyPrefix,
1487-
envQueueKey,
1488-
masterQueueKey,
1489-
//args
1490-
messageQueue,
1491-
String(Date.now()),
1492-
String(this.options.defaultEnvConcurrency),
1493-
String(this.options.defaultEnvConcurrencyBurstFactor ?? 1),
1494-
this.options.redis.keyPrefix ?? "",
1495-
String(maxCount)
1496-
);
1461+
const queueConcurrencyLimitKey = this.keys.queueConcurrencyLimitKeyFromQueue(messageQueue);
1462+
const queueCurrentConcurrencyKey =
1463+
this.keys.queueCurrentConcurrencyKeyFromQueue(messageQueue);
1464+
const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(messageQueue);
1465+
const envConcurrencyLimitBurstFactorKey =
1466+
this.keys.envConcurrencyLimitBurstFactorKeyFromQueue(messageQueue);
1467+
const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue);
1468+
const messageKeyPrefix = this.keys.messageKeyPrefixFromQueue(messageQueue);
1469+
const envQueueKey = this.keys.envQueueKeyFromQueue(messageQueue);
1470+
const masterQueueKey = this.keys.masterQueueKeyForShard(shard);
14971471

1498-
if (!result) {
1499-
return [];
1500-
}
1472+
this.logger.debug("#callDequeueMessagesFromQueue", {
1473+
messageQueue,
1474+
queueConcurrencyLimitKey,
1475+
envConcurrencyLimitKey,
1476+
envConcurrencyLimitBurstFactorKey,
1477+
queueCurrentConcurrencyKey,
1478+
envCurrentConcurrencyKey,
1479+
messageKeyPrefix,
1480+
envQueueKey,
1481+
masterQueueKey,
1482+
shard,
1483+
maxCount,
1484+
});
15011485

1502-
this.logger.debug("dequeueMessagesFromQueue raw result", {
1503-
result,
1504-
service: this.name,
1505-
});
1486+
const result = await this.redis.dequeueMessagesFromQueue(
1487+
//keys
1488+
messageQueue,
1489+
queueConcurrencyLimitKey,
1490+
envConcurrencyLimitKey,
1491+
envConcurrencyLimitBurstFactorKey,
1492+
queueCurrentConcurrencyKey,
1493+
envCurrentConcurrencyKey,
1494+
messageKeyPrefix,
1495+
envQueueKey,
1496+
masterQueueKey,
1497+
//args
1498+
messageQueue,
1499+
String(Date.now()),
1500+
String(this.options.defaultEnvConcurrency),
1501+
String(this.options.defaultEnvConcurrencyBurstFactor ?? 1),
1502+
this.options.redis.keyPrefix ?? "",
1503+
String(maxCount)
1504+
);
15061505

1507-
const messages = [];
1508-
for (let i = 0; i < result.length; i += 3) {
1509-
const messageId = result[i];
1510-
const messageScore = result[i + 1];
1511-
const rawMessage = result[i + 2];
1506+
if (!result) {
1507+
span.setAttribute("message_count", 0);
15121508

1513-
//read message
1514-
const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage));
1515-
if (!parsedMessage.success) {
1516-
this.logger.error(`[${this.name}] Failed to parse message`, {
1509+
return [];
1510+
}
1511+
1512+
this.logger.debug("dequeueMessagesFromQueue raw result", {
1513+
result,
1514+
service: this.name,
1515+
});
1516+
1517+
const messages = [];
1518+
for (let i = 0; i < result.length; i += 3) {
1519+
const messageId = result[i];
1520+
const messageScore = result[i + 1];
1521+
const rawMessage = result[i + 2];
1522+
1523+
//read message
1524+
const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage));
1525+
if (!parsedMessage.success) {
1526+
this.logger.error(`[${this.name}] Failed to parse message`, {
1527+
messageId,
1528+
error: parsedMessage.error,
1529+
service: this.name,
1530+
});
1531+
1532+
continue;
1533+
}
1534+
1535+
const message = parsedMessage.data;
1536+
1537+
messages.push({
15171538
messageId,
1518-
error: parsedMessage.error,
1519-
service: this.name,
1539+
messageScore,
1540+
message,
15201541
});
1521-
1522-
continue;
15231542
}
15241543

1525-
const message = parsedMessage.data;
1526-
1527-
messages.push({
1528-
messageId,
1529-
messageScore,
1530-
message,
1544+
this.logger.debug("dequeueMessagesFromQueue parsed result", {
1545+
messages,
1546+
service: this.name,
15311547
});
1532-
}
15331548

1534-
this.logger.debug("dequeueMessagesFromQueue parsed result", {
1535-
messages,
1536-
service: this.name,
1537-
});
1549+
const filteredMessages = messages.filter(Boolean) as DequeuedMessage[];
1550+
1551+
span.setAttribute("message_count", filteredMessages.length);
15381552

1539-
return messages.filter(Boolean) as DequeuedMessage[];
1553+
return filteredMessages;
1554+
});
15401555
}
15411556

15421557
async #callDequeueMessageFromWorkerQueue({
@@ -1569,7 +1584,16 @@ export class RunQueue {
15691584

15701585
this.abortController.signal.addEventListener("abort", cleanup);
15711586

1572-
const result = await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds);
1587+
const result = await this.#trace("popMessageFromWorkerQueue", async (span) => {
1588+
span.setAttributes({
1589+
workerQueue,
1590+
workerQueueKey,
1591+
blockingPopTimeoutSeconds,
1592+
blocking: true,
1593+
});
1594+
1595+
return await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds);
1596+
});
15731597

15741598
this.abortController.signal.removeEventListener("abort", cleanup);
15751599

@@ -1607,7 +1631,15 @@ export class RunQueue {
16071631

16081632
const [, messageKey] = result;
16091633

1610-
const workerQueueLength = await this.redis.llen(workerQueueKey);
1634+
const workerQueueLength = await this.#trace("getWorkerQueueLength", async (span) => {
1635+
span.setAttributes({
1636+
workerQueue,
1637+
workerQueueKey,
1638+
});
1639+
1640+
return await this.redis.llen(workerQueueKey);
1641+
});
1642+
16111643
const message = await this.#dequeueMessageFromKey(messageKey);
16121644

16131645
if (!message) {
@@ -1626,7 +1658,15 @@ export class RunQueue {
16261658
workerQueueKey,
16271659
});
16281660

1629-
const result = await this.redis.dequeueMessageFromWorkerQueueNonBlocking(workerQueueKey);
1661+
const result = await this.#trace("popMessageFromWorkerQueue", async (span) => {
1662+
span.setAttributes({
1663+
workerQueue,
1664+
workerQueueKey,
1665+
blocking: false,
1666+
});
1667+
1668+
return await this.redis.dequeueMessageFromWorkerQueueNonBlocking(workerQueueKey);
1669+
});
16301670

16311671
if (!result) {
16321672
return;
@@ -2070,27 +2110,42 @@ export class RunQueue {
20702110
}
20712111

20722112
async #dequeueMessageFromKey(messageKey: string) {
2073-
const rawMessage = await this.redis.dequeueMessageFromKey(
2074-
messageKey,
2075-
this.options.redis.keyPrefix ?? ""
2076-
);
2113+
return this.#trace("dequeueMessageFromKey", async (span) => {
2114+
span.setAttributes({
2115+
messageKey,
2116+
});
20772117

2078-
if (!rawMessage) {
2079-
return;
2080-
}
2118+
const rawMessage = await this.redis.dequeueMessageFromKey(
2119+
messageKey,
2120+
this.options.redis.keyPrefix ?? ""
2121+
);
20812122

2082-
const [error, message] = parseRawMessage(rawMessage);
2123+
if (!rawMessage) {
2124+
span.setAttribute("result", "NO_MESSAGE");
20832125

2084-
if (error) {
2085-
this.logger.error(`[${this.name}] Failed to parse message`, {
2086-
messageKey,
2087-
error,
2088-
service: this.name,
2089-
message: message ?? rawMessage,
2090-
});
2091-
}
2126+
return;
2127+
}
20922128

2093-
return message;
2129+
const [error, message] = parseRawMessage(rawMessage);
2130+
2131+
if (error) {
2132+
this.logger.error(`[${this.name}] Failed to parse message`, {
2133+
messageKey,
2134+
error,
2135+
service: this.name,
2136+
message: message ?? rawMessage,
2137+
});
2138+
}
2139+
2140+
if (message) {
2141+
span.setAttribute("result", "SUCCESS");
2142+
span.setAttribute("messageId", message.runId);
2143+
} else {
2144+
span.setAttribute("result", "NO_MESSAGE");
2145+
}
2146+
2147+
return message;
2148+
});
20942149
}
20952150

20962151
#registerCommands() {

0 commit comments

Comments
 (0)