
Commit 614d7c5

feat: Replace events lifecycle with leases
Replaces the registry's core lifecycle management system. The complex, event-driven, actor-based model is removed in favor of a simpler and more performant lease-based lifecycle.

This commit introduces:
- A new `WithConnection` client API that provides a safe, leak-proof entry point for managing flow leases via reference counting.
- A per-flow `RWMutex` to provide fine-grained locking, allowing the garbage collector to operate on one flow without blocking others.
- A simplified GC that periodically scans for flows with a zero lease count.

Simultaneously, this commit removes the old machinery:
- The central event-processing loop (`Run` is now just for GC).
- The `RegisterOrUpdateFlow` administrative method.
- The entire signaling and event-handling subsystem.
1 parent e77cab5 commit 614d7c5
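To see how the new API composes, here is a minimal caller sketch written against the `contracts` interfaces in this diff; the `enqueueTo` helper and its routing loop are illustrative, not code from this commit:

package example

import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// enqueueTo is an illustrative caller of the new API: the flow identified by key
// is JIT-registered and lease-protected for exactly the duration of the callback.
func enqueueTo(registry contracts.FlowRegistryClient, key types.FlowKey) error {
	return registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
		for _, shard := range conn.Shards() {
			if !shard.IsActive() {
				continue // Never route new work to a Draining shard.
			}
			_ = shard.ID() // ... select a shard and enqueue the request here ...
		}
		return nil // Returning releases the lease; there is no Close() to forget.
	})
}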

6 files changed: +421 −957 lines changed

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 53 additions & 73 deletions
@@ -21,104 +21,84 @@ import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 )
 
-// FlowRegistry is the complete interface for the global control plane. An implementation of this interface is the single
-// source of truth for all flow control state and configuration.
+// FlowRegistry is the complete interface for the global flow control plane.
+// It composes the client-facing data path interface and the administrative interface. A concrete implementation of this
+// interface is the single source of truth for all flow control state.
 //
-// # Conformance
+// # Conformance: Implementations MUST be goroutine-safe.
+//
+// # Flow Lifecycle
+//
+// A flow instance, identified by its immutable `types.FlowKey`, has a lease-based lifecycle managed by this interface.
+// Any implementation MUST adhere to this lifecycle:
 //
-// All methods MUST be goroutine-safe. Implementations are expected to perform complex updates (e.g.,
-// `RegisterOrUpdateFlow`) atomically.
+//  1. Lease Acquisition: A client calls Connect to acquire a lease. This signals that the flow is in use and protects
+//     it from garbage collection. If the flow does not exist, it is created Just-In-Time (JIT).
+//  2. Active State: A flow is "Active" as long as its lease count is greater than zero.
+//  3. Lease Release: The client MUST call `Close()` on the returned `FlowConnection` to release the lease.
+//     When the lease count drops to zero, the flow becomes "Idle".
+//  4. Garbage Collection: The implementation MUST automatically garbage collect a flow after it has remained
+//     continuously Idle for a configurable duration.
 //
 // # System Invariants
 //
 // Concrete implementations MUST uphold the following invariants:
+//
 //  1. Shard Consistency: All configured priority bands and registered flow instances must exist on every Active shard.
-//     Plugin instance types must be consistent for a given flow across all shards.
-//  2. Flow Instance Uniqueness: Each unique `types.FlowKey` (`ID` + `Priority`) corresponds to exactly one managed flow
-//     instance.
-//  3. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
+//  2. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
 //     shards.
-//
-// # Flow Lifecycle
-//
-// A flow instance (identified by its immutable `FlowKey`) has a simple lifecycle:
-//
-//   - Registered: Known to the `FlowRegistry` via `RegisterOrUpdateFlow`.
-//   - Idle: Queues are empty across all Active and Draining shards.
-//   - Garbage Collected (Unregistered): The registry automatically garbage collects flows after they have remained Idle
-//     for a configurable duration.
-//
-// # Shard Lifecycle
-//
-// When a shard is decommissioned, it is marked inactive (`IsActive() == false`) to prevent new enqueues. The shard
-// continues to drain and is deleted only after it is empty.
 type FlowRegistry interface {
+	FlowRegistryClient
 	FlowRegistryAdmin
-	ShardProvider
 }
 
 // FlowRegistryAdmin defines the administrative interface for the global control plane.
-//
-// # Dynamic Update Strategies
-//
-// The contract specifies behaviors for handling dynamic updates, prioritizing stability and correctness:
-//
-//   - Immutable Flow Identity (`types.FlowKey`): The system treats the `FlowKey` (`ID` + `Priority`) as the immutable
-//     identifier. Changing the priority of traffic requires registering a new `FlowKey`. The old flow instance is
-//     automatically garbage collected when Idle. This design eliminates complex priority migration logic.
-//
-//   - Graceful Draining (Shard Scale-Down): Decommissioned shards enter a Draining state. They stop accepting new
-//     requests but continue to be processed for dispatch until empty.
-//
-//   - Self-Balancing (Shard Scale-Up): When new shards are added, the `controller.FlowController`'s distribution logic
-//     naturally utilizes them, funneling new requests to the less-loaded shards. Existing queued items are not
-//     migrated.
 type FlowRegistryAdmin interface {
-	// RegisterOrUpdateFlow handles the registration of a new flow instance or the update of an existing instance's
-	// specification (for the same `types.FlowKey`). The operation is atomic across all shards.
-	//
-	// Since the `FlowKey` (including `Priority`) is immutable, this method cannot change a flow's priority.
-	// To change priority, the caller should simply register the new `FlowKey`; the old flow instance will be
-	// automatically garbage collected when it becomes Idle.
-	//
-	// Returns errors wrapping `ErrFlowIDEmpty`, `ErrPriorityBandNotFound`, or internal errors if plugin instantiation
-	// fails.
-	RegisterOrUpdateFlow(spec types.FlowSpecification) error
-
-	// UpdateShardCount dynamically adjusts the number of internal state shards.
-	//
-	// The implementation MUST atomically re-partition capacity allocations across all active shards.
-	// Returns an error wrapping `ErrInvalidShardCount` if `n` is not positive.
-	UpdateShardCount(n int) error
-
 	// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
 	Stats() AggregateStats
 
-	// ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging and
-	// monitoring per-shard behavior (e.g., identifying hot or stuck shards).
+	// ShardStats returns a slice of statistics, one for each internal shard.
 	ShardStats() []ShardStats
 }
 
-// ShardProvider defines the interface for discovering available shards.
-//
-// A "shard" is an internal, parallel execution unit that allows the `controller.FlowController`'s core dispatch logic
-// to be parallelized, preventing a CPU bottleneck at high request rates. The `FlowRegistry`'s state is sharded to
-// support this parallelism by reducing lock contention.
+// FlowRegistryClient defines the primary, client-facing interface for the registry.
+// This is the interface that the `controller.FlowController`'s data path depends upon.
+type FlowRegistryClient interface {
+	// WithConnection manages a scoped, leased session for a given flow.
+	// It is the primary and sole entry point for interacting with the data path.
+	//
+	// This method handles the entire lifecycle of a flow connection:
+	//  1. Just-In-Time (JIT) Registration: If the flow for the given `types.FlowKey` does not exist, it is created and
+	//     registered automatically.
+	//  2. Lease Acquisition: It acquires a lifecycle lease, protecting the flow from garbage collection.
+	//  3. Callback Execution: It invokes the provided function `fn`, passing in a temporary `ActiveFlowConnection` handle.
+	//  4. Guaranteed Lease Release: It ensures the lease is safely released when the callback function returns.
+	//
+	// This functional, callback-based approach makes resource leaks impossible, as the caller is not responsible for
+	// manually closing the connection.
+	//
+	// Errors returned by the callback `fn` are propagated up.
+	// Returns `ErrFlowIDEmpty` if the provided key has an empty ID.
+	WithConnection(key types.FlowKey, fn func(conn ActiveFlowConnection) error) error
+}
+
+// ActiveFlowConnection represents a handle to a temporary, leased session on a flow.
+// It provides a safe, scoped entry point to the registry's sharded data plane.
 //
-// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard to avoid sending requests to a
-// Draining shard.
-type ShardProvider interface {
-	// Shards returns a slice of accessors, one for each internal state shard (Active and Draining).
-	// Callers should not modify the returned slice.
+// An `ActiveFlowConnection` instance is only valid for the duration of the `WithConnection` callback from which it was
+// received. Callers MUST NOT store a reference to this object or use it after the callback returns.
+// Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs
+// safely while the flow is guaranteed to be protected from garbage collection.
+type ActiveFlowConnection interface {
+	// Shards returns a stable snapshot of accessors for all internal state shards (both Active and Draining).
+	// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard from this slice.
 	Shards() []RegistryShard
 }
 
-// RegistryShard defines the interface for accessing a specific slice (shard) of the `FlowRegistry's` state.
-// It provides a concurrent-safe view for `controller.FlowController` workers.
-//
-// # Conformance
+// RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state.
+// A shard acts as an independent, parallel execution unit, allowing the system's dispatch logic to scale horizontally.
 //
-// All methods MUST be goroutine-safe.
+// # Conformance: Implementations MUST be goroutine-safe.
 type RegistryShard interface {
 	// ID returns a unique identifier for this shard, which must remain stable for the shard's lifetime.
 	ID() string
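The contract above stays silent on mechanics, but the commit message pins them down: reference counting behind per-flow locking, with `WithConnection` guaranteeing release. A self-contained toy sketch of that shape follows; `toyRegistry`, `toyFlow`, and the simplified locking are assumptions for illustration, not the commit's implementation:

package example

import (
	"errors"
	"sync"
)

var errFlowIDEmpty = errors.New("flow ID must not be empty") // stand-in for the contract's ErrFlowIDEmpty

// toyFlow holds the per-flow lease count behind its own lock, so touching one
// flow never blocks operations on another (the fine-grained locking this commit adds).
type toyFlow struct {
	mu     sync.Mutex
	leases int
}

type toyRegistry struct {
	mu    sync.Mutex
	flows map[string]*toyFlow
}

// withConnection mirrors the contract: JIT-register, acquire a lease, run the
// callback, and release the lease on every exit path.
func (r *toyRegistry) withConnection(id string, fn func() error) error {
	if id == "" {
		return errFlowIDEmpty
	}
	r.mu.Lock()
	f, ok := r.flows[id]
	if !ok { // Just-In-Time registration on first use.
		f = &toyFlow{}
		r.flows[id] = f
	}
	r.mu.Unlock()

	f.mu.Lock()
	f.leases++ // Lease held: the flow is protected from garbage collection.
	f.mu.Unlock()
	defer func() {
		f.mu.Lock()
		f.leases-- // Back at zero means Idle; the GC clock starts from here.
		f.mu.Unlock()
	}()
	return fn()
}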
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package registry
+
+import (
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
+)
+
+// connection is the concrete, un-exported implementation of the `contracts.ActiveFlowConnection` interface.
+// It is a temporary handle created for the duration of a single `WithConnection` call.
+type connection struct {
+	registry *FlowRegistry
+	key      types.FlowKey
+}
+
+var _ contracts.ActiveFlowConnection = &connection{}
+
+// Shards returns a stable snapshot of accessors for all internal state shards.
+func (c *connection) Shards() []contracts.RegistryShard {
+	c.registry.mu.RLock()
+	defer c.registry.mu.RUnlock()
+
+	// Return a copy to ensure the caller cannot modify the registry's internal slice.
+	shardsCopy := make([]contracts.RegistryShard, len(c.registry.allShards))
+	for i, s := range c.registry.allShards {
+		shardsCopy[i] = s
+	}
+	return shardsCopy
+}
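A design note on `Shards` above: the copy is cheap, since it duplicates only the slice of interface values rather than any shard state, and it decouples the caller from the control plane. A worker can finish iterating its snapshot after the read lock is dropped, even while a concurrent scale operation replaces the registry's internal `allShards` slice under the write lock.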

pkg/epp/flowcontrol/registry/doc.go

Lines changed: 13 additions & 106 deletions
@@ -14,116 +14,23 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Package registry provides the concrete implementation of the `contracts.FlowRegistry`.
+// Package registry provides the concrete implementation of the `contracts.FlowRegistry` interface.
 //
-// As the stateful control plane, this package manages the lifecycle of all flows, queues, and policies. It provides a
-// sharded, concurrent-safe view of its state to the `controller.FlowController` workers, enabling scalable, parallel
-// request processing.
+// # Architecture: A Sharded, Concurrent Control Plane
 //
-// # Architecture: Composite, Sharded, and Separated Concerns
+// This package implements the flow control state machine using a sharded architecture to enable scalable, parallel
+// request processing. It separates the orchestration control plane from the request-processing data plane.
 //
-// The registry separates the control plane (orchestration) from the data plane (request processing state).
+//   - `FlowRegistry`: The top-level orchestrator (Control Plane). It manages the lifecycle of all flows and shards,
+//     handling registration, garbage collection, and scaling operations.
+//   - `registryShard`: A slice of the data plane. It holds a partition of the total state and provides a
+//     read-optimized, concurrent-safe view for a single `controller.FlowController` worker.
+//   - `managedQueue`: A stateful decorator around a `framework.SafeQueue`. It is the fundamental unit of state,
+//     responsible for atomically tracking statistics (e.g., length and byte size) and ensuring data consistency.
 //
-//   - `FlowRegistry`: The Control Plane. The top-level orchestrator and single source of truth. It centralizes complex
-//     operations: flow registration, garbage collection (GC) coordination, and shard scaling.
-//
-//   - `registryShard`: The Data Plane Slice. A concurrent-safe "slice" of the registry's total state. It provides a
-//     read-optimized view for `FlowController` workers.
-//
-//   - `managedQueue`: The Stateful Decorator. A wrapper around a `framework.SafeQueue`. It augments the queue with
-//     atomic statistics tracking and signaling state transitions to the control plane.
-//
-// # Data Flow and Interaction Model
-//
-// The data path (Enqueue/Dispatch) is optimized for minimal latency and maximum concurrency.
-//
-// Enqueue Path:
-//  1. The `FlowController`'s distributor selects an active `registryShard`.
-//  2. The distributor calls `shard.ManagedQueue(flowKey)` (acquires `RLock`).
-//  3. The distributor calls `managedQueue.Add(item)`.
-//  4. `managedQueue` atomically updates the queue and its stats and signals the control plane.
-//
-// Dispatch Path:
-//  1. A `FlowController` worker iterates over its assigned `registryShard`.
-//  2. The worker uses policies and accessors to select the next item (acquires `RLock`).
-//  3. The worker calls `managedQueue.Remove(handle)`.
-//  4. `managedQueue` atomically updates the queue and its stats and signals the control plane.
-//
-// # Concurrency Strategy: Multi-Tiered and Optimized
-//
-// The registry maximizes performance on the hot path while ensuring strict correctness for complex state transitions:
-//
-//   - Serialized Control Plane (Actor Model): The `FlowRegistry` uses a single background goroutine to process all
-//     state change events serially, eliminating race conditions in the control plane.
-//
-//   - Sharding (Data Plane Parallelism): State is partitioned across multiple `registryShard` instances, allowing the
-//     data path to scale linearly.
-//
-//   - Lock-Free Data Path (Atomics): Statistics aggregation (Shard/Registry level) uses lock-free atomics.
-//
-//   - Strict Consistency (Hybrid Locking): `managedQueue` uses a hybrid locking model (Mutex for writes, Atomics for
-//     reads) to guarantee strict consistency between queue contents and statistics, which is required for GC
-//     correctness.
+// # Concurrency Model
 //
+// The registry uses a multi-layered strategy to maximize performance on the hot path while ensuring correctness for
+// administrative tasks.
 // (See the `FlowRegistry` struct documentation for detailed locking rules).
-//
-// # Garbage Collection: The "Trust but Verify" Pattern
-//
-// The registry handles the race condition between asynchronous data path activity and synchronous GC. The control plane
-// maintains an eventually consistent cache (`flowState`).
-//
-// The registry uses a periodic, generational "Trust but Verify" pattern. It identifies candidate flows using the cache.
-// Before deletion, it performs a "Verify" step: it synchronously acquires write locks on ALL shards and queries the
-// ground truth (live queue counters). This provides strong consistency when needed.
-//
-// (See the `garbageCollectFlowLocked` function documentation for detailed steps).
-//
-// # Scalability Characteristics and Trade-offs
-//
-// The architecture prioritizes data path throughput and correctness, introducing specific trade-offs:
-//
-//   - Data Path Throughput (Excellent): Scales linearly with the number of shards and benefits from lock-free
-//     statistics updates.
-//
-//   - GC Latency Impact (Trade-off): The GC "Verify" step requires locking all shards (O(N)). This briefly pauses the
-//     data path. As the shard count (N) increases, this may impact P99 latency. This trade-off guarantees correctness.
-//
-//   - Control Plane Responsiveness during Scale-Up (Trade-off): Scaling up requires synchronizing all existing flows
-//     (M) onto the new shards (K). This O(M*K) operation occurs under the main control plane lock. If M is large, this
-//     operation may block the control plane.
-//
-// # Event-Driven State Machine and Lifecycle Scenarios
-//
-// The system relies on atomic state transitions to generate reliable, edge-triggered signals. These signals are sent
-// reliably; if the event channel is full, the sender blocks, applying necessary backpressure to ensure no events are
-// lost, preventing state divergence.
-//
-// The following scenarios detail how the registry handles lifecycle events:
-//
-// New Flow Registration: A new flow instance `F1` (`FlowKey{ID: "A", Priority: 10}`) is registered.
-//
-//  1. `managedQueue` instances are created for `F1` on all shards.
-//  2. The `flowState` cache marks `F1` as Idle. If it remains Idle, it will eventually be garbage collected.
-//
-// Flow Activity/Inactivity:
-//
-//  1. When the first request for `F1` is enqueued, the queue signals `BecameNonEmpty`. The control plane marks `F1`
-//     as Active, protecting it from GC.
-//  2. When the last request is dispatched globally, the queues signal `BecameEmpty`. The control plane updates the
-//     cache, and `F1` is now considered Idle by the GC scanner.
-//
-// "Changing" Flow Priority: Traffic for `ID: "A"` needs to shift from `Priority: 10` to `Priority: 20`.
-//
-//  1. The caller registers a new flow instance, `F2` (`FlowKey{ID: "A", Priority: 20}`).
-//  2. The system treats `F1` and `F2` as independent entities (Immutable `FlowKey` design).
-//  3. As `F1` becomes Idle, it is automatically garbage collected. This achieves the outcome gracefully without complex
-//     state migration logic.
-//
-// Shard Scaling:
-//
-//   - Scale-Up: New shards are created and marked Active. Existing flows are synchronized onto the new shards.
-//     Configuration is re-partitioned.
-//   - Scale-Down: Targeted shards transition to Draining (stop accepting new work). Configuration is re-partitioned
-//     across remaining active shards. When a Draining shard is empty, it signals `BecameDrained` and is removed by the
-//     control plane.
 package registry
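The package doc no longer walks through GC scenarios, but the commit message fixes the collector's shape: `Run` periodically scans for flows whose lease count is zero past a timeout. A self-contained toy sketch under that description follows; the map-based registry, single lock, and all names here are illustrative only:

package example

import (
	"sync"
	"time"
)

// gcRegistry is a toy model of the lease-based lifecycle: flow IDs mapped to
// lease counts and idle timestamps. The real registry locks per flow; this toy
// uses one lock for brevity.
type gcRegistry struct {
	mu        sync.Mutex
	leases    map[string]int
	idleSince map[string]time.Time
	gcTimeout time.Duration
}

// sweep removes every flow that has held zero leases for longer than gcTimeout.
func (r *gcRegistry) sweep(now time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, n := range r.leases {
		if n == 0 && now.Sub(r.idleSince[id]) > r.gcTimeout {
			delete(r.leases, id)
			delete(r.idleSince, id)
		}
	}
}

// run is the periodic scan: tick, sweep, repeat until stopped.
func (r *gcRegistry) run(stop <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case now := <-ticker.C:
			r.sweep(now)
		}
	}
}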
