[WIP] feat(flowcontrol): Implement the FlowRegistry #1319
Conversation
/assign @kfswain
This commit introduces the complete, concrete implementation of the `FlowRegistry`, the stateful control plane for the flow control system. It is responsible for managing the lifecycle of all flows, queues, and shards. Key features of this implementation include:

- An actor-based, serialized event loop that processes all state changes to ensure correctness and eliminate race conditions in the control plane.
- A robust garbage collection system for idle flows and drained components, using a "Trust but Verify" pattern to safely handle races between the data path and control plane.
- A well-defined component lifecycle (Active, Draining, Drained) with atomic state transitions and exactly-once edge signaling.
- A sharded architecture where the `FlowRegistry` orchestrates the `registryShard` data plane slices.
/ok-to-test
s.generation++ // Invalidate any pending GC timers for the old generation.

// If priority did not change, there's nothing more to do.
if oldPriority == spec.Priority {
For the future: priority-change support could have been left out of this PR to simplify it.
Ack, this was to prepare for syncing registry state to our CRDs. Right now, priority is the only supported update story (as, at the time, only criticality can change for a given flow identifier). Once we have a complete config API for the flow control system, we will need to carefully roll out support for other update scenarios.
This makes the current PR a lot more complex (as it also heavily influences the GC logic). Will keep this in mind in the future.
I would implement this as a delete-then-add flow rather than having the internal flow management support update.
However, the question in my mind is this: now that flow id is set by the requests and is decoupled from criticality definition, what happens if requests constantly use the same flow id across different criticalities?
This is a great point, and it gets to the heart of a critical design trade-off between implementation simplicity and operational robustness. You're right that this PR's complexity is significantly driven by the need to support updates.
Let's break down the "update-in-place" vs. "delete-then-add" models.
Why Update-in-Place vs. Delete-then-Add:
A "delete-then-add" approach is indeed simpler from a state management perspective. However, it comes with a consequence that is unacceptable for this system: it would lose all in-flight requests.
When a logical flow is "deleted," all its `managedQueue` instances would be drained and evicted. If we immediately "add" it back at a new priority, any requests that were waiting in the old queue are gone. For a live system managing active traffic, losing work during a simple configuration change (like adjusting a priority) would be a critical failure.
The core principle driving the current design is work conservation. The "update-in-place" model, with its graceful draining mechanism, guarantees that no enqueued requests are ever lost during a priority update. They continue to be processed at their original priority until the old queue is empty. The complexity of the update logic, the `drainingQueuesEmptyOnShards` map, and the generation counters are all in service of this single, non-negotiable requirement.
Handling Priority Thrashing:
Your second question—what happens when a flow's priority changes rapidly—is the perfect stress test for this design. The current implementation is not only robust to this "thrashing" but is explicitly optimized for it.
Consider this scenario: flow `F` changes `P1 -> P2`, then immediately back `P2 -> P1`.

`P1 -> P2`:

- The `managedQueue` at priority `P1` (`Q1`) is marked as `Draining`. It stops accepting new items.
- A new `managedQueue` at priority `P2` (`Q2`) is created and becomes `Active`.

`P2 -> P1` (before `Q1` is empty):

- The `flowState.update` function is called again. It checks if a queue for the new target priority (`P1`) already exists in a draining state.
- It finds `Q1` in the `drainingQueuesEmptyOnShards` map.
- Instead of creating a new queue, it reactivates `Q1`, transitioning it back to `Active`. This is a highly efficient operation that avoids any new memory allocations.
- Simultaneously, `Q2` is marked as `Draining`.

Note that the controller worker (`shardProcessor`) that operates on this slice of the registry state only routes new requests to the currently `Active` flow queue instance throughout this process.
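To make the reactivation path concrete, here is a stripped-down sketch of the shape of `flowState.update` (field names and statuses are assumed; shard fan-out and the `drainingQueuesEmptyOnShards` bookkeeping are elided):

```go
package registry

type queueStatus int

const (
	statusActive queueStatus = iota
	statusDraining
)

type managedQueue struct {
	priority uint
	status   queueStatus
}

// flowState tracks one flow's queue instances, keyed by priority level.
type flowState struct {
	activePriority uint
	queues         map[uint]*managedQueue
}

// update moves the flow to newPriority, reusing a draining queue at that
// priority if one exists (the "thrashing" fast path) instead of allocating.
func (fs *flowState) update(newPriority uint) {
	if newPriority == fs.activePriority {
		return // priority did not change: nothing to do
	}
	old := fs.queues[fs.activePriority]

	if q, ok := fs.queues[newPriority]; ok && q.status == statusDraining {
		q.status = statusActive // reactivate: no new allocation
	} else {
		fs.queues[newPriority] = &managedQueue{priority: newPriority, status: statusActive}
	}

	old.status = statusDraining // keeps serving until empty, then GC'd
	fs.activePriority = newPriority
}
```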
While I absolutely agree with the philosophy of keeping PRs simple, the update logic's complexity is essential. It's the price for guaranteeing zero request loss and providing efficient, robust handling of dynamic configuration changes, which are core requirements for this system.
Clarification on the Flow Control Data Model:
This brings up an important clarification on the system's core data model, which underpins this whole design. In this system, a logical flow is uniquely identified by its ID and is defined by its entire `types.FlowSpecification`, which includes a single, authoritative priority level. A flow, as a managed entity, is at a specific priority relative to all other flows. This is why the system has the concept of a single 'active' queue per flow; it's the queue that corresponds to the flow's current, singular priority.
The alternative model, where a single `flowID` could map to multiple active priority queues simultaneously, is a valid but fundamentally different architecture. It would treat `flowID` as more of a grouping label and `priority` (criticality) as a request-level attribute. The current model was chosen because its primary job is to manage contention between different workloads (flows). Assigning a single priority to each workload is the mechanism for that, while the intra-flow and inter-flow policies handle fairness within a given priority band.
In short, whatever mechanism binds the flow ID set in the request header to its flow configuration must be a sticky binding. Priority (criticality) must be defined at the flow level, not the request level. While the criticality can change dynamically (e.g., if a config map updates), as supported by the update story in the registry, it must still be a 1:1 mapping to a flow ID, not a request.
@kfswain Just to explicitly loop you in on this point, as it's foundational: the design relies on the core principle that a flow ID has a sticky, 1:1 mapping to a single priority (criticality) at the flow-level, not the request-level. It's critical that we're all aligned on this model. Please let me know if you have any different expectations here.
Discussed offline, our current direction will work. Will reevaluate as we implement Phase 2 of API changes.
// and the actual queue instances (`managedQueue`) for its assigned partition. It provides a read-optimized, operational
// view for a single `controller.FlowController` worker.
//
// The registryShard is deliberately kept simple regarding coordination logic; it relies on the parent `FlowRegistry`
nit: why are we calling it a registry shard instead of just shard? not asking to change anything now (please don't), I am just wondering.
That's a great question, and it's a trade-off I considered. You're absolutely right that the idiomatic Go convention would be to name it just `shard` within the `registry` package. My initial thinking for `registryShard` was to create a very explicit link to the public `contracts.RegistryShard` interface. It prioritizes cross-package clarity and discoverability (especially when coming from the `controller` package where this dependency is used) over the local idiom. That said, I see the value in both conventions and I'm not strongly opinionated on which is better here. I'm happy to align with whatever you feel is best for the project's long-term consistency. If you prefer the more idiomatic `shard`, it's a simple find-and-replace for me to make.
@@ -138,59 +150,37 @@ func (mq *managedQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAcc
 // items.
 func (mq *managedQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) {
 	cleanedItems, err = mq.queue.Cleanup(predicate)
-	if err != nil || len(cleanedItems) == 0 {
-		return cleanedItems, err
+	if err != nil {
why do we have clean and drain? semantically they seem the same.
That's an excellent question, and it highlights a key architectural decision for ensuring the system is robust and extensible. On the surface, they seem similar because `Cleanup` could technically implement `Drain`'s behavior with a predicate that always returns `true`. However, they serve two fundamentally different purposes, and separating them is crucial for performance and, more importantly, for enabling future features like infallible atomic migrations.
1. Functional and Performance Differences:
Cleanup(predicate)
is for selective, conditional removal. Its primary use case today is garbage collecting expired items, which requires iterating over the queue's contents. It's inherently anO(N)
operation (at best).Drain()
is for unconditional, total removal. Its primary use case is to atomically empty the entire queue. For many queue implementations (like a slice-based one), this can be a highly efficient O(1) operation (e.g., by swapping the internal slice with a new empty one).
2. Architectural Purpose:
The most critical reason for `Drain`'s existence is to support infallible, atomic queue migrations. My long-term plan is to support dynamic updates to a flow's `IntraFlowDispatchPolicy`. If a policy update requires changing the underlying queue's comparator (e.g., switching from FCFS to a priority-based policy), we must atomically migrate all items from the old queue to a new one. The process would be:
items := oldQueue.Drain()
for _, item := range items {
	newQueue.Add(item)
}
To make this migration bulletproof and synchronous across all shards (avoiding any and all consistency issues), I intend to make the `Add` and `Drain` methods infallible by removing their error return values from the `framework.SafeQueue` contract.
By having a separate `Drain` method, we can make this specific, critical migration operation infallible without sacrificing the error return on `Cleanup`. The `Cleanup` contract still benefits from being fallible, as it performs a more complex operation that could conceivably encounter issues (I have justification for this trust/fallibility model, but that digresses from the registry implementation choices).
In summary: `Cleanup` is for fallible, conditional GC, while `Drain` is the primitive for infallible, atomic migration. Separating them keeps the contracts clean and unlocks a much more robust strategy for future dynamic updates.
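For concreteness, a sketch of what the split contracts could look like after the planned change (assumed shapes; `Item` is a stand-in for the real `types.QueueItemAccessor`):

```go
package framework

// Item stands in for the real types.QueueItemAccessor.
type Item interface{}

// PredicateFunc reports whether Cleanup should remove a given item.
type PredicateFunc func(Item) bool

// SafeQueue sketch: Cleanup remains fallible for selective, O(N) removal,
// while Drain and Add are infallible primitives suitable for atomic
// queue-to-queue migrations.
type SafeQueue interface {
	// Cleanup removes all items matching the predicate (e.g., expired items).
	Cleanup(predicate PredicateFunc) (cleaned []Item, err error)
	// Drain atomically removes and returns every item in the queue.
	Drain() []Item
	// Add enqueues an item.
	Add(item Item)
}
```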
Right, my concern is that at day 1 we are implementing support for extreme flexibility that may never be needed. I am not against having a future-proof design; we should be open for extension but closed for modification (OCP). But it seems we are not only designing but also executing for the future...
// CRITICAL: Check if the queue is *already* empty at the moment it's marked as draining (or if it was already
// draining and empty). If so, we must immediately attempt the transition to Drained to ensure timely GC.
// This handles the race where the queue becomes empty just before or during being marked draining.
if mq.Len() == 0 {
why do we need to do this here? the transition from draining to drained should already be handled when removing the last item.
You are absolutely right that `propagateStatsDelta` (triggered by removing the last item) handles the `Draining` -> `Drained` transition in the normal case. This check, however, is for the specific edge case where the queue becomes empty before it is marked as `Draining`.
Here is the exact sequence of events that would lead to a stranded queue instance without this check:
- `T=0`: State: A queue for flow `F` at priority `pCritical` is `Active` and contains 1 item.
- `T=1`: Last Item Removed: The last item is removed from the `pCritical` queue.
  - Inside `propagateStatsDelta`, the logic sees `newLen == 0` while the status is still `Active`.
  - It correctly signals `QueueBecameEmpty`, which tells the `FlowRegistry` to start the slow, inactivity-based GC timer for the entire flow.
  - Crucially, it does not signal `QueueBecameDrained`, because the queue is not yet in the `Draining` state.
- `T=2`: Priority Change: A `RegisterOrUpdateFlow` call immediately changes flow `F`'s priority to `pStandard`.
  - This triggers a call to `markAsDraining()` on the now-empty queue instance at `pCritical`.
- `T=3`: The Problem: At this point, the `pCritical` queue is now `Draining` but is already empty. Because no more requests will be enqueued (it's draining) and no more requests can be removed (it's empty), the `propagateStatsDelta` method will never be called on it again.

Without the check inside `markAsDraining`, this specific queue instance would be stranded. It has no way to signal `QueueBecameDrained` to trigger its own immediate garbage collection. While it would eventually be cleaned up by the slow flow-level inactivity timer, this check ensures the much more timely and correct GC path is taken.
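A stripped-down sketch of the shape of this check (statuses, fields, and the signaling channel are assumed stand-ins for the real `managedQueue` internals):

```go
package registry

import "sync/atomic"

const (
	statusActive int32 = iota
	statusDraining
	statusDrained
)

// managedQueue is a simplified stand-in for the real type.
type managedQueue struct {
	status  atomic.Int32
	length  atomic.Int64
	signals chan<- string // stand-in for the registry's event channel
}

func (mq *managedQueue) Len() int { return int(mq.length.Load()) }

// markAsDraining stops new enqueues and, if the queue is already empty,
// immediately attempts the Draining -> Drained transition. Without this,
// an already-empty queue would never see another propagateStatsDelta call
// and could only be reclaimed by the slow flow-level inactivity timer.
func (mq *managedQueue) markAsDraining() {
	mq.status.CompareAndSwap(statusActive, statusDraining)

	if mq.Len() == 0 &&
		mq.status.CompareAndSwap(statusDraining, statusDrained) {
		mq.signals <- "QueueBecameDrained" // exactly-once edge signal
	}
}
```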
This highlights a concern I have with the current event-based design: the potential for missing events in corner cases. Do we really need to do it this way vs. having a GC thread kicking in every minute to check which flows have been empty for a while and delete them? That wouldn't require timers, just timestamps of last usage.
// start begins a new GC timer for a given flow and generation. If a timer already exists for the flow, it is implicitly
// stopped and replaced. This is the desired behavior, as a new call to start a timer for a flow (e.g., because it just
// became idle) should always supersede any previous timer.
func (gc *gcTracker) start(flowID string, generation uint64, timeout time.Duration) {
I expected this to be called to reset the timer when adding a new item to the flow's queue, but I can't find that. If the caller logic is not part of this PR, why do we have this file in this PR?
There is no confusion; I missed the registry.go file, which was collapsed and which I didn't see during the review.
Got it, thanks for clarifying!
You've hit on a really key part of the design though, so I'll add a more detailed explanation here for the benefit of anyone else reading the PR in the future.
The system's architecture relies on a strict decoupling of the high-performance data path from the stateful control plane. Instead of having the `managedQueue` (data path) directly manipulate the GC timer, it uses an asynchronous, event-driven approach. Here's how it works:

[Queue on Data Path] --sends signal--> [Event Channel] --processed by--> [Registry on Control Plane]

- The Queue's Role: When an item is added to an empty active queue, the `managedQueue`'s only job is to do its work and send a single, fire-and-forget `QueueBecameNonEmpty` signal to the `FlowRegistry`'s central event channel. It remains fast and lock-free, with no knowledge of the complex GC logic.
- The Registry's Role: The `FlowRegistry`'s main event loop processes this signal. It then calls the centralized `evaluateFlowGCStateLocked` function. This function is the single source of truth for all GC decisions. It looks at the flow's state across all active shards and decides whether to stop the timer by calling `gc.stop()`.

This design is intentional and critical for two reasons:

- Performance: It keeps the hot data path (adding/removing from queues) free of locks and complex, potentially slow logic.
- Correctness: It prevents race conditions. A logical flow exists across multiple shards, so only the central `FlowRegistry` can safely determine if the flow is truly idle or active on a global level. Centralizing the `start/stop` logic in `evaluateFlowGCStateLocked` ensures this decision is made atomically and correctly.
The channel-based approach is more scalable; the question is what rate of flow creation and deletion we are designing for.
It seems we are designing the system to handle a high rate of create/update/delete of flows, and this is a major source of complexity in this PR; I am not sure this is necessary at this point, as we don't have the requirements for it yet.
Thank you for raising this, it's a critical point. I'd like to clarify a misunderstanding that my design has created: this complexity is not to handle a high rate of flow changes. It is the minimum complexity required to handle even a single change correctly, safely, and without data loss in a live, concurrent system.
You're right that we don't have a requirement for frequent updates; in fact, the system is designed on the assumption that these changes are infrequent. The complexity you're seeing is driven entirely by three foundational, non-negotiable requirements for any production-grade system:
1. Work Conservation (Requires the `update` logic)

The alternative to a complex `update` is a simple `delete-then-add`. However, a `delete-then-add` operation would lose all in-flight requests for a flow during a simple priority change. The entire "graceful draining" mechanism exists solely to satisfy the core requirement: "Do not lose user work during a configuration change." This is necessary even if updates happen only once a day.
2. Resource Safety (Requires the GC logic)
Without garbage collection, every flow ever registered would exist in memory forever, leading to an unbounded memory leak. The GC logic—with its timers and eventual consistency—is the minimum required to safely reclaim resources without accidentally deleting an active flow that is only temporarily idle.
3. Concurrency Safety (Requires the Actor Model and Events)
All these changes happen while many `FlowController` workers (paired 1:1 with each registry shard) are concurrently reading from and modifying the state (via atomic stats). A simpler implementation without the Actor model, event channel, and generation counters would be riddled with subtle race conditions or high lock contention. This complexity is the cost of ensuring that a flow update is an atomic, consistent operation across all shards.
This PR introduces the necessary machinery to guarantee correctness and safety. While this machinery is also designed to be scalable, its primary purpose is to provide a strong foundation, even for infrequent control plane operations.
On update: the question is whether we need it in the first place given our discussion on the data model. If a flow is a grouping mechanism within a band, and it can exist across bands, then there is no need for migration.
On GC: we need garbage collection for sure (I proposed that, if you recall, so naturally I agree it is needed), but GC can be implemented differently. We don't need to be super accurate with our tracking, and we don't need timers and an event-driven system to handle it. Here is another approach: for each flow we maintain a timestamp of the last time it was active; a GC goroutine would periodically check all last-active timestamps and GC the ones that were not active for a period of time. This will require using locks, but we should still be able to do it efficiently.
I am mainly trying to explore ways to simplify the design, but I will leave the decision to you on how to move forward since this is already implemented ...
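For concreteness, a minimal sketch of this sweep-based alternative (illustrative only; `sweeper` and `deleteFlow` are hypothetical names, and this is not what the PR implements):

```go
package registry

import (
	"sync"
	"time"
)

// sweeper tracks per-flow last-activity timestamps and reclaims idle flows.
type sweeper struct {
	mu         sync.Mutex
	lastActive map[string]time.Time
}

// touch records activity for a flow; called from the data path.
func (s *sweeper) touch(flowID string) {
	s.mu.Lock()
	s.lastActive[flowID] = time.Now()
	s.mu.Unlock()
}

// run periodically deletes flows that have been idle longer than idleTimeout.
func (s *sweeper) run(interval, idleTimeout time.Duration, deleteFlow func(string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		s.mu.Lock()
		for id, last := range s.lastActive {
			if time.Since(last) > idleTimeout {
				delete(s.lastActive, id)
				deleteFlow(id) // must itself be safe against concurrent use
			}
		}
		s.mu.Unlock()
	}
}
```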
}

// We use AfterFunc which works efficiently with both `RealClock` and `FakeClock` (for tests).
timer := gc.clock.AfterFunc(timeout, func() {
We have a potential (ephemeral) goroutine leak here.
Cause: The `gcTracker` uses `time.AfterFunc`, which creates a new `time.Timer` and an associated goroutine for every timer started. While `Stop()` is called, the Go runtime doesn't guarantee the immediate cleanup of the underlying resources if the timer has already fired.
Impact: This is a very minor performance concern that only matters at O(hundreds of thousands) of flows. If the system creates and destroys tons of short-lived flows, this could lead to a build-up of dormant timer-related goroutines.
Solution: For extreme hardening, we can consider replacing the one-goroutine-per-timer model with a single "timer wheel" (circular buffer) or a min-heap-based scheduler that manages all pending expirations in one goroutine.
For most practical purposes, the current implementation is simpler and likely sufficient though.
I was wondering if this is worth documenting somewhere (GitHub issue, code comment, etc.) or if we just let this slide. It really depends on how we anticipate users using our request header for flows. This is just another source of goroutine bloat on top of the existing one-routine-per-request constraint we are already working under.
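For reference, a minimal sketch of the one-timer-per-flow pattern under discussion, inferred from the `start` signature shown above (the real code uses an injected clock rather than calling `time` directly, and the field names here are assumed):

```go
package registry

import (
	"sync"
	"time"
)

type gcEvent struct {
	flowID     string
	generation uint64
}

// gcTracker sketch: one timer per flow, replaced on every restart.
type gcTracker struct {
	mu     sync.Mutex
	timers map[string]*time.Timer
	events chan<- gcEvent
}

func (gc *gcTracker) start(flowID string, generation uint64, timeout time.Duration) {
	gc.mu.Lock()
	defer gc.mu.Unlock()
	if t, ok := gc.timers[flowID]; ok {
		t.Stop() // supersede any previous timer for this flow
	}
	// AfterFunc runs its callback on a new goroutine when it fires; that
	// per-timer cost is the bloat concern raised in this thread. The
	// generation stamp lets the registry discard stale firings.
	gc.timers[flowID] = time.AfterFunc(timeout, func() {
		gc.events <- gcEvent{flowID: flowID, generation: generation}
	})
}
```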
If you'd feel most comfortable, you can document this as a potential high usage problem. But given that inference traffic is currently much lower volume, I think we can avoid optimizing here until we need to.
This is actually a problem that scales with request rate multiplied by how long the EPP is active, which might be significant.
totalByteSize atomic.Uint64
totalLen      atomic.Uint64
// perPriorityBandStats stores *bandStats, keyed by priority (uint).
perPriorityBandStats sync.Map
This map is populated once at initialization and then only read from in the `Stats()` method.

We could consider replacing it with a standard map protected by the registry's `mu` lock (or an `RWMutex` for read access), which is more conventional and potentially more performant, as `sync.Map` is optimized for a different access pattern (concurrent reads and writes).

This would simplify the code a bit as well.

The only writes we are concerned with are key additions and deletions. The values are already `atomic.Uint64`.
This map is populated once at initialization and then only read from in the Stats() method.
The only writes we are concerned with are key additions and deletions. The values are already atomic.Uint64.
I don't understand: is it written to at runtime or not? Adding keys to a map while someone else is reading it is a way to get a panic.
// without blocking the data path, but consumes more memory.
//
// Optional: Defaults to 4096.
EventChannelBufferSize uint32
The buffer size is defaulted to 4096. This value was not determined through load testing; I just guessed. In a "thundering herd" scenario where many thousands of queues change state simultaneously, this buffer could fill, causing latency on the request path. This is the correct behavior, but it's worth ensuring the default is sized appropriately for common workloads.
Any suggestions for configuration tuning? I could make the default a bit higher to be safe.
I don't expect that we will initially have cases for high flow churn.
}

// Perform compatibility check. (This check is also done during config validation, but repeated here defensively.)
if err := validateBandCompatibility(*bandConfig); err != nil {
We repeat a compatibility check (`validateBandCompatibility`) defensively that is also performed during initial config validation.
The system's invariants, protected by the control plane lock, should prevent an invalid configuration from being used after startup. If we trust these invariants, this redundant check could be removed to slightly streamline the flow registration path. Thoughts?
//
// (See package documentation in `doc.go` for the full overview of the Architecture and Concurrency Model)
type FlowRegistry struct {
	config *Config
The `Config` is partitioned and distributed to shards. The `FlowRegistry` holds the master config, and shards hold pointers to their partitioned version.

If we ever want to support configuration hot-reloading, we could consider treating the `Config` object as deeply immutable after its initial validation. The `FlowRegistry` could hold its master config within a `sync/atomic.Value`. This would provide a clear, thread-safe mechanism for updating the entire configuration atomically; however, the current locking model is sufficient for the present design.

Is this a story we would ever want to support? There are some cascading impacts from changing the config. E.g., if you change the default inter/intra-flow policies, how should we treat existing flow instances? This feels like an indirect way of calling `RegisterOrUpdateFlow`. I can see some cases where modifying the capacity limits, event buffer size, etc. could be useful and non-disruptive on the fly though.
// This is safe because scale-down always removes shards from the end of the list, and the `allShards` slice is
// ordered with active shards first.
queues = queues[:currentTotalShardCount]
}
what if currentTotalShardCount changes again by the time we get here?
Good catch. You're right to scrutinize this section for race conditions.
To answer your direct question: yes, the `queues = queues[:currentTotalShardCount]` operation is guaranteed to be safe. The entire "Phase 2" ("Commit") block is under the `fr.mu` write lock, which serializes this operation against any concurrent calls to `UpdateShardCount`.
Architectural Trade-off: The Prepare-Commit Optimization
The reason the code is structured this way—with a complex revalidation step inside the lock—is due to a "prepare-commit" optimization. We prepare the expensive plugin and queue instances outside the critical section to minimize lock contention.
Since you're looking at this, I wanted to proactively get your thoughts on a complexity vs. performance trade-off I made here. The logic to handle a shard count change during the "prepare" phase is arguably the most complex part of this function.
I see three potential implementation strategies:
- Current Implementation (Optimized): Prepare outside the lock, then revalidate and patch the prepared data inside the lock.
- Pro: Highest performance; avoids re-running preparation during a race.
- Con: Most complex logic; hardest to read and maintain.
- Retry Loop (Simplified): Wrap the prepare-commit logic in a loop. If the shard count changes, the entire operation is retried from the beginning.
- Pro: Dramatically simplifies the logic inside the critical section. No more slice patching.
- Con: In the rare event of a race, we re-run the (potentially expensive) preparation phase.
- Full Lock (Simplest): Move the entire prepare-commit logic under the registry lock.
- Pro: Simplest possible logic; removes the race condition entirely.
- Con: Lowest performance; holds the registry's global lock for the longest duration, which could become a bottleneck.
My preference leans toward option 2. Shard scaling is likely to be an infrequent, operator-driven event compared to flow registration. The performance cost of re-running the preparation phase in the rare event of a race seems like an acceptable trade-off for the significant gain in code simplicity and long-term maintainability.
What are your thoughts on this trade-off? I'm happy to refactor to the simpler retry loop if you agree it's a better balance.
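To make option 2 concrete, a condensed sketch of the retry-loop shape (`prepare` and `commit` are hypothetical helpers; the real function does considerably more):

```go
package registry

import "sync"

type FlowSpecification struct {
	ID       string
	Priority uint
}

type shard struct{ /* elided */ }
type preparedState struct{ /* plugin + queue instances */ }

type FlowRegistry struct {
	mu             sync.RWMutex
	activeShards   []*shard
	drainingShards []*shard
}

func (fr *FlowRegistry) prepare(spec FlowSpecification, shards int) (*preparedState, error) {
	return &preparedState{}, nil // expensive work, done outside the lock
}

func (fr *FlowRegistry) commit(spec FlowSpecification, p *preparedState) {}

// registerOrUpdateFlow retries the whole prepare phase if the shard count
// changed underneath it, instead of patching prepared data under the lock.
func (fr *FlowRegistry) registerOrUpdateFlow(spec FlowSpecification) error {
	for {
		fr.mu.RLock()
		count := len(fr.activeShards) + len(fr.drainingShards)
		fr.mu.RUnlock()

		prepared, err := fr.prepare(spec, count) // no lock held here
		if err != nil {
			return err
		}

		fr.mu.Lock()
		if count != len(fr.activeShards)+len(fr.drainingShards) {
			fr.mu.Unlock()
			continue // raced with UpdateShardCount: re-prepare from scratch
		}
		fr.commit(spec, prepared) // commit atomically under the write lock
		fr.mu.Unlock()
		return nil
	}
}
```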
ok, it is fine, I missed the lock in the middle
What are your thoughts on this trade-off?
I would start simplest, evaluate and then optimize; but since you did this already, I don't recommend changing it.
}

for i, shard := range allShards {
	shard.synchronizeFlow(spec, policy, queues[i])
name is not intuitive, `addOrUpdateFlow`?
That's a great point. I agree that `synchronizeFlow` isn't as intuitive as it could be, and I appreciate the suggestion of `addOrUpdateFlow`.

My hesitation with `addOrUpdateFlow` is that it feels a bit like a simple CRUD operation, and doesn't fully capture the more complex state transitions the function handles, like reactivating a draining queue. My original idea was `reconcileFlow`, but I rejected it because "Reconcile" has a very specific, loaded meaning in the context of Kubernetes controllers.

What do you think of a third option: `applyFlowSpec`?

The verb "Apply" feels like a good fit here. It's well-understood to mean "take this specification and make the live state match," which is exactly what this infallible function does. It seems to strike a better balance between being intuitive and being architecturally precise than either `synchronizeFlow` or `addOrUpdateFlow`.

I'm happy to go with `addOrUpdateFlow` if you still feel it's the clearest.
IMO the function name should focus on capturing the what not the how; but I don't want us to spend time changing this now.
	shard.synchronizeFlow(spec, policy, queues[i])
}

// If this was a priority change, attempt to GC the newly-draining queue immediately if it's already empty.
Do we need this if the default GC mechanism will clean this up anyway? this looks out of place here.
That's an excellent question. In short: no, we do not need this. You are correct that this is not a strict correctness issue; the default asynchronous GC mechanism will eventually clean up this queue. The justification for placing this check here is an efficiency optimization that also provides stronger transactional consistency.
The core argument is this: The final "verify" step of any destructive GC action requires acquiring the registry's lock (fr.mu), so we should leverage the lock we are already holding during this update.
Let's compare the two cleanup paths:
Path 1: Relying on Default Async GC (without this check)

1. `RegisterOrUpdateFlow` completes the priority change and releases `fr.mu`.
2. One or more `QueueBecameDrained` signals are processed asynchronously by the `FlowRegistry`'s event loop.
3. The event handler must re-acquire `fr.mu` to safely check the global state.
4. It calls `garbageCollectDrainedQueueLocked` to perform the "Trust but Verify" check.
5. The handler then releases `fr.mu`.

This path involves a full asynchronous cycle, including a potential event channel delay, a context switch, and a completely separate lock acquisition.

Path 2: Using the Explicit Check (the current code)

1. Inside `RegisterOrUpdateFlow`, while still holding `fr.mu`, we call `garbageCollectDrainedQueueLocked`.
2. It performs the "Trust but Verify" check using the lock we already have.
3. `RegisterOrUpdateFlow` completes and releases `fr.mu`.

By performing the check here, we make the cleanup part of the same synchronous, atomic transaction as the update itself. We avoid the overhead of the async path entirely for this common case.

While it does mix some GC logic into this function (which has some code smell), I believe the performance gain and the benefit of completing the transaction more predictably justify the trade-off. If you feel that simplifying this function is the better long-term choice, I am happy to remove the explicit check and rely purely on the async path.
This highly depends on the flows' rate of change; it seems we are assuming it to be high. Why is that?
Still reviewing, I just have some comments that have been hanging since last night
//
// - Registered: A logical flow is Registered when it is known to the `FlowRegistry`. It has exactly one Active
//   instance across all priority bands and zero or more Draining instances.
// - Active: A specific instance of a flow within a priority band is Active if it is the designated target for all
How can only a single flow within a priority band accept new enqueues? This reads like, if 2 'Critical' requests came in from different 'flow' identities, then they would both end up in the same flow.
}

// --- Step 3: ACT (Destruction) ---
fr.logger.Info("All shards empty for draining queue, triggering garbage collection", "flowID", flowID,
We already discussed offline, but this comment is here just as a reminder to set logs to default or verbose for now, just so we don't forget. I'd eventually like to clean up our log volume per level to something more reasonable, but for now let's just move to higher verbosity levels.
// garbageCollect removes a queue instance from the shard.
// This must be called under the shard's write lock.
func (s *registryShard) garbageCollect(flowID string, priority uint) {
It's a little interesting that the actual reference removal is done here rather than in the file named for GC. But that's a non-blocking comment.
An additional non-blocking comment: the use of the term 'garbage collection' had me thinking we were actually going to do manual GC (I'm relieved we are not). I wonder if this is the right term. This is all internal implementation code, so I'm fine to run with this name as is. But just voicing this for now; if we hear additional voices share the same concern, we may want to think about it.
`gc.go` is an independent utility. The actual garbage collection logic on the registry and shard requires acquiring their respective locks (`fr.mu` and `s.mu`) and being defined as methods on their structs. It felt worse to write a `registryShard` or `FlowRegistry` method outside the file the types are defined in.
currentTotalShardCount := len(fr.activeShards) + len(fr.drainingShards)

// 4. Revalidation: Check if the shard count changed while we were preparing.
if currentTotalShardCount != initialTotalShardCount {
Suggest breaking this out into its own function; it's only semi-related to RegisterOrUpdateFlow, so we can tuck it away.
policy framework.IntraFlowDispatchPolicy,
queues []framework.SafeQueue,
) {
	flowID := spec.ID
If we just use spec.ID + spec.Priority, all problems with priority-per-request should be solved, right? It's not clean, but it can be improved upon.
It's not the cleanest solution, but it keeps us from a heavy rework for now.
CC: @ahg-g @LukeAVanDrie
Well, we should update the ID to have priority appended, since ID is used everywhere.
Which is even easier: since the FlowSpec is only a flow control concept, we can make this name change in the director when we assemble the FlowSpecification, before flow control entry.
Granted, this breaks fairness only across priority bands. Which honestly, is an open question with how we want to handle fairness cross-band anyway.
Granted, this breaks fairness only across priority bands. Which honestly, is an open question with how we want to handle fairness cross-band anyway.
This was not a well-solved problem anyways, even with the previous model I was operating under.
I think this is the most pragmatic short-term solution, and it offers some simplifications for the registry. I will instrument this change.
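For illustration, the direction agreed on here could look something like the following in the director (a hypothetical helper; the actual change may differ):

```go
package flowcontrol

import "fmt"

// compositeFlowID appends the priority to the request's flow ID so that each
// (ID, priority) pair is registered as its own flow, sidestepping in-place
// priority updates entirely.
func compositeFlowID(requestFlowID string, priority uint) string {
	return fmt.Sprintf("%s|%d", requestFlowID, priority)
}
```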
If the code is an orchestrator, why is it called a registry and not simply FlowOrchestrator?
This PR introduces the complete implementation of the `FlowRegistry`, which acts as the stateful control plane for the entire flow control system. This is a foundational architectural component that manages the lifecycle of all flows, queues, and policies, providing a sharded, concurrent-safe view of its state to the `FlowController` workers. This work is a major step towards a robust, production-grade flow control engine.

This tracks #674

(WIP Note: The top-level `registry_test.go` with integration tests for the `FlowRegistry` will be added subsequently.)

Suggested Review Path

- The `contracts/` directory, to understand the high-level interfaces and responsibilities.
- `pkg/epp/flowcontrol/registry/doc.go`, to understand the design patterns (Actor Model, GC).

Architectural Overview

The design introduces a clear separation between the control plane and the data plane, employing several patterns to ensure correctness, performance, and stability:

- Actor Model: The `FlowRegistry` uses an actor-like pattern. A single background goroutine processes all state change events (e.g., GC timers, queue emptiness signals) from a channel. This serializes all mutations to the registry's core state, eliminating a significant class of race conditions.
- "Trust but Verify" GC: The control plane verifies the live state of `managedQueue` instances before committing to the destructive action.
- Formal Lifecycle: Components (`managedQueue`, `registryShard`) now follow a formal `Active` -> `Draining` -> `Drained` lifecycle, managed by atomic state transitions. This ensures that signals for garbage collection (e.g., `BecameDrained`) are generated reliably and exactly once.

Key Components Introduced

- `pkg/epp/flowcontrol/registry/registry.go`: The `FlowRegistry` itself. The central orchestrator.
- `pkg/epp/flowcontrol/registry/flowstate.go`: A new `flowState` struct that acts as the eventually consistent cache for a flow's GC state.
- `pkg/epp/flowcontrol/registry/gc.go`: A new `gcTracker` for decoupled, generation-based management of GC timers.
- `pkg/epp/flowcontrol/registry/lifecycle.go`: Defines the new component statuses, signals, and events that drive the state machine.

Major Refinements

- `managedQueue`: Has been significantly enhanced to implement the atomic lifecycle, manage its own state transitions, and emit edge-triggered signals to the control plane.
- `registryShard`: Now acts as a pure data plane slice, with its lifecycle managed by the `FlowRegistry`. It propagates signals from its queues up to the control plane.
- `contracts`: The `FlowRegistry` and `RegistryShard` interfaces have been significantly expanded and documented to reflect the new architecture, including detailed explanations of system invariants and dynamic update strategies.

Testing

This PR includes comprehensive unit tests for all new and modified components, including:

- `flowstate_test.go`: Tests the logic of the GC state cache.
- `gc_test.go`: Tests the timer manager with a `FakeClock`.
- `managedqueue_test.go`: Includes extensive concurrency and race condition tests for the new lifecycle and signaling logic.
- `shard_test.go`: Includes extensive concurrency and race condition tests for the shard's lifecycle management.