
Commit c4a315a

LukeAVanDrie authored and kfswain committed
refactor(registry): Replace event-driven GC with a lease-based lifecycle (kubernetes-sigs#1476)
* refactor: Reorder methods for logical grouping

  Reorganizes the methods in `registry.go` to improve readability and group related functionality. Public API methods are placed first, followed by administrative helpers, GC logic, and finally callbacks and statistics propagation. This is to minimize the delta for subsequent changes. No logical changes are included in this commit.

* refactor: Simplify managedQueue to a decorator

  Removes the complex, stateful signaling mechanism from the `managedQueue`. Its sole responsibility is now to decorate a `SafeQueue` with atomic, strictly consistent statistics tracking. This change is the foundational first step in moving the registry from a complex, event-driven lifecycle model to a simpler, lease-based one. The now-unused event and lifecycle types are also removed.

* refactor: Adapt shard to new managedQueue contract

  Updates the `registryShard` to work with the new, signal-free `managedQueue`. The shard's responsibility is simplified; it no longer needs to handle or propagate lifecycle signals from its queues. Its draining logic is now based on directly checking its aggregate length, removing the need for the `BecameDrained` signal.

* feat: Replace events lifecycle with leases

  Replaces the registry's core lifecycle management system. The complex, event-driven, actor-based model is removed in favor of a simpler and more performant lease-based lifecycle.

  This commit introduces:
  - A new `WithConnection` client API that provides a safe, leak-proof entry point for managing flow leases via reference counting.
  - A per-flow `RWMutex` to provide fine-grained locking, allowing the garbage collector to operate on one flow without blocking others.
  - A simplified GC that periodically scans for flows with a zero lease count.

  Simultaneously, this commit removes the old machinery:
  - The central event-processing loop (`Run` is now just for GC).
  - The `RegisterOrUpdateFlow` administrative method.
  - The entire signaling and event-handling subsystem.

* test: Overhaul tests for new lease-based design

  Updates all tests in the registry package to align with the new lease-based API and simplified component responsibilities.
  - `managedqueue_test` is rewritten to focus on stats propagation and invariants, removing all signal testing.
  - `shard_test` is updated to reflect its simpler role and new draining logic.
  - `registry_test` is completely overhauled to test the `WithConnection` API, JIT registration, and the new lease-based GC logic, using a fake clock for deterministic lifecycle testing.
1 parent 9d4ab28 commit c4a315a
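For orientation, the call pattern the new `WithConnection` API is designed around looks roughly like the sketch below. It is illustrative only and not part of this commit: `pickActiveShard` and its error message are invented; only `WithConnection`, `Shards`, `ID`, and `IsActive` come from the contracts touched here.

import (
	"fmt"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// pickActiveShard acquires a lease on the flow identified by key (JIT-registering it
// on first use) and returns the ID of the first Active shard. The lease is held only
// for the duration of the callback and is released automatically when it returns.
func pickActiveShard(registry contracts.FlowRegistryClient, key types.FlowKey) (string, error) {
	var shardID string
	err := registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
		for _, shard := range conn.Shards() {
			if shard.IsActive() { // never route new work to a Draining shard
				shardID = shard.ID()
				return nil
			}
		}
		return fmt.Errorf("no Active shard available for flow %v", key)
	})
	return shardID, err
}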

File tree

14 files changed: +1676 −2588 lines changed


pkg/epp/flowcontrol/contracts/errors.go

Lines changed: 4 additions & 0 deletions
@@ -33,4 +33,8 @@ var (
 
 	// ErrInvalidShardCount indicates that an invalid shard count was provided (e.g., zero or negative).
 	ErrInvalidShardCount = errors.New("invalid shard count")
+
+	// ErrShardDraining indicates that an operation could not be completed because the target shard is in the process of
+	// being gracefully drained. The caller should retry the operation on a different, Active shard.
+	ErrShardDraining = errors.New("shard is draining")
 )

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 60 additions & 78 deletions
@@ -21,104 +21,84 @@ import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 )
 
-// FlowRegistry is the complete interface for the global control plane. An implementation of this interface is the single
-// source of truth for all flow control state and configuration.
+// FlowRegistry is the complete interface for the global flow control plane.
+// It composes the client-facing data path interface and the administrative interface. A concrete implementation of this
+// interface is the single source of truth for all flow control state.
 //
-// # Conformance
+// # Conformance: Implementations MUST be goroutine-safe.
+//
+// # Flow Lifecycle
+//
+// A flow instance, identified by its immutable `types.FlowKey`, has a lease-based lifecycle managed by this interface.
+// Any implementation MUST adhere to this lifecycle:
 //
-// All methods MUST be goroutine-safe. Implementations are expected to perform complex updates (e.g.,
-// `RegisterOrUpdateFlow`) atomically.
+// 1. Lease Acquisition: A client calls Connect to acquire a lease. This signals that the flow is in use and protects
+//    it from garbage collection. If the flow does not exist, it is created Just-In-Time (JIT).
+// 2. Active State: A flow is "Active" as long as its lease count is greater than zero.
+// 3. Lease Release: The client MUST call `Close()` on the returned `FlowConnection` to release the lease.
+//    When the lease count drops to zero, the flow becomes "Idle".
+// 4. Garbage Collection: The implementation MUST automatically garbage collect a flow after it has remained
+//    continuously Idle for a configurable duration.
 //
 // # System Invariants
 //
 // Concrete implementations MUST uphold the following invariants:
+//
 // 1. Shard Consistency: All configured priority bands and registered flow instances must exist on every Active shard.
-//    Plugin instance types must be consistent for a given flow across all shards.
-// 2. Flow Instance Uniqueness: Each unique `types.FlowKey` (`ID` + `Priority`) corresponds to exactly one managed flow
-//    instance.
-// 3. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
+// 2. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
 //    shards.
-//
-// # Flow Lifecycle
-//
-// A flow instance (identified by its immutable `FlowKey`) has a simple lifecycle:
-//
-// - Registered: Known to the `FlowRegistry` via `RegisterOrUpdateFlow`.
-// - Idle: Queues are empty across all Active and Draining shards.
-// - Garbage Collected (Unregistered): The registry automatically garbage collects flows after they have remained Idle
-//   for a configurable duration.
-//
-// # Shard Lifecycle
-//
-// When a shard is decommissioned, it is marked inactive (`IsActive() == false`) to prevent new enqueues. The shard
-// continues to drain and is deleted only after it is empty.
 type FlowRegistry interface {
+	FlowRegistryClient
 	FlowRegistryAdmin
-	ShardProvider
 }
 
 // FlowRegistryAdmin defines the administrative interface for the global control plane.
-//
-// # Dynamic Update Strategies
-//
-// The contract specifies behaviors for handling dynamic updates, prioritizing stability and correctness:
-//
-// - Immutable Flow Identity (`types.FlowKey`): The system treats the `FlowKey` (`ID` + `Priority`) as the immutable
-//   identifier. Changing the priority of traffic requires registering a new `FlowKey`. The old flow instance is
-//   automatically garbage collected when Idle. This design eliminates complex priority migration logic.
-//
-// - Graceful Draining (Shard Scale-Down): Decommissioned shards enter a Draining state. They stop accepting new
-//   requests but continue to be processed for dispatch until empty.
-//
-// - Self-Balancing (Shard Scale-Up): When new shards are added, the `controller.FlowController`'s distribution logic
-//   naturally utilizes them, funneling new requests to the less-loaded shards. Existing queued items are not
-//   migrated.
 type FlowRegistryAdmin interface {
-	// RegisterOrUpdateFlow handles the registration of a new flow instance or the update of an existing instance's
-	// specification (for the same `types.FlowKey`). The operation is atomic across all shards.
-	//
-	// Since the `FlowKey` (including `Priority`) is immutable, this method cannot change a flow's priority.
-	// To change priority, the caller should simply register the new `FlowKey`; the old flow instance will be
-	// automatically garbage collected when it becomes Idle.
-	//
-	// Returns errors wrapping `ErrFlowIDEmpty`, `ErrPriorityBandNotFound`, or internal errors if plugin instantiation
-	// fails.
-	RegisterOrUpdateFlow(spec types.FlowSpecification) error
-
-	// UpdateShardCount dynamically adjusts the number of internal state shards.
-	//
-	// The implementation MUST atomically re-partition capacity allocations across all active shards.
-	// Returns an error wrapping `ErrInvalidShardCount` if `n` is not positive.
-	UpdateShardCount(n int) error
-
 	// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
 	Stats() AggregateStats
 
-	// ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging and
-	// monitoring per-shard behavior (e.g., identifying hot or stuck shards).
+	// ShardStats returns a slice of statistics, one for each internal shard.
 	ShardStats() []ShardStats
 }
 
-// ShardProvider defines the interface for discovering available shards.
-//
-// A "shard" is an internal, parallel execution unit that allows the `controller.FlowController`'s core dispatch logic
-// to be parallelized, preventing a CPU bottleneck at high request rates. The `FlowRegistry`'s state is sharded to
-// support this parallelism by reducing lock contention.
+// FlowRegistryClient defines the primary, client-facing interface for the registry.
+// This is the interface that the `controller.FlowController`'s data path depends upon.
+type FlowRegistryClient interface {
+	// WithConnection manages a scoped, leased session for a given flow.
+	// It is the primary and sole entry point for interacting with the data path.
+	//
+	// This method handles the entire lifecycle of a flow connection:
+	// 1. Just-In-Time (JIT) Registration: If the flow for the given `types.FlowKey` does not exist, it is created and
+	//    registered automatically.
+	// 2. Lease Acquisition: It acquires a lifecycle lease, protecting the flow from garbage collection.
+	// 3. Callback Execution: It invokes the provided function `fn`, passing in a temporary `ActiveFlowConnection` handle.
+	// 4. Guaranteed Lease Release: It ensures the lease is safely released when the callback function returns.
+	//
+	// This functional, callback-based approach makes resource leaks impossible, as the caller is not responsible for
+	// manually closing the connection.
+	//
+	// Errors returned by the callback `fn` are propagated up.
+	// Returns `ErrFlowIDEmpty` if the provided key has an empty ID.
+	WithConnection(key types.FlowKey, fn func(conn ActiveFlowConnection) error) error
+}
+
+// ActiveFlowConnection represents a handle to a temporary, leased session on a flow.
+// It provides a safe, scoped entry point to the registry's sharded data plane.
 //
-// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard to avoid sending requests to a
-// Draining shard.
-type ShardProvider interface {
-	// Shards returns a slice of accessors, one for each internal state shard (Active and Draining).
-	// Callers should not modify the returned slice.
+// An `ActiveFlowConnection` instance is only valid for the duration of the `WithConnection` callback from which it was
+// received. Callers MUST NOT store a reference to this object or use it after the callback returns.
+// Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs
+// safely while the flow is guaranteed to be protected from garbage collection.
+type ActiveFlowConnection interface {
+	// Shards returns a stable snapshot of accessors for all internal state shards (both Active and Draining).
+	// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard from this slice.
 	Shards() []RegistryShard
 }
 
-// RegistryShard defines the interface for accessing a specific slice (shard) of the `FlowRegistry's` state.
-// It provides a concurrent-safe view for `controller.FlowController` workers.
-//
-// # Conformance
+// RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state.
+// A shard acts as an independent, parallel execution unit, allowing the system's dispatch logic to scale horizontally.
 //
-// All methods MUST be goroutine-safe.
+// # Conformance: Implementations MUST be goroutine-safe.
type RegistryShard interface {
 	// ID returns a unique identifier for this shard, which must remain stable for the shard's lifetime.
 	ID() string
@@ -163,14 +143,16 @@ type RegistryShard interface {
 	Stats() ShardStats
 }
 
-// ManagedQueue defines the interface for a flow's queue instance on a specific shard.
-// It acts as a stateful decorator around an underlying `framework.SafeQueue`.
+// ManagedQueue defines the interface for a flow's queue on a specific shard.
+// It acts as a stateful decorator around an underlying `framework.SafeQueue`, augmenting it with statistics tracking.
 //
 // # Conformance
 //
-// - All methods MUST be goroutine-safe.
-// - All mutating methods (`Add()`, `Remove()`, etc.) MUST ensure that the underlying queue state and the statistics
-//   (`Len`, `ByteSize`) are updated atomically relative to each other.
+// - Implementations MUST be goroutine-safe.
+// - All mutating methods MUST ensure that the underlying queue state and the public statistics (`Len`, `ByteSize`)
+//   are updated as a single atomic transaction.
+// - The `Add` method MUST return an error wrapping `ErrShardDraining` if the queue instance belongs to a parent shard
+//   that is no longer Active.
 type ManagedQueue interface {
 	framework.SafeQueue
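The new `ErrShardDraining` conformance rule above ties queue admission to the state of the parent shard. Below is a minimal, self-contained sketch of that behavior; apart from `contracts.ErrShardDraining`, every name is invented for illustration and is not the PR's `managedQueue`.

import (
	"fmt"
	"sync"
	"sync/atomic"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
)

// drainingAwareQueue is a toy decorator: it rejects new work once its parent shard
// stops being Active, and it keeps its statistics in lockstep with its contents.
type drainingAwareQueue struct {
	mu          sync.Mutex
	items       [][]byte
	byteSize    atomic.Int64
	shardActive func() bool // stands in for "the parent shard is Active"
}

func (q *drainingAwareQueue) Add(payload []byte) error {
	if !q.shardActive() {
		// Wrap the sentinel so callers can detect it with errors.Is and retry on a
		// different, Active shard.
		return fmt.Errorf("enqueue rejected: %w", contracts.ErrShardDraining)
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	// Contents and statistics change under the same lock, so Len/ByteSize are never
	// observed out of sync with the queue itself.
	q.items = append(q.items, payload)
	q.byteSize.Add(int64(len(payload)))
	return nil
}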

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package registry
+
+import (
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
+)
+
+// connection is the concrete, un-exported implementation of the `contracts.ActiveFlowConnection` interface.
+// It is a temporary handle created for the duration of a single `WithConnection` call.
+type connection struct {
+	registry *FlowRegistry
+	key      types.FlowKey
+}
+
+var _ contracts.ActiveFlowConnection = &connection{}
+
+// Shards returns a stable snapshot of accessors for all internal state shards.
+func (c *connection) Shards() []contracts.RegistryShard {
+	c.registry.mu.RLock()
+	defer c.registry.mu.RUnlock()
+
+	// Return a copy to ensure the caller cannot modify the registry's internal slice.
+	shardsCopy := make([]contracts.RegistryShard, len(c.registry.allShards))
+	for i, s := range c.registry.allShards {
+		shardsCopy[i] = s
+	}
+	return shardsCopy
+}
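To show where this handle fits, here is a rough, self-contained sketch of the lifecycle that `WithConnection` wraps around it. Everything named `fakeRegistry`, `flowLease`, or `noopConnection` is invented; the sketch also assumes `types.FlowKey` is a comparable struct with an `ID` string field and that `contracts` defines `ErrFlowIDEmpty`, as the interface documentation suggests.

import (
	"sync"
	"sync/atomic"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

type flowLease struct{ refs atomic.Int64 }

// noopConnection satisfies contracts.ActiveFlowConnection for this sketch only.
type noopConnection struct{}

func (noopConnection) Shards() []contracts.RegistryShard { return nil }

type fakeRegistry struct {
	mu     sync.Mutex
	leases map[types.FlowKey]*flowLease
}

// WithConnection sketches the contract: JIT-register the flow, acquire a lease, run
// the callback with a scoped handle, and always release the lease afterwards.
func (r *fakeRegistry) WithConnection(key types.FlowKey, fn func(contracts.ActiveFlowConnection) error) error {
	if key.ID == "" {
		return contracts.ErrFlowIDEmpty
	}
	r.mu.Lock()
	if r.leases == nil {
		r.leases = make(map[types.FlowKey]*flowLease)
	}
	lease, ok := r.leases[key]
	if !ok { // Just-In-Time registration on first use
		lease = &flowLease{}
		r.leases[key] = lease
	}
	r.mu.Unlock()

	lease.refs.Add(1)        // the flow is now Active and protected from GC
	defer lease.refs.Add(-1) // guaranteed release when the callback returns
	return fn(noopConnection{})
}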

pkg/epp/flowcontrol/registry/doc.go

Lines changed: 13 additions & 106 deletions
@@ -14,116 +14,23 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package registry provides the concrete implementation of the `contracts.FlowRegistry`.
+// Package registry provides the concrete implementation of the `contracts.FlowRegistry` interface.
 //
-// As the stateful control plane, this package manages the lifecycle of all flows, queues, and policies. It provides a
-// sharded, concurrent-safe view of its state to the `controller.FlowController` workers, enabling scalable, parallel
-// request processing.
+// # Architecture: A Sharded, Concurrent Control Plane
 //
-// # Architecture: Composite, Sharded, and Separated Concerns
+// This package implements the flow control state machine using a sharded architecture to enable scalable, parallel
+// request processing. It separates the orchestration control plane from the request-processing data plane.
 //
-// The registry separates the control plane (orchestration) from the data plane (request processing state).
+// - `FlowRegistry`: The top-level orchestrator (Control Plane). It manages the lifecycle of all flows and shards,
+//   handling registration, garbage collection, and scaling operations.
+// - `registryShard`: A slice of the data plane. It holds a partition of the total state and provides a
+//   read-optimized, concurrent-safe view for a single `controller.FlowController` worker.
+// - `managedQueue`: A stateful decorator around a `framework.SafeQueue`. It is the fundamental unit of state,
+//   responsible for atomically tracking statistics (e.g., length and byte size) and ensuring data consistency.
 //
-// - `FlowRegistry`: The Control Plane. The top-level orchestrator and single source of truth. It centralizes complex
-//   operations: flow registration, garbage collection (GC) coordination, and shard scaling.
-//
-// - `registryShard`: The Data Plane Slice. A concurrent-safe "slice" of the registry's total state. It provides a
-//   read-optimized view for `FlowController` workers.
-//
-// - `managedQueue`: The Stateful Decorator. A wrapper around a `framework.SafeQueue`. It augments the queue with
-//   atomic statistics tracking and signaling state transitions to the control plane.
-//
-// # Data Flow and Interaction Model
-//
-// The data path (Enqueue/Dispatch) is optimized for minimal latency and maximum concurrency.
-//
-// Enqueue Path:
-// 1. The `FlowController`'s distributor selects an active `registryShard`.
-// 2. The distributor calls `shard.ManagedQueue(flowKey)` (acquires `RLock`).
-// 3. The distributor calls `managedQueue.Add(item)`.
-// 4. `managedQueue` atomically updates the queue and its stats and signals the control plane.
-//
-// Dispatch Path:
-// 1. A `FlowController` worker iterates over its assigned `registryShard`.
-// 2. The worker uses policies and accessors to select the next item (acquires `RLock`).
-// 3. The worker calls `managedQueue.Remove(handle)`.
-// 4. `managedQueue` atomically updates the queue and its stats and signals the control plane.
-//
-// # Concurrency Strategy: Multi-Tiered and Optimized
-//
-// The registry maximizes performance on the hot path while ensuring strict correctness for complex state transitions:
-//
-// - Serialized Control Plane (Actor Model): The `FlowRegistry` uses a single background goroutine to process all
-//   state change events serially, eliminating race conditions in the control plane.
-//
-// - Sharding (Data Plane Parallelism): State is partitioned across multiple `registryShard` instances, allowing the
-//   data path to scale linearly.
-//
-// - Lock-Free Data Path (Atomics): Statistics aggregation (Shard/Registry level) uses lock-free atomics.
-//
-// - Strict Consistency (Hybrid Locking): `managedQueue` uses a hybrid locking model (Mutex for writes, Atomics for
-//   reads) to guarantee strict consistency between queue contents and statistics, which is required for GC
-//   correctness.
+// # Concurrency Model
 //
+// The registry uses a multi-layered strategy to maximize performance on the hot path while ensuring correctness for
+// administrative tasks.
 // (See the `FlowRegistry` struct documentation for detailed locking rules).
-//
-// # Garbage Collection: The "Trust but Verify" Pattern
-//
-// The registry handles the race condition between asynchronous data path activity and synchronous GC. The control plane
-// maintains an eventually consistent cache (`flowState`).
-//
-// The registry uses a periodic, generational "Trust but Verify" pattern. It identifies candidate flows using the cache.
-// Before deletion, it performs a "Verify" step: it synchronously acquires write locks on ALL shards and queries the
-// ground truth (live queue counters). This provides strong consistency when needed.
-//
-// (See the `garbageCollectFlowLocked` function documentation for detailed steps).
-//
-// # Scalability Characteristics and Trade-offs
-//
-// The architecture prioritizes data path throughput and correctness, introducing specific trade-offs:
-//
-// - Data Path Throughput (Excellent): Scales linearly with the number of shards and benefits from lock-free
-//   statistics updates.
-//
-// - GC Latency Impact (Trade-off): The GC "Verify" step requires locking all shards (O(N)). This briefly pauses the
-//   data path. As the shard count (N) increases, this may impact P99 latency. This trade-off guarantees correctness.
-//
-// - Control Plane Responsiveness during Scale-Up (Trade-off): Scaling up requires synchronizing all existing flows
-//   (M) onto the new shards (K). This O(M*K) operation occurs under the main control plane lock. If M is large, this
-//   operation may block the control plane.
-//
-// # Event-Driven State Machine and Lifecycle Scenarios
-//
-// The system relies on atomic state transitions to generate reliable, edge-triggered signals. These signals are sent
-// reliably; if the event channel is full, the sender blocks, applying necessary backpressure to ensure no events are
-// lost, preventing state divergence.
-//
-// The following scenarios detail how the registry handles lifecycle events:
-//
-// New Flow Registration: A new flow instance `F1` (`FlowKey{ID: "A", Priority: 10}`) is registered.
-//
-// 1. `managedQueue` instances are created for `F1` on all shards.
-// 2. The `flowState` cache marks `F1` as Idle. If it remains Idle, it will eventually be garbage collected.
-//
-// Flow Activity/Inactivity:
-//
-// 1. When the first request for `F1` is enqueued, the queue signals `BecameNonEmpty`. The control plane marks `F1`
-//    as Active, protecting it from GC.
-// 2. When the last request is dispatched globally, the queues signal `BecameEmpty`. The control plane updates the
-//    cache, and `F1` is now considered Idle by the GC scanner.
-//
-// "Changing" Flow Priority: Traffic for `ID: "A"` needs to shift from `Priority: 10` to `Priority: 20`.
-//
-// 1. The caller registers a new flow instance, `F2` (`FlowKey{ID: "A", Priority: 20}`).
-// 2. The system treats `F1` and `F2` as independent entities (Immutable `FlowKey` design).
-// 3. As `F1` becomes Idle, it is automatically garbage collected. This achieves the outcome gracefully without complex
-//    state migration logic.
-//
-// Shard Scaling:
-//
-// - Scale-Up: New shards are created and marked Active. Existing flows are synchronized onto the new shards.
-//   Configuration is re-partitioned.
-// - Scale-Down: Targeted shards transition to Draining (stop accepting new work). Configuration is re-partitioned
-//   across remaining active shards. When a Draining shard is empty, it signals `BecameDrained` and is removed by the
-//   control plane.
 package registry
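The simplified, lease-count GC described in this commit can be pictured with the sketch below. `flowEntry`, `idleSince`, and `runGCScan` are illustrative names rather than the registry's internals, `types.FlowKey` is assumed to be comparable, and the periodic driver (the `Run` loop mentioned in the commit message) is assumed to call something like this on a ticker with a fake-able clock.

import (
	"sync/atomic"
	"time"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// flowEntry carries the two facts the GC scan needs: the live lease count and the
// time at which the flow last became Idle.
type flowEntry struct {
	refs      atomic.Int64
	idleSince atomic.Int64 // unix nanos, recorded when refs last dropped to zero
}

// runGCScan returns the keys of flows that have been continuously Idle (zero leases)
// for at least idleTTL; the caller would then unregister them from every shard.
func runGCScan(flows map[types.FlowKey]*flowEntry, now time.Time, idleTTL time.Duration) []types.FlowKey {
	var evict []types.FlowKey
	for key, f := range flows {
		if f.refs.Load() != 0 {
			continue // leased flows are never collected
		}
		if now.UnixNano()-f.idleSince.Load() >= idleTTL.Nanoseconds() {
			evict = append(evict, key)
		}
	}
	return evict
}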
