Skip to content

Commit d44a525

Browse files
committed
fixed configuration
1 parent 33d134f commit d44a525

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,8 +415,9 @@ const EnvironmentSchema = z.object({
415415

416416
MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"),
417417
MARQS_WORKER_ENABLED: z.string().default("0"),
418-
MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
419-
MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
418+
MARQS_WORKER_COUNT: z.coerce.number().int().default(2),
419+
MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
420+
MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(5),
420421
MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
421422
MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
422423
MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
trace,
99
Tracer,
1010
} from "@opentelemetry/api";
11+
import { type RedisOptions } from "@internal/redis";
1112
import {
1213
SEMATTRS_MESSAGE_ID,
1314
SEMATTRS_MESSAGING_SYSTEM,
@@ -84,6 +85,7 @@ export type MarQSOptions = {
8485
shutdownTimeoutMs?: number;
8586
concurrency?: WorkerConcurrencyOptions;
8687
enabled?: boolean;
88+
redisOptions: RedisOptions;
8789
};
8890
};
8991

@@ -116,10 +118,7 @@ export class MarQS {
116118

117119
this.worker = new Worker({
118120
name: "marqs-worker",
119-
redisOptions: {
120-
...options.redis.options,
121-
keyPrefix: `${options.redis.options.keyPrefix}:worker`,
122-
},
121+
redisOptions: options.workerOptions.redisOptions,
123122
catalog: workerCatalog,
124123
concurrency: options.workerOptions?.concurrency,
125124
pollIntervalMs: options.workerOptions?.pollIntervalMs ?? 1000,
@@ -2621,10 +2620,19 @@ function getMarQSClient() {
26212620
immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS,
26222621
shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS,
26232622
concurrency: {
2624-
workers: env.MARQS_WORKER_CONCURRENCY_LIMIT,
2623+
workers: env.MARQS_WORKER_COUNT,
26252624
tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER,
26262625
limit: env.MARQS_WORKER_CONCURRENCY_LIMIT,
26272626
},
2627+
redisOptions: {
2628+
keyPrefix: KEY_PREFIX,
2629+
port: env.REDIS_PORT ?? undefined,
2630+
host: env.REDIS_HOST ?? undefined,
2631+
username: env.REDIS_USERNAME ?? undefined,
2632+
password: env.REDIS_PASSWORD ?? undefined,
2633+
enableAutoPipelining: true,
2634+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
2635+
},
26282636
},
26292637
});
26302638
}

0 commit comments

Comments
 (0)