Skip to content

Commit 0eaa748

Browse files
eraklisamuelcolvin
andauthored
create worker pool with worker's queue (#218)
* create worker pool with worker's queue * add test Co-authored-by: Samuel Colvin <[email protected]>
1 parent dde0ea9 commit 0eaa748

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

arq/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,10 @@ def pool(self) -> ArqRedis:
292292
async def main(self) -> None:
293293
if self._pool is None:
294294
self._pool = await create_pool(
295-
self.redis_settings, job_deserializer=self.job_deserializer, job_serializer=self.job_serializer
295+
self.redis_settings,
296+
job_deserializer=self.job_deserializer,
297+
job_serializer=self.job_serializer,
298+
default_queue_name=self.queue_name,
296299
)
297300

298301
logger.info('Starting worker for %d functions: %s', len(self.functions), ', '.join(self.functions))

tests/test_main.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,30 @@ async def parent_job(ctx):
9090
assert inner_result == 42
9191

9292

93+
async def test_enqueue_job_custom_queue(arq_redis: ArqRedis, worker):
94+
async def foobar(ctx):
95+
return 42
96+
97+
async def parent_job(ctx):
98+
inner_job = await ctx['redis'].enqueue_job('foobar')
99+
return inner_job.job_id
100+
101+
job = await arq_redis.enqueue_job('parent_job', _queue_name='spanner')
102+
103+
worker: Worker = worker(
104+
functions=[func(parent_job, name='parent_job'), func(foobar, name='foobar')],
105+
arq_redis=None,
106+
queue_name='spanner',
107+
)
108+
109+
await worker.main()
110+
inner_job_id = await job.result(pole_delay=0)
111+
assert inner_job_id is not None
112+
inner_job = Job(inner_job_id, arq_redis, _queue_name='spanner')
113+
inner_result = await inner_job.result(pole_delay=0)
114+
assert inner_result == 42
115+
116+
93117
async def test_job_error(arq_redis: ArqRedis, worker):
94118
async def foobar(ctx):
95119
raise RuntimeError('foobar error')

0 commit comments

Comments
 (0)