Add Batch routing support via `@service_endpoint` with configurable batch size and timeout #304
base: main
Conversation
Instead of selecting a replica immediately, incoming requests are enqueued and grouped into batches. Once a batch is ready (either reaching the maximum size or exceeding the maximum wait time), the batcher makes a single routing decision.
Is there a way to base the wait time on the status of the replica instead of a fixed time? For example, if the replica is still busy, we can let the batch grow larger, but if the replica is free for some minimum time interval, then we can send the batch.
Definitely. We could make the batch timeout adaptive based on replica load (e.g., wait longer when replicas are busy and flush earlier when they’re idle). I’d prefer to land this current version first, then explore that as a follow-up improvement once the base batching logic is stable. Just added a TODO in the while loop.
# TODO: make timeout adaptive based on replica load.
session_id=None,
function=self.function,
args=args,
kwargs={},
@allenwang28 Do we want to support kwargs here?
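If kwargs support is added, each queued request would presumably need to carry its own kwargs rather than the hard-coded empty dict above. A minimal sketch under that assumption (the `QueuedRequest` record and `merge_batch` helper are hypothetical names):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class QueuedRequest:
    # Hypothetical per-request record; the diff currently passes kwargs={}.
    args: tuple
    kwargs: dict = field(default_factory=dict)
    session_id: Optional[str] = None


def merge_batch(batch):
    """Collect per-request args/kwargs so one call can serve the batch."""
    batched_args = [r.args for r in batch]
    batched_kwargs = [r.kwargs for r in batch]
    return batched_args, batched_kwargs
```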
results = [results] * len(batch)
else:
    # scalar result, broadcast to batch size
    results = [results] * len(batch)
@allenwang28 Do we want to handle the case where the returned results have a different length than the batch, or are a scalar?
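One way to cover the three cases the question raises (list matching the batch, list of a different length, scalar); `demux_results` is an illustrative name, and raising on a length mismatch is just one possible policy:

```python
def demux_results(results, batch_size):
    """Map a batched endpoint's return value back to one result per request."""
    if isinstance(results, (list, tuple)):
        if len(results) == batch_size:
            return list(results)  # one result per request, in order
        # Ambiguous: a list whose length differs from the batch size.
        raise ValueError(
            f"endpoint returned {len(results)} results for a batch of {batch_size}"
        )
    return [results] * batch_size  # scalar result, broadcast to batch size
```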
Migrated from #177
Context: #160
Add batch routing to `Service` to improve request throughput and maintain session-aware routing.

- Added a new `@service_endpoint` decorator that supports routing configuration (`router`, `batch_size`, `batch_timeout`).
- Introduced `ServiceEndpointProperty` to distinguish between `@endpoint` and `@service_endpoint`.
- Centralized endpoint-to-router mapping in `Service` (`self.routers`) with support for both plain routers and batchers.
- Updated `ServiceInterface` to register endpoints through `_set_router`, ensuring consistent setup for both standard and service endpoints.
- Extended `_call` and `_get_replica` to handle batch routing, session routing, and fallback routing in a unified way.
- Enhanced `Service.stop` to gracefully shut down any active batchers in addition to replicas.
- Added integration tests to validate:
  - `batch_size` is reached

Test
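Based on the parameters listed in the description, usage of the new decorator would look roughly like this. The decorator below is a stand-in: the real `@service_endpoint` signature in this PR is an assumption inferred from the names `router`, `batch_size`, and `batch_timeout`:

```python
# Stand-in for the real @service_endpoint decorator described in this PR;
# its actual signature and behavior are assumptions for illustration.
def service_endpoint(*, router=None, batch_size=1, batch_timeout=0.0):
    def wrap(fn):
        fn._routing = {
            "router": router,
            "batch_size": batch_size,
            "batch_timeout": batch_timeout,
        }
        return fn
    return wrap


class MyService:
    @service_endpoint(batch_size=8, batch_timeout=0.05)
    def generate(self, prompts):
        # Invoked once per batch; returns one result per request so the
        # batcher can demultiplex them back to their callers.
        return [p.upper() for p in prompts]
```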