NS-661 Optimize ServiceAgentReservationist queue creation for each instance.#797
NS-661 Optimize ServiceAgentReservationist queue creation for each instance.#797andreidenissov-cog wants to merge 2 commits intomainfrom
Conversation
a23172b to
167ed6f
Compare
| reservationist = ServiceAgentReservationist() | ||
| self.queues.sync_q.put(reservationist.get_queue()) | ||
| reservationist = ServiceAgentReservationist(self.queues) | ||
|
|
There was a problem hiding this comment.
Pass "main queue" as a constructor parameter to ServiceAgentReservationist.
There was a problem hiding this comment.
This ServiceAgentReservationist is the instance that is handed to the CodedTools to do their business.
Now that you have handed the main queue-of-queues to that instance, you have also handed that main queue-of-queues to the coded tools. Now any coded tool can mess with the reservations system.
I believe until this change there was no other spot in the entirety of the system where any one request had the remotest ability to mess with any other request. Now you are introducing one.
You can never underestimate the hackiness of a python programmer.
"I have this reference here, that means I can do X!"
This is a security alert in the making.
| reservationist = ServiceAgentReservationist() | ||
| self.queues.sync_q.put(reservationist.get_queue()) | ||
| reservationist = ServiceAgentReservationist(self.queues) | ||
|
|
There was a problem hiding this comment.
Pass "main queue" as a constructor parameter to ServiceAgentReservationist.
| self.queue: AsyncCollatingQueue = AsyncCollatingQueue() | ||
|
|
||
| def get_queue(self) -> AsyncCollatingQueue: | ||
| """ |
There was a problem hiding this comment.
This getter is no longer used.
| # We haven't created our queue yet, so we need to do that and put it on the main queue. | ||
| self.queue = AsyncCollatingQueue() | ||
| self.main_queue.sync_q.put(self.queue) | ||
|
|
There was a problem hiding this comment.
Create per-ServiceAgentReservationist reservations queue only when we have something to put into it.
And then submit it to the "main queue", so some processing thread can pick up this item ASAP.
There was a problem hiding this comment.
Pull request overview
This PR optimizes how ServiceAgentReservationist participates in the “queue of queues” model by lazily creating and registering its per-request AsyncCollatingQueue only when there is at least one deployment item to enqueue, avoiding worker threads blocking on empty queues.
Changes:
- Make
ServiceAgentReservationistaccept the shared Janus “main queue” and defer per-instance queue creation until firstdeploy(). - Update both sync and async agent services to construct
ServiceAgentReservationistwith the shared queue, removing eager queue registration.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
neuro_san/service/generic/service_agent_reservationist.py |
Lazily creates/registers the per-request AsyncCollatingQueue on first deploy instead of constructor-time. |
neuro_san/service/generic/async_agent_service.py |
Updates reservationist instantiation to pass the shared Janus queue (removes eager enqueue). |
neuro_san/service/generic/agent_service.py |
Updates reservationist instantiation to pass the shared Janus queue (removes eager enqueue). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self.max_lifetime_in_seconds: float = max_lifetime_in_seconds | ||
| self.queue: AsyncCollatingQueue = AsyncCollatingQueue() | ||
|
|
||
| def get_queue(self) -> AsyncCollatingQueue: | ||
| """ | ||
| :return: The AsyncCollatingQueue belonging to this Reservationist | ||
| """ | ||
| return self.queue | ||
| self.main_queue: Queue[AsyncCollatingQueue] = main_queue | ||
| self.queue: AsyncCollatingQueue = None | ||
|
|
There was a problem hiding this comment.
self.queue is initialized to None, but the class’s close() method unconditionally calls await self.queue.put_final_item(...). If no deployments occurred, close() will raise an AttributeError. Make queue optional and have close() no-op (or otherwise safely handle) when no per-request queue was ever created/registered.
There was a problem hiding this comment.
Good catch. Fixed.
| if self.queue is None: | ||
| # We haven't created our queue yet, so we need to do that and put it on the main queue. | ||
| self.queue = AsyncCollatingQueue() | ||
| self.main_queue.sync_q.put(self.queue) |
There was a problem hiding this comment.
This method is async, but it pushes the per-request queue onto the Janus main queue via sync_q.put(...). Even if it’s currently unbounded, this is a blocking call on the event loop and could deadlock/block if a maxsize is introduced later. Prefer using the async side (await main_queue.async_q.put(...)) or a non-blocking put_nowait on the sync side.
| self.main_queue.sync_q.put(self.queue) | |
| await self.main_queue.async_q.put(self.queue) |
There was a problem hiding this comment.
Used put_nowait here, but AFAIU, we cannot use async operations on this queue right here, since it is bound to another event loop.
d1donlydfink
left a comment
There was a problem hiding this comment.
In doing this optimization you are handing the CodedTools the ability to mess with the system as a whole. There's a reason it was done this way.
Simple optimization for ServiceAgentReservationist own processing queues.
Right now we create an empty queue in ServiceAgentReservationist constructor and immediately submit it to "main" queue of queues. This will result in some processing thread to latch on this empty queue and start blocking until something appears in it.
Instead, we now create the queue only when we have at least the first generated reservation item, so we'll not use up a thread from our pool for nothing.
Tested: running limited stress-test.