feat: Flow Control context refactor #1702

Conversation
Introduces a more robust, thread-safe implementation of FlowItem to prevent race conditions during asynchronous request finalization.

Key changes:
- Replaces simple channel-based signaling with atomic.Pointer for `finalState` and `handle`. This provides safer, non-blocking access to the item's state from multiple goroutines (Controller & Processor).
- Splits finalization logic into two idempotent methods:
  1. `Finalize()`: for external, context-driven expirations.
  2. `FinalizeWithOutcome()`: for synchronous, decision-based outcomes.
- Adds comprehensive, table-driven unit tests to validate the new idempotency and outcome-inference logic.
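For illustration, a minimal sketch of the idempotent, atomic finalization pattern described above (field names, the `QueueOutcome` placeholder, the `done` channel, and the method signatures are assumptions, not the actual implementation):

```go
package internal

import "sync/atomic"

// QueueOutcome is a stand-in for the package's real outcome type.
type QueueOutcome int

// FinalState captures the terminal outcome of a request's lifecycle.
type FinalState struct {
	Outcome QueueOutcome
	Err     error
}

// FlowItem keeps its final state behind an atomic.Pointer so the Controller and
// Processor can race to finalize it without locks.
type FlowItem struct {
	finalState atomic.Pointer[FinalState]
	done       chan struct{} // closed exactly once, when the item is finalized
}

// newItem is a simplified constructor; the real NewItem also records TTL and enqueue time.
func newItem() *FlowItem { return &FlowItem{done: make(chan struct{})} }

// finalize is idempotent: CompareAndSwap guarantees only the first caller records a
// final state and closes done; later calls are no-ops.
func (i *FlowItem) finalize(outcome QueueOutcome, err error) {
	if i.finalState.CompareAndSwap(nil, &FinalState{Outcome: outcome, Err: err}) {
		close(i.done)
	}
}

// FinalState returns nil until the item has been finalized.
func (i *FlowItem) FinalState() *FinalState { return i.finalState.Load() }

// Done lets waiters (e.g. EnqueueAndWait) block until finalization.
func (i *FlowItem) Done() <-chan struct{} { return i.done }
```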
Updates the ShardProcessor to align with the new, hardened FlowItem and its explicit lifecycle management. This change simplifies the processor's responsibilities:
- The processor no longer infers item expiry. It now relies on the FlowItem's own final state, which is set externally by the controller for context-based expirations.
- The background `runExpiryCleanup` goroutine is replaced with a `runCleanupSweep`. This new function periodically scans for and removes "zombie" items that were finalized externally, simplifying the processor's logic and improving separation of concerns.
- The ownership contract is clarified. `Submit` and `SubmitOrBlock` now return an error on failure, indicating ownership remains with the caller. The processor only takes ownership on successful handoff.
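A rough sketch of what such a sweep might look like (the interval plumbing and the `sweepEach`/`remove` helpers are hypothetical; only the overall shape follows the description above):

```go
// runCleanupSweep periodically removes "zombie" items: items finalized externally
// (e.g., by the controller on context expiry) that still occupy queue capacity.
func (p *ShardProcessor) runCleanupSweep(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// sweepEach is a hypothetical helper that visits every queued item.
			p.sweepEach(func(q managedQueue, item *FlowItem) {
				if item.FinalState() != nil {
					// Already finalized elsewhere; just reclaim the queue slot.
					q.remove(item)
				}
			})
		}
	}
}
```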
This commit completes the refactoring of the request lifecycle by making context management explicit and adapting the FlowController to the new internal components. This is the final commit in a series:
1. Harden FlowItem for concurrent finalization.
2. Adapt ShardProcessor to the new FlowItem lifecycle.
3. This commit: introduce explicit context handling.

Key changes:
- The `FlowControlRequest` interface no longer includes `Context()`.
- `FlowController.EnqueueAndWait` now accepts a `context.Context` as its first argument, making the request lifecycle explicit and caller-controlled.
- The controller now actively monitors this context and will initiate an "asynchronous finalization" if the context expires after an item has been handed off to a processor.
- Adds extensive integration and concurrency tests to validate the new end-to-end lifecycle management under contention.
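A hypothetical caller-side usage under the new signature (the return values and variable names shown are assumptions; the key point is that the call unblocks as soon as the context is done):

```go
// The caller now owns the lifecycle: TTL and cancellation flow in through ctx.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

outcome, err := controller.EnqueueAndWait(ctx, req)
if err != nil {
	// Unblocks promptly on ctx expiry as well as on processor decisions.
	log.Printf("request not dispatched: outcome=%v, err=%v", outcome, err)
}
```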
```go
	return types.ErrFlowControllerNotRunning
}

select {
```
pls add a comment that the default clause below makes this non-blocking
```go
	return types.ErrFlowControllerNotRunning
}

select {
```
pls add a comment that the absence of a default clause makes this blocking
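For reference, a small illustration of the two select shapes the comments above ask to document (the channel, item, and error names are placeholders):

```go
// Non-blocking send: the default clause runs immediately if the processor's
// buffer cannot accept the item right now.
select {
case p.enqueueChan <- item:
	// Ownership handed off to the processor.
default:
	return errProcessorBusy // placeholder error for illustration
}

// Blocking send: with no default clause, this select waits until the buffer
// accepts the item or the context is cancelled.
select {
case p.enqueueChan <- item:
	// Ownership handed off to the processor.
case <-ctx.Done():
	return ctx.Err()
}
```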
```go
)
// --- External Finalization Check ---
// Check if the item was finalized by the Controller while buffered in enqueueChan.
if finalState := item.FinalState(); finalState != nil {
```
is it guaranteed that this check will not change before this function returns? If not, do we handle the case where a finalized item is "committed" (added to the queue below)?
No, it is not guaranteed. Finalization is a persistent race-condition scenario in the actor model the processor uses.
However, we have a periodic cleanup loop that sweeps finalized items from the queues, reclaiming queue capacity (the request routine itself is unblocked immediately on finalization; this is the core improvement this PR introduces).
This is a lightweight check we perform before major points of commitment (e.g., adding an item to a queue) to catch 99.99% of race-condition cases. I don't litter it everywhere, though, as the cost of the race condition is negligible.
I am not in favor of multiple checks. It complicates the code and causes confusion when debugging, since there are multiple places where a request can get rejected for the same reason. Do we really think doing this here will make a difference vs. relying only on the core mechanism for handling this case?
Sounds good to me; removing these will also lower our testing surface. I tried to limit the checks to 3 places (2 in the processor and 1 in the controller) before critical points. They are not strictly necessary and should rarely be hit.
That being said, the check in enqueue is somewhat valuable, since requests may sit in the enqueueChan processing buffer for a bit before being committed to a queue. Of all the checks, this one has the highest likelihood of being reached when running at high load.
Also, the pre-distribution check in the controller is still desirable for inspecting the flow control lifecycle context between retries. Since I already had that select block, I thought it prudent to also inspect the request context there.
All three checks should be safe to remove, though. It's a tradeoff of complexity vs. responsiveness.
Let's accept a small loss in responsiveness and remove each of those finalization checks, as none of them is required for correctness.
I will add a metric to track the delay between finalization and sweep. If this delay is larger than we are comfortable with at high load, we can add some of these checks back in very focused places.
I will leave it to your judgement which one is likely to be hit frequently and makes a difference to keep, but if you decide to keep any, please document that this is not the ultimate check.
```go
finalState := item.finalState
return true, finalState.Outcome, finalState.Err
// Final check for external finalization right before dispatch.
if finalState := removedItem.FinalState(); finalState != nil {
```
Same question: is it guaranteed that if the item was not finalized, it will not flip to finalized before getting to the FinalizeWithOutcome call below?
My response on the other thread also applies here.
Finalize/FinalizeWithOutcome is idempotent, so whichever condition occurs first will ultimately be reported over the Done channel and by calls to FinalState.
Practically, this check is just to branch the log statements. It is less important than the one on the enqueue path. In the future, we may want to know whether dispatchCycle actually performed work (i.e., that it dispatched a valid HoL request). This would also help us distinguish this case. E.g., per quanta of "unsaturated" service, dispatchCycle can perform N dispatch operations.
```go
}

if effectiveTTL > 0 {
	reqCtx, cancel := context.WithDeadlineCause(ctx, enqueueTime.Add(effectiveTTL), types.ErrTTLExpired)
```
Just so I understand: previously we had a loop that periodically checks and rejects requests with expired TTL; now we have added another one based on a context with deadline?
> previously we had a loop that periodically checks and rejects requests with expired TTL

Yes.

> now we added another one based on a context with deadline

Not quite. We removed all responsibility from the processor for identifying items with expired TTLs or contexts. It is now solely responsible for sweeping already-finalized items. You can see the impact of this at the bottom of processor.go, where all of the checkExpiry-related code has been deleted.
TTL expiry (or external context expiry, whichever happens first) is now detected immediately in the controller when the context with deadline expires. There is now zero delay between context/TTL expiry and EnqueueAndWait unblocking the request routine.
The periodic cleanup loop now just sweeps "zombie" items to reclaim queue capacity. We have decoupled the cleanup loop from request-routine management.
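For clarity, a small sketch of how the deadline-based mechanism can surface the TTL error (built on the standard library's `context.WithDeadlineCause` and `context.Cause`; the surrounding variable names are assumptions):

```go
// The TTL becomes a deadline on the request context, with types.ErrTTLExpired attached as the cause.
reqCtx, cancel := context.WithDeadlineCause(ctx, enqueueTime.Add(effectiveTTL), types.ErrTTLExpired)
defer cancel()

<-reqCtx.Done()
// context.Cause distinguishes TTL expiry from the caller cancelling the parent context.
if errors.Is(context.Cause(reqCtx), types.ErrTTLExpired) {
	// Finalize the item with a TTL-expired outcome.
} else {
	// Finalize with a cancellation outcome derived from the parent context.
}
```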
```go
// 2. Enter the distribution loop to find a home for the request.
// This loop is responsible for retrying on `ErrShardDraining`.
for {
	// Pre-distribution checks for immediate failure.
```
What if either the parent or the request context gets cancelled right after those checks?
It will then be finalized in the blocking select below that monitors async finalization signals (and the Done channel).
We have some lightweight checks for more optimal behavior, but the system is resilient to any race conditions that slip through. This is the same justification as for the finalization checks we do before points of commitment in processor.go.
ditto, I am not in favor of this approach.
```go
// Benign race with the chosen `contracts.RegistryShard` becoming Draining post selection but before the item was
// enqueued into its respective `contracts.ManagedQueue`. Simply try again.
// Distribution was successful; ownership of the item has been transferred to a processor.
// Now, we wait for the final outcome.
```
Please mention explicitly that this is where we block until the request is finally dispatched or cancelled.
```go
}

// We must create a fresh FlowItem on each attempt as finalization is per-lifecycle.
item := internal.NewItem(req, effectiveTTL, enqueueTime)
```
If my understanding is correct, we now have two mechanisms enforcing TTL; if so, why do we need both?
We just have one, enforced by the context deadline. The periodic sweep now only cleans up already-finalized items; it is no longer responsible for "finalizing" them based on TTL durations or for checking whether the parent context was already cancelled.
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR introduces a fundamental refactoring of the `FlowController`'s `EnqueueAndWait` path to ensure it correctly and immediately respects the Go `context` contract, making its timeout and cancellation behavior predictable and robust.

The Problem:
Previously, a caller to `EnqueueAndWait` could remain blocked long after its request `context` had expired (due to TTL or cancellation). The `FlowController` would successfully hand the request item to an internal `ShardProcessor`, but the caller's goroutine would then wait passively for the processor to report a final status. If the item was never dispatched (e.g., due to a full queue), it would only be evicted by a periodic background sweep (every M seconds). This meant a request with an N-second TTL could block the caller for N + M seconds, subverting caller expectations around context management on blocking methods and leading to stuck goroutines.
The Solution:
The new implementation fixes this by making the `FlowController` an active participant in the request lifecycle. The core of the change is in the new `awaitFinalization` function:
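A minimal sketch of the idea (not the actual code; the method names on the internal `FlowItem` and the `QueueOutcome` return type are assumptions):

```go
// awaitFinalization races the caller's context against the processor's decision.
func (fc *FlowController) awaitFinalization(ctx context.Context, item *internal.FlowItem) (internal.QueueOutcome, error) {
	select {
	case <-ctx.Done():
		// The caller's context expired or was cancelled first. Finalization is idempotent,
		// so this is safe even if a processor finalizes the item concurrently.
		item.Finalize(context.Cause(ctx))
	case <-item.Done():
		// The processor reached a final outcome first (dispatch, rejection, or eviction).
	}
	fs := item.FinalState()
	return fs.Outcome, fs.Err
}
```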
This `select` statement races the caller's context against the processor's work. Whichever finishes first wins. This guarantees that `EnqueueAndWait` unblocks immediately when the context is done, making the controller a well-behaved component that fully honors the `context` contract.

To enable this, the following internal refactoring was required:
- `FlowItem`: The internal `FlowItem` was refactored to use `atomic.Pointer` for its state, allowing for safe, race-free, and idempotent finalization by either the Controller or the Processor.
- `ShardProcessor`: The processor's responsibility is now simpler. It no longer tracks expiry; it just acts on items and is notified of external cancellations via a periodic "zombie sweep".
- `EnqueueAndWait`: The signature now requires a `context.Context`, making the lifecycle explicit to the caller.

This holistic change makes the entire flow control system more resilient, predictable, and easier to reason about under high concurrency. It also provides a clear foundation for future distributed tracing.

This comes at a complexity tradeoff. The `FlowController` and `FlowItem` necessarily become more complex while the `ShardProcessor` has reduced responsibilities.

Which issue(s) this PR fixes:
Tracks #674
Does this PR introduce a user-facing change?: