feat: Add top-level Flow Controller #1525
Conversation
✅ Deploy Preview for gateway-api-inference-extension ready!
Hi @LukeAVanDrie. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with `/ok-to-test`. Once the patch is verified, the new status will be reflected by the `ok-to-test` label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
/assign @kfswain
I broke this into several commits for easier review. PTAL at the reviewer guide. I think this change is atomic, but I am also happy to split each of these out into separate PRs if you would prefer.
@LukeAVanDrie: GitHub didn't allow me to request PR reviews from the following users: rahulgurnani. Note that only kubernetes-sigs members and repo collaborators can review this PR, and authors cannot review their own PRs. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
This is the final functional change for the Flow Control layer. Remaining work involves wiring it into the request lifecycle and benchmarking.
/ok-to-test
Got through the new files, left some comments. Will continue the PR in the AM
// processor. This buffer acts as a shock absorber, decoupling the high-frequency distributor from the processor's
// serial execution loop and allowing the system to handle short bursts of traffic without blocking.
// Optional: Defaults to `defaultEnqueueChannelBufferSize` (100).
EnqueueChannelBufferSize int
What happens if this buffer overflows?
If running with data parallelism (more than one shard), the distribution algorithm will select the best candidate and attempt a non-blocking send. If the candidate buffer is full, it immediately falls back to the next best candidate and so on. If every worker's buffer is full, then we change from a non-blocking to a blocking send on the best candidate.
As the type comment mentions, it is simply a shock absorber for transient request bursts. The goal is to get the request enqueued (in the actual queue structure) as quickly as possible. If the buffer is full, it's fine, we just try to find a less congested worker even if it is not the "best" candidate for our data parallelism strategy.
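To make the fallback concrete, here is a minimal Go sketch of the behavior described above; `trySubmit`, `submitOrBlock`, and the surrounding types are illustrative stand-ins, not the actual controller API:

```go
package sketch

import (
	"context"
	"errors"
)

// flowItem and worker are illustrative stand-ins for the controller's internal types; the
// real implementation differs. This only sketches the fallback behavior described above.
type flowItem struct{ byteSize uint64 }

type worker struct{ enqueue chan *flowItem }

// trySubmit performs a non-blocking send; it reports false if the worker's buffer is full.
func (w *worker) trySubmit(item *flowItem) bool {
	select {
	case w.enqueue <- item:
		return true
	default:
		return false
	}
}

// submitOrBlock blocks until the worker accepts the item or the context is cancelled.
func (w *worker) submitOrBlock(ctx context.Context, item *flowItem) error {
	select {
	case w.enqueue <- item:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// distribute tries each candidate (assumed pre-sorted by queued bytes, least-loaded first)
// with a non-blocking send; only if every buffer is full does it fall back to a blocking
// send on the best candidate.
func distribute(ctx context.Context, item *flowItem, candidates []*worker) error {
	if len(candidates) == 0 {
		return errors.New("no active workers")
	}
	for _, w := range candidates {
		if w.trySubmit(item) {
			return nil
		}
	}
	return candidates[0].submitOrBlock(ctx, item)
}
```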
@kfswain Hopefully this clears it up. If you don't find the config documentation sufficient here, I can update it to better incorporate these details. Else, if no change is needed, can you resolve this thread?
// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
// This is the required constructor for creating a new configuration.
// It does not mutate the input `cfg`.
OOC: Why not? Can we just log what the value was before it was defaulted?
> // It does not mutate the input `cfg`.
We could do this. I prefer this being a pure function though. It is easier to test and less prone to unexpected side effects.
Also, I am changing this in a follow-up PR and removing the requirement for this constructor to be invoked (it becomes a test-only convenience utility). In registry/config.go and controller/config.go I will be exposing a `Config.ValidateAndApplyDefaults() *Config` method that the caller is expected to invoke before passing the config structs to the `FlowRegistry` and `FlowController` respectively.
Would you like me to absorb this refactoring (for at least the controller package into this PR) instead of a followup?
To keep this PR focused squarely on the controller implementation itself, I'd prefer to tackle the config changes in the immediate follow-up PR I have staged.
In that PR, I implement the plan I mentioned: removing the newConfig constructor in favor of exposing a public ValidateAndApplyDefaults() method on the Config struct itself. This will make the EPP runner responsible for calling it, which is a cleaner, more explicit approach.
Does that sound like a reasonable path forward? If so, I'll resolve this thread and proceed with the follow-up PR after this one merges.
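For context, a rough sketch of what that follow-up shape could look like; the field, constant, and signature details are placeholders (the error return in particular is an illustrative addition), not the final API:

```go
package sketch

import "fmt"

const defaultEnqueueChannelBufferSize = 100

// Config mirrors only the field discussed above; the real struct has more fields.
type Config struct {
	EnqueueChannelBufferSize int
}

// ValidateAndApplyDefaults returns a validated, defaulted copy without mutating the
// receiver, so the caller (e.g., the EPP runner) can still log the pre-default value.
func (c Config) ValidateAndApplyDefaults() (*Config, error) {
	if c.EnqueueChannelBufferSize < 0 {
		return nil, fmt.Errorf("EnqueueChannelBufferSize must be non-negative, got %d", c.EnqueueChannelBufferSize)
	}
	if c.EnqueueChannelBufferSize == 0 {
		c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
	}
	return &c, nil
}
```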
Mostly looks good, left some smaller comments. Would love to see diagrams for this and see it working in action.
// backpressure to the caller.
func (fc *FlowController) EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) {
	if req == nil {
		return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrNilRequest)
rejected doesn't look correct here, shouldn't this be something like an "internal error"?
Yes, internal is also appropriate. I use two high level sentinel errors to represent pre-enqueue errors (rejection) vs post-enqueue errors (evicted). In this case though, it is clearly bad user input. I can probably also remove the ErrNilRequest sentinel error as I cannot imagine a caller will ever switch on that.
The other sentinel errors are actually useful for mapping to the proper API error at the caller.
I updated the error handling for a nil request based on the feedback that treating it as a "rejection" was semantically confusing. A nil request is a programmatic/caller error, not an operational rejection by the queueing system.
The current implementation now returns a direct, non-sentinel error: `errors.New("request cannot be nil")`.

This leads to a point of design friction I'd like to discuss:

- Original Intent: The system was designed with two high-level sentinel errors to categorize failures: `ErrRejected` for pre-enqueue issues (like capacity) and `ErrEvicted` for post-enqueue issues (like TTL expiry). The `QueueOutcome` enum was meant to provide granular labels for metrics, with each `Rejected` or `Evicted` outcome paired with its corresponding high-level error.
- Current State: By removing the `ErrRejected` wrapper for the nil request case, we've made the error itself more semantically correct. However, we are still returning `QueueOutcomeRejectedOther`. This slightly breaks the clean pairing, as we now have a "Rejected" outcome that isn't associated with an `ErrRejected` error.

This is a reasonable compromise, but a bit confusing.
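To make the intended pairing concrete, here is a hedged sketch of how a caller might map the two sentinels to API errors; the sentinel variables and status codes are illustrative assumptions, not the actual `types` package:

```go
package sketch

import (
	"errors"
	"net/http"
)

// Stand-ins for the sentinels discussed above (the real ones live in the types package).
var (
	errRejected = errors.New("rejected")
	errEvicted  = errors.New("evicted")
)

// httpStatusFor sketches how a caller might map the flow-control sentinels to API errors;
// the specific status codes are assumptions for illustration only.
func httpStatusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errRejected): // pre-enqueue failure, e.g., over capacity
		return http.StatusTooManyRequests
	case errors.Is(err, errEvicted): // post-enqueue failure, e.g., TTL expiry
		return http.StatusServiceUnavailable
	default: // caller/programmer errors such as a nil request
		return http.StatusInternalServerError
	}
}
```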
This commit refactors some of the core Flow Control contracts to improve clarity and better align with their intended roles. The goal is to create a more intuitive and robust interface for the upcoming top-level FlowController. Key changes include:
- The `FlowRegistryClient` interface is renamed to `FlowRegistryDataPlane` to more accurately reflect its role in the high-throughput request path.
- The `FlowRegistryAdmin` interface is renamed to `FlowRegistryObserver` to clarify its read-only, observational nature.
- The `ActiveFlowConnection.Shards()` method is renamed to `ActiveFlowConnection.ActiveShards()` to make it explicit that it returns only active, schedulable shards. This removes ambiguity for the distributor logic.
- `ShardStats` is enriched with `ID` and `IsActive` fields, providing consumers with more context about the shard's state at the time the snapshot was taken.
- The registry implementation has been updated to match these new contract definitions.
This commit refactors the `ShardProcessor` to function as a stateful worker managed by a higher-level supervisor. This is a preparatory step for the introduction of the new top-level `FlowController`. The public API of the processor is changed from a direct `Enqueue` method to a more sophisticated, channel-based submission model with `Submit` (non-blocking) and `SubmitOrBlock` (blocking). This decouples the producer from the processor's main loop, enabling better backpressure signals and higher throughput. Key changes include:
- Introduction of `Submit` and `SubmitOrBlock` for asynchronous request handoff.
- `FlowItem`'s finalization logic is improved to be more robust and channel-based (see the sketch after this list).
- Error handling within the dispatch cycle is refactored (no logic change) to be clearer about how it promotes work conservation by isolating failures to a single priority band.
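As referenced in the list above, here is a minimal sketch of the kind of channel-based, idempotent finalization the `FlowItem` change describes; the type and method names are illustrative stand-ins, not the actual implementation:

```go
package sketch

import "sync"

// flowItemSketch illustrates a channel-based, idempotent finalization pattern: the first
// finalize call wins, later calls are no-ops, and waiters observe completion via a closed
// channel.
type flowItemSketch struct {
	once    sync.Once
	done    chan struct{}
	outcome string
	err     error
}

func newFlowItemSketch() *flowItemSketch {
	return &flowItemSketch{done: make(chan struct{})}
}

// finalize records the outcome exactly once and unblocks all waiters.
func (i *flowItemSketch) finalize(outcome string, err error) {
	i.once.Do(func() {
		i.outcome, i.err = outcome, err
		close(i.done)
	})
}

// wait blocks until the item has been finalized (dispatched, rejected, or evicted).
func (i *flowItemSketch) wait() (string, error) {
	<-i.done
	return i.outcome, i.err
}
```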
This commit introduces the `FlowController`, a high-throughput, sharded supervisor that orchestrates a pool of stateful `ShardProcessor` workers. This new component is the central processing engine of the Flow Control system, implementing a "supervisor-worker" pattern. Key features of the `FlowController` include:
- Supervisor-Worker Architecture: Acts as a stateless supervisor, managing the lifecycle of stateful `ShardProcessor` workers. It includes a reconciliation loop to garbage-collect workers for stale shards.
- Flow-Aware Load Balancing: Implements a "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) algorithm to distribute incoming requests to the least-loaded worker, promoting emergent fairness.
- Synchronous API: Exposes a blocking `EnqueueAndWait` method, which simplifies client integration (e.g., with Envoy `ext_proc`) and provides direct backpressure (a caller-side sketch follows this list).
- Lazy Worker Initialization: Workers are created on-demand when a shard first becomes active to conserve resources and reduce contention on the hot path.
- Configuration: A new `Config` object allows for tuning parameters like TTLs, buffer sizes, and reconciliation intervals.
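As referenced in the list above, a small caller-side sketch of the synchronous `EnqueueAndWait` pattern; the interfaces here are simplified stand-ins for the real contracts:

```go
package sketch

import "fmt"

// Minimal stand-ins for the real contracts, just to show the calling pattern.
type queueOutcome int

type flowControlRequest interface{ FlowID() string }

type enqueuer interface {
	EnqueueAndWait(req flowControlRequest) (queueOutcome, error)
}

// handleRequest shows the synchronous integration style: the per-request goroutine (e.g.,
// an Envoy ext_proc stream handler) blocks until the controller admits or rejects the
// request, which is what propagates backpressure to the client.
func handleRequest(fc enqueuer, req flowControlRequest) error {
	outcome, err := fc.EnqueueAndWait(req)
	if err != nil {
		return fmt.Errorf("flow control did not admit request (outcome %v): %w", outcome, err)
	}
	// Admitted and dispatched: continue with the rest of the request lifecycle.
	return nil
}
```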
This commit updates documentation and code comments across various framework components to align with the concepts and architecture introduced by the `FlowController`. Key changes include:
- FCFS Policy: Clarified the distinction between "logical" and "physical" enqueue time and the behavioral trade-offs when pairing with different queue capabilities.
- ListQueue: Expanded the documentation to explain its role as a high-performance, approximate FCFS queue in the context of the `FlowController`'s retry mechanics.
- Request Types: Refined the comments for `QueueItemAccessor` to be more precise about the meaning of `EnqueueTime`.
Force-pushed from 7185020 to 5b2225a.
Force-pushed from 5b2225a to 57d248f.
This commit refactors the `FlowController` to simplify its startup and shutdown lifecycle, making it more robust and easier to reason about. It also incorporates several smaller improvements based on reviewer feedback. The primary change addresses a complex lifecycle implementation that used an `atomic.Bool` (`isRunning`) and a `ready` channel to manage state. Key changes:
- **Simplified Lifecycle:** The controller's lifecycle is now tied directly to a `context` passed into `NewFlowController`. The `Run` method has been unexported, and the main `run` loop is started as a goroutine from the constructor. This eliminates the `ready` channel and `isRunning` flag in addition to simplifying the interface for callers.
- **Robust Worker Creation:** The `getOrStartWorker` logic has been improved to ensure that in a race to create a worker, the "losing" goroutine correctly cleans up its resources and does not start a redundant processor. This fixes a bug where the losing worker would evict all items from its queues on shutdown, which were shared instances with the winning worker, resulting in premature request finalization. (A rough sketch of the race-safe pattern follows this list.)
- **Comment Reduction:** The extensive explanatory comments in `distributeRequest` have been condensed to be more concise while retaining the essential details of the algorithm.
- **Minor Cleanups:**
  - The initial, unnecessary call to `reconcileProcessors()` at startup has been removed.
  - Error messages have been clarified (e.g., "acquire lease" instead of "establish connection").
  - A typed error for nil requests was replaced with a standard `errors.New`.
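As referenced above, a rough sketch of the race-safe worker creation pattern, assuming a `sync.Map` keyed by shard ID; names and details are illustrative, not the actual `getOrStartWorker` implementation:

```go
package sketch

import (
	"context"
	"sync"
)

// processorSketch stands in for the real ShardProcessor worker.
type processorSketch struct{ cancel context.CancelFunc }

func (p *processorSketch) run(ctx context.Context) { <-ctx.Done() }
func (p *processorSketch) stop()                   { p.cancel() }

// getOrStartWorker sketches the race-safe creation described above: every goroutine builds
// a candidate, but only the one that wins LoadOrStore starts it; losers tear their
// never-started candidate down, so no queue items shared with the winner are finalized.
func getOrStartWorker(ctx context.Context, workers *sync.Map, shardID string) *processorSketch {
	workerCtx, cancel := context.WithCancel(ctx)
	candidate := &processorSketch{cancel: cancel}

	actual, loaded := workers.LoadOrStore(shardID, candidate)
	if loaded {
		// Lost the race: discard the candidate without ever starting its run loop.
		candidate.stop()
		return actual.(*processorSketch)
	}
	// Won the race: only now start the worker.
	go candidate.run(workerCtx)
	return candidate
}
```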
Force-pushed from 57d248f to 16bc129.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: ahg-g, LukeAVanDrie The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing `/approve` in a comment.
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR introduces the `FlowController`, a high-throughput, sharded supervisor that acts as the central processing engine for the Flow Control system. It completes the work for the Flow Control MVP (sans wiring into the request lifecycle) by adding the top-level request distributor.

The controller implements a supervisor-worker pattern, managing the lifecycle of stateful `ShardProcessor` workers. It distributes incoming requests using a flow-aware "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) algorithm to balance load across workers and promote emergent fairness.

Guided Review:
To make this large but atomic change easier to review, it has been organized into four distinct, logical commits. It is highly recommended to review this PR commit-by-commit.
- `eb049100`: feat(flowcontrol): Refactor FlowRegistry contracts. Renames the registry contracts (`FlowRegistryClient` -> `FlowRegistryDataPlane`) and makes the `ActiveFlowConnection` contract more explicit by renaming `Shards()` to `ActiveShards()`.
- `d4b18535`: refactor: Adapt ShardProcessor to a worker role. Refactors the `ShardProcessor` into a stateful worker. The public API is changed from a direct `Enqueue` method to a channel-based `Submit`/`SubmitOrBlock` model to decouple it from the producer and enable better backpressure signaling.
- `99dedfb1`: feat: Introduce the FlowController supervisor. Adds the `FlowController` itself. It includes the worker lifecycle management, a reconciliation loop to garbage-collect stale workers, and the JSQ-Bytes distribution logic.
- `59e04f75`: docs: Update comments to align with FlowController.

Which issue(s) this PR fixes:
Tracks #674
Not fixed until final wiring and benchmarking. Consequently, I will add the release note in the wiring PR.
Does this PR introduce a user-facing change?:
cc: @rahulgurnani