Thank you for this amazing project.
The max-prefetch argument doesn't work as expected. When there are tasks waiting to be consumed, the number of prefetched messages exceeds max-prefetch by 1.
Look at this part of the code in receiver.prefetcher, and consider max-prefetch set to 0.
    while True:
        if finish_event.is_set():
            break
        try:
            await self.sem_prefetch.acquire()
            if (
                self.max_tasks_to_execute
                and fetched_tasks >= self.max_tasks_to_execute
            ):
                logger.info("Max number of tasks executed.")
                break
            # Here we wait for the message to be fetched,
            # but we make it with timeout so it can be interrupted
            done, _ = await asyncio.wait({current_message}, timeout=0.3)
            # If the message is not fetched, we release the semaphore
            # and continue the loop. So it will check if finished event was set.
            if not done:
                self.sem_prefetch.release()
                continue
            # We're done, so now we need to check
            # whether task has returned an error.
            message = current_message.result()
            ## << consider the case where we already consumed a message in the previous iteration,
            ## << so message is an Ackable instance
            current_message = asyncio.create_task(iterator.__anext__())
            ## << when the interpreter gets here, the task that consumes the next message has been created but not awaited,
            ## << so we have no control over when it starts consuming from the broker.
            ## << usually the broker has enough time to consume a message, and the task yields its value at the next await
            ## << (the line done, _ = await asyncio.wait({current_message}, timeout=0.3))
            fetched_tasks += 1
            await queue.put(message)
        except (asyncio.CancelledError, StopAsyncIteration):
            break

I think we shouldn't create a task and wait for it separately like we currently do.
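To make the off-by-one concrete, here is a minimal sketch of the loop above running against a stand-in broker. It is a simplified model, not the project's actual code: `fake_broker`, `eager_prefetcher`, and `count_pulled` are hypothetical names, the semaphore is assumed to be created with `max_prefetch` permits, and nothing ever consumes the queue.

```python
import asyncio


async def fake_broker(pulled: list):
    """Hypothetical broker stream that records every message it hands out."""
    n = 0
    while True:
        await asyncio.sleep(0)  # stand-in for network I/O
        pulled.append(n)
        yield n
        n += 1


async def eager_prefetcher(queue: asyncio.Queue, sem: asyncio.Semaphore, pulled: list) -> None:
    iterator = fake_broker(pulled)
    # The first fetch is scheduled before any prefetch slot is acquired.
    current_message = asyncio.create_task(iterator.__anext__())
    try:
        while True:
            await sem.acquire()
            done, _ = await asyncio.wait({current_message}, timeout=0.05)
            if not done:
                sem.release()
                continue
            message = current_message.result()
            # The next fetch is scheduled here, again without holding a slot.
            current_message = asyncio.create_task(iterator.__anext__())
            await queue.put(message)
    finally:
        current_message.cancel()


async def count_pulled(max_prefetch: int) -> tuple:
    """Return (messages pulled from the broker, messages placed in the queue)."""
    pulled: list = []
    queue: asyncio.Queue = asyncio.Queue()
    sem = asyncio.Semaphore(max_prefetch)  # assumption: one permit per prefetch slot
    task = asyncio.create_task(eager_prefetcher(queue, sem, pulled))
    await asyncio.sleep(0.2)  # the queue is never drained during this time
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return len(pulled), queue.qsize()


if __name__ == "__main__":
    for limit in (0, 1, 3):
        print(limit, asyncio.run(count_pulled(limit)))
```

Under these assumptions the stand-in broker hands out max_prefetch + 1 messages in every case, because one fetch is always in flight without holding a slot.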
We should do something like:
    while True:
        if finish_event.is_set():
            break
        try:
            await self.sem_prefetch.acquire()
            if (
                self.max_tasks_to_execute
                and fetched_tasks >= self.max_tasks_to_execute
            ):
                logger.info("Max number of tasks executed.")
                break
            # Here we wait for the message to be fetched,
            # but we make it with timeout so it can be interrupted
            ## << create current task and consume it right away
            current_message = asyncio.create_task(iterator.__anext__())  # type: ignore
            done, _ = await asyncio.wait({current_message}, timeout=0.3)
            # If the message is not fetched, we release the semaphore
            # and continue the loop. So it will check if finished event was set.
            if not done:
                self.sem_prefetch.release()
                continue
            # We're done, so now we need to check
            # whether task has returned an error.
            message = current_message.result()
            fetched_tasks += 1
            await queue.put(message)
        except (asyncio.CancelledError, StopAsyncIteration):
            break

I can do a PR and do a proper fix if you agree.
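One caveat a proper fix would need to handle: in the snippet above, a timed-out iteration goes around the loop and creates a second `__anext__()` task while the first one is still pending, leaving the earlier task dangling. A rough sketch of one way around that, which starts a fetch only while a slot is held and reuses a fetch that is still pending, could look like the following. It is a simplified model rather than the project's code: `fake_broker`, `lazy_prefetcher`, and `count_pulled_lazy` are hypothetical names, and the semaphore is assumed to be created with `max_prefetch` permits.

```python
import asyncio
from typing import Optional


async def fake_broker(pulled: list):
    """Hypothetical broker stream that records every message it hands out."""
    n = 0
    while True:
        await asyncio.sleep(0)  # stand-in for network I/O
        pulled.append(n)
        yield n
        n += 1


async def lazy_prefetcher(queue: asyncio.Queue, sem: asyncio.Semaphore, pulled: list) -> None:
    iterator = fake_broker(pulled)
    current_message: Optional[asyncio.Task] = None
    try:
        while True:
            await sem.acquire()
            # Start a fetch only once a slot is held; if a fetch from a
            # timed-out iteration is still pending, keep waiting on it.
            if current_message is None:
                current_message = asyncio.create_task(iterator.__anext__())
            done, _ = await asyncio.wait({current_message}, timeout=0.05)
            if not done:
                sem.release()
                continue
            message = current_message.result()
            current_message = None  # the next fetch waits for the next slot
            await queue.put(message)
    finally:
        if current_message is not None:
            current_message.cancel()


async def count_pulled_lazy(max_prefetch: int) -> tuple:
    """Return (messages pulled from the broker, messages placed in the queue)."""
    pulled: list = []
    queue: asyncio.Queue = asyncio.Queue()
    sem = asyncio.Semaphore(max_prefetch)  # assumption: one permit per prefetch slot
    task = asyncio.create_task(lazy_prefetcher(queue, sem, pulled))
    await asyncio.sleep(0.2)  # the queue is never drained during this time
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return len(pulled), queue.qsize()


if __name__ == "__main__":
    for limit in (0, 2):
        print(limit, asyncio.run(count_pulled_lazy(limit)))
```

In this model the number of messages pulled from the stand-in broker matches max_prefetch exactly. The timeout path is not exercised here because the stand-in broker always answers immediately.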