Commit d03d3b6
feat: Add top-level Flow Controller (#1525)
* feat(flowcontrol): Refactor FlowRegistry contracts

  This commit refactors some of the core Flow Control contracts to improve clarity and better align with their intended roles. The goal is to create a more intuitive and robust interface for the upcoming top-level FlowController. Key changes include:

  - The `FlowRegistryClient` interface is renamed to `FlowRegistryDataPlane` to more accurately reflect its role in the high-throughput request path.
  - The `FlowRegistryAdmin` interface is renamed to `FlowRegistryObserver` to clarify its read-only, observational nature.
  - The `ActiveFlowConnection.Shards()` method is renamed to `ActiveFlowConnection.ActiveShards()` to make it explicit that it returns only active, schedulable shards. This removes ambiguity for the distributor logic.
  - `ShardStats` is enriched with `ID` and `IsActive` fields, providing consumers with more context about the shard's state at the time the snapshot was taken.
  - The registry implementation has been updated to match these new contract definitions.

* refactor: Adapt ShardProcessor to a worker role

  This commit refactors the `ShardProcessor` to function as a stateful worker managed by a higher-level supervisor. This is a preparatory step for the introduction of the new top-level `FlowController`. The public API of the processor is changed from a direct `Enqueue` method to a channel-based submission model with `Submit` (non-blocking) and `SubmitOrBlock` (blocking). This decouples the producer from the processor's main loop, enabling better backpressure signals and higher throughput. Key changes include:

  - Introduction of `Submit` and `SubmitOrBlock` for asynchronous request handoff.
  - `FlowItem`'s finalization logic is improved to be more robust and channel-based.
  - Error handling within the dispatch cycle is refactored (no logic change) to clarify how it promotes work conservation by isolating failures to a single priority band.
* feat: Introduce the FlowController supervisor

  This commit introduces the `FlowController`, a high-throughput, sharded supervisor that orchestrates a pool of stateful `ShardProcessor` workers. This new component is the central processing engine of the Flow Control system, implementing a "supervisor-worker" pattern. Key features of the `FlowController` include:

  - Supervisor-Worker Architecture: Acts as a stateless supervisor, managing the lifecycle of stateful `ShardProcessor` workers. It includes a reconciliation loop to garbage-collect workers for stale shards.
  - Flow-Aware Load Balancing: Implements a "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) algorithm to distribute incoming requests to the least-loaded worker, promoting emergent fairness.
  - Synchronous API: Exposes a blocking `EnqueueAndWait` method, which simplifies client integration (e.g., with Envoy `ext_proc`) and provides direct backpressure.
  - Lazy Worker Initialization: Workers are created on demand when a shard first becomes active, conserving resources and reducing contention on the hot path.
  - Configuration: A new `Config` object allows for tuning parameters like TTLs, buffer sizes, and reconciliation intervals.

* docs: Update comments to align with FlowController

  This commit updates documentation and code comments across various framework components to align with the concepts and architecture introduced by the `FlowController`. Key changes include:

  - FCFS Policy: Clarified the distinction between "logical" and "physical" enqueue time and the behavioral trade-offs when pairing with different queue capabilities.
  - ListQueue: Expanded the documentation to explain its role as a high-performance, approximate FCFS queue in the context of the `FlowController`'s retry mechanics.
  - Request Types: Refined the comments for `QueueItemAccessor` to be more precise about the meaning of `EnqueueTime`.
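The JSQ-Bytes selection named above reduces to picking the worker whose queue currently holds the fewest bytes. The sketch below is a minimal illustration under assumed names (`workerStats`, `pickJSQBytes`); the controller's actual load snapshot types differ.

```go
package main

import "fmt"

// workerStats is an illustrative stand-in for the per-worker load snapshot
// the distributor consults when routing a request.
type workerStats struct {
	ID          string
	QueuedBytes uint64
}

// pickJSQBytes implements Join-Shortest-Queue-by-Bytes: route the incoming
// request to the worker with the fewest queued bytes, so load balances on
// actual payload size rather than request count.
func pickJSQBytes(workers []workerStats) string {
	best := workers[0]
	for _, w := range workers[1:] {
		if w.QueuedBytes < best.QueuedBytes {
			best = w
		}
	}
	return best.ID
}

func main() {
	workers := []workerStats{
		{ID: "shard-a", QueuedBytes: 4096},
		{ID: "shard-b", QueuedBytes: 512},
		{ID: "shard-c", QueuedBytes: 2048},
	}
	fmt.Println(pickJSQBytes(workers)) // shard-b
}
```

Counting bytes instead of requests is what makes the policy "flow-aware": one large request weighs more than several tiny ones, so workers converge toward equal byte backlogs.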
* refactor: Simplify controller lifecycle

  This commit refactors the `FlowController` to simplify its startup and shutdown lifecycle, making it more robust and easier to reason about. It also incorporates several smaller improvements based on reviewer feedback. The primary change addresses a complex lifecycle implementation that used an `atomic.Bool` (`isRunning`) and a `ready` channel to manage state. Key changes:

  - **Simplified Lifecycle:** The controller's lifecycle is now tied directly to a `context` passed into `NewFlowController`. The `Run` method has been unexported, and the main `run` loop is started as a goroutine from the constructor. This eliminates the `ready` channel and `isRunning` flag and simplifies the interface for callers.
  - **Robust Worker Creation:** The `getOrStartWorker` logic has been improved to ensure that in a race to create a worker, the "losing" goroutine correctly cleans up its resources and does not start a redundant processor. This fixes a bug where, on shutdown, the losing worker would evict all items from queues it shared with the winning worker, resulting in premature request finalization.
  - **Comment Reduction:** The extensive explanatory comments in `distributeRequest` have been condensed while retaining the essential details of the algorithm.
  - **Minor Cleanups:**
    - The initial, unnecessary call to `reconcileProcessors()` at startup has been removed.
    - Error messages have been clarified (e.g., "acquire lease" instead of "establish connection").
    - A typed error for nil requests was replaced with a standard `errors.New`.
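The race-safe worker creation fix can be sketched with Go's `sync.Map` as the registration mechanism. This is an assumption for illustration; the actual `getOrStartWorker` may use a different map or locking scheme, and `worker` here is a hypothetical stand-in.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a minimal stand-in for a ShardProcessor plus its resources.
type worker struct{ shardID string }

var workers sync.Map // shardID -> *worker

// getOrStartWorker lazily creates the worker for a shard. When two goroutines
// race, LoadOrStore guarantees exactly one candidate is registered; the loser
// discards its unused candidate instead of starting a redundant processor.
// Crucially, the loser must not tear down shared state (e.g., queues) that
// now belongs to the winning worker.
func getOrStartWorker(shardID string) (w *worker, started bool) {
	candidate := &worker{shardID: shardID}
	actual, loaded := workers.LoadOrStore(shardID, candidate)
	if loaded {
		// Lost the race: drop the candidate; only local, unshared resources
		// may be cleaned up here.
		return actual.(*worker), false
	}
	// Won the race: only this goroutine starts the processor's run loop.
	return candidate, true
}

func main() {
	w1, started1 := getOrStartWorker("shard-a")
	w2, started2 := getOrStartWorker("shard-a")
	fmt.Println(started1, started2, w1 == w2) // true false true
}
```

The bug described above corresponds to the losing goroutine running full shutdown cleanup on queue instances it shared with the winner, which finalized in-flight requests prematurely; returning early without touching shared state avoids that.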
1 parent 9266b5e commit d03d3b6

File tree

22 files changed: +2043 −650 lines changed

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 23 additions & 16 deletions
@@ -22,8 +22,8 @@ import (
 )
 
 // FlowRegistry is the complete interface for the global flow control plane.
-// It composes the client-facing data path interface and the administrative interface. A concrete implementation of this
-// interface is the single source of truth for all flow control state.
+// It composes all role-based interfaces. A concrete implementation of this interface is the single source of truth for
+// all flow control state.
 //
 // # Conformance: Implementations MUST be goroutine-safe.
 //
@@ -48,22 +48,21 @@ import (
 //  2. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
 //     shards.
 type FlowRegistry interface {
-	FlowRegistryClient
-	FlowRegistryAdmin
+	FlowRegistryObserver
+	FlowRegistryDataPlane
 }
 
-// FlowRegistryAdmin defines the administrative interface for the global control plane.
-type FlowRegistryAdmin interface {
-	// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
+// FlowRegistryObserver defines the read-only, observation interface for the registry.
+type FlowRegistryObserver interface {
+	// Stats returns a near-consistent snapshot of globally aggregated statistics for the entire `FlowRegistry`.
 	Stats() AggregateStats
 
-	// ShardStats returns a slice of statistics, one for each internal shard.
+	// ShardStats returns a near-consistent slice of statistics snapshots, one for each `RegistryShard`.
 	ShardStats() []ShardStats
 }
 
-// FlowRegistryClient defines the primary, client-facing interface for the registry.
-// This is the interface that the `controller.FlowController`'s data path depends upon.
-type FlowRegistryClient interface {
+// FlowRegistryDataPlane defines the high-throughput, request-path interface for the registry.
+type FlowRegistryDataPlane interface {
	// WithConnection manages a scoped, leased session for a given flow.
	// It is the primary and sole entry point for interacting with the data path.
	//
@@ -90,9 +89,8 @@
 // Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs
 // safely while the flow is guaranteed to be protected from garbage collection.
 type ActiveFlowConnection interface {
-	// Shards returns a stable snapshot of accessors for all internal state shards (both Active and Draining).
-	// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard from this slice.
-	Shards() []RegistryShard
+	// ActiveShards returns a stable snapshot of accessors for all Active internal state shards.
+	ActiveShards() []RegistryShard
 }
 
 // RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state.
@@ -139,7 +137,7 @@ type RegistryShard interface {
 	// `controller.FlowController` worker's dispatch loop.
 	AllOrderedPriorityLevels() []int
 
-	// Stats returns a snapshot of the statistics for this specific shard.
+	// Stats returns a near-consistent snapshot of the shard's state.
 	Stats() ShardStats
 }
 
@@ -162,6 +160,7 @@ type ManagedQueue interface {
 }
 
 // AggregateStats holds globally aggregated statistics for the entire `FlowRegistry`.
+// It is a read-only data object representing a near-consistent snapshot of the registry's state.
 type AggregateStats struct {
 	// TotalCapacityBytes is the globally configured maximum total byte size limit across all priority bands and shards.
 	TotalCapacityBytes uint64
@@ -173,8 +172,15 @@ type AggregateStats struct {
 	PerPriorityBandStats map[int]PriorityBandStats
 }
 
-// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
+// ShardStats holds statistics and identifying information for a `RegistryShard` within the `FlowRegistry`.
+// It is a read-only data object representing a near-consistent snapshot of the shard's state.
 type ShardStats struct {
+	// ID is the unique, stable identifier for this shard.
+	ID string
+	// IsActive indicates if the shard was accepting new work at the time this stats snapshot was generated.
+	// A value of `false` means the shard is in the process of being gracefully drained.
+	// Due to the concurrent nature of the system, this state could change immediately after the snapshot is taken.
+	IsActive bool
 	// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
 	// shard. Its value represents the globally configured limit for the `FlowRegistry` partitioned for this shard.
 	// The `controller.FlowController` enforces this limit in addition to any per-band capacity limits.
@@ -192,6 +198,7 @@ type ShardStats struct {
 }
 
 // PriorityBandStats holds aggregated statistics for a single priority band.
+// It is a read-only data object representing a near-consistent snapshot of the priority band's state.
 type PriorityBandStats struct {
 	// Priority is the numerical priority level this struct describes.
 	Priority int
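The data-path usage pattern implied by these contracts can be sketched as follows. The interfaces here are abbreviated mirrors of the ones in the diff, and the `WithConnection` signature is an assumption for illustration; the real interfaces carry more methods.

```go
package main

import "fmt"

// Abbreviated mirrors of the contracts above; the real interfaces carry more
// methods, and this WithConnection signature is an assumption.
type RegistryShard interface{ ID() string }

type ActiveFlowConnection interface {
	// Only Active shards are returned, so callers need no IsActive() filtering.
	ActiveShards() []RegistryShard
}

type FlowRegistryDataPlane interface {
	WithConnection(flowID string, fn func(ActiveFlowConnection) error) error
}

// distribute sketches the data-path pattern: all shard access happens inside
// the leased connection scope, where the flow is protected from garbage
// collection.
func distribute(dp FlowRegistryDataPlane, flowID string) error {
	return dp.WithConnection(flowID, func(conn ActiveFlowConnection) error {
		for _, shard := range conn.ActiveShards() {
			fmt.Println("candidate shard:", shard.ID())
		}
		return nil
	})
}

// Minimal fakes so the sketch runs standalone.
type fakeShard string

func (s fakeShard) ID() string { return string(s) }

type fakeConn []RegistryShard

func (c fakeConn) ActiveShards() []RegistryShard { return c }

type fakeDataPlane struct{ conn fakeConn }

func (d fakeDataPlane) WithConnection(_ string, fn func(ActiveFlowConnection) error) error {
	return fn(d.conn)
}

func main() {
	dp := fakeDataPlane{conn: fakeConn{fakeShard("shard-a"), fakeShard("shard-b")}}
	if err := distribute(dp, "flow-1"); err != nil {
		fmt.Println("error:", err)
	}
}
```

The point of the `ActiveShards()` rename is visible here: the old `Shards()` contract forced every caller to filter on `IsActive()`, whereas the new contract makes the returned slice directly routable.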
Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"fmt"
+	"time"
+)
+
+const (
+	// defaultExpiryCleanupInterval is the default frequency for scanning for expired items.
+	defaultExpiryCleanupInterval = 1 * time.Second
+	// defaultProcessorReconciliationInterval is the default frequency for the supervisor loop.
+	defaultProcessorReconciliationInterval = 5 * time.Second
+	// defaultEnqueueChannelBufferSize is the default size of a worker's incoming request buffer.
+	defaultEnqueueChannelBufferSize = 100
+)
+
+// Config holds the configuration for the `FlowController`.
+type Config struct {
+	// DefaultRequestTTL is the default Time-To-Live applied to requests that do not specify their own TTL hint.
+	// Optional: If zero, no TTL is applied by default and we rely solely on request context cancellation.
+	DefaultRequestTTL time.Duration
+
+	// ExpiryCleanupInterval is the interval at which each shard processor scans its queues for expired items.
+	// Optional: Defaults to `defaultExpiryCleanupInterval` (1 second).
+	ExpiryCleanupInterval time.Duration
+
+	// ProcessorReconciliationInterval is the frequency at which the `FlowController`'s supervisor loop garbage
+	// collects stale workers.
+	// Optional: Defaults to `defaultProcessorReconciliationInterval` (5 seconds).
+	ProcessorReconciliationInterval time.Duration
+
+	// EnqueueChannelBufferSize is the size of the buffered channel that accepts incoming requests for each shard
+	// processor. This buffer acts as a shock absorber, decoupling the high-frequency distributor from the processor's
+	// serial execution loop and allowing the system to handle short bursts of traffic without blocking.
+	// Optional: Defaults to `defaultEnqueueChannelBufferSize` (100).
+	EnqueueChannelBufferSize int
+}
+
+// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
+// This is the required constructor for creating a new configuration.
+// It does not mutate the input `cfg`.
+func newConfig(cfg Config) (*Config, error) {
+	newCfg := cfg.deepCopy()
+	if err := newCfg.validateAndApplyDefaults(); err != nil {
+		return nil, err
+	}
+	return newCfg, nil
+}
+
+// validateAndApplyDefaults checks the global configuration for validity and then mutates the receiver to populate any
+// empty fields with system defaults.
+func (c *Config) validateAndApplyDefaults() error {
+	// --- Validation ---
+	if c.DefaultRequestTTL < 0 {
+		return fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", c.DefaultRequestTTL)
+	}
+	if c.ExpiryCleanupInterval < 0 {
+		return fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", c.ExpiryCleanupInterval)
+	}
+	if c.ProcessorReconciliationInterval < 0 {
+		return fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
+			c.ProcessorReconciliationInterval)
+	}
+	if c.EnqueueChannelBufferSize < 0 {
+		return fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", c.EnqueueChannelBufferSize)
+	}
+
+	// --- Defaulting ---
+	if c.ExpiryCleanupInterval == 0 {
+		c.ExpiryCleanupInterval = defaultExpiryCleanupInterval
+	}
+	if c.ProcessorReconciliationInterval == 0 {
+		c.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
+	}
+	if c.EnqueueChannelBufferSize == 0 {
+		c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
+	}
+	return nil
+}
+
+// deepCopy creates a deep copy of the `Config` object.
+func (c *Config) deepCopy() *Config {
+	if c == nil {
+		return nil
+	}
+	newCfg := &Config{
+		DefaultRequestTTL:               c.DefaultRequestTTL,
+		ExpiryCleanupInterval:           c.ExpiryCleanupInterval,
+		ProcessorReconciliationInterval: c.ProcessorReconciliationInterval,
+		EnqueueChannelBufferSize:        c.EnqueueChannelBufferSize,
+	}
+	return newCfg
+}
Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewConfig(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		name          string
+		input         Config
+		expectErr     bool
+		expectedCfg   Config
+		shouldDefault bool
+	}{
+		{
+			name: "ValidConfig_NoChanges",
+			input: Config{
+				DefaultRequestTTL:               10 * time.Second,
+				ExpiryCleanupInterval:           2 * time.Second,
+				ProcessorReconciliationInterval: 10 * time.Second,
+				EnqueueChannelBufferSize:        200,
+			},
+			expectErr: false,
+			expectedCfg: Config{
+				DefaultRequestTTL:               10 * time.Second,
+				ExpiryCleanupInterval:           2 * time.Second,
+				ProcessorReconciliationInterval: 10 * time.Second,
+				EnqueueChannelBufferSize:        200,
+			},
+		},
+		{
+			name:      "EmptyConfig_ShouldApplyDefaults",
+			input:     Config{},
+			expectErr: false,
+			expectedCfg: Config{
+				DefaultRequestTTL:               0,
+				ExpiryCleanupInterval:           defaultExpiryCleanupInterval,
+				ProcessorReconciliationInterval: defaultProcessorReconciliationInterval,
+				EnqueueChannelBufferSize:        defaultEnqueueChannelBufferSize,
+			},
+			shouldDefault: true,
+		},
+		{
+			name:      "NegativeDefaultRequestTTL_Invalid",
+			input:     Config{DefaultRequestTTL: -1},
+			expectErr: true,
+		},
+		{
+			name:      "NegativeExpiryCleanupInterval_Invalid",
+			input:     Config{ExpiryCleanupInterval: -1},
+			expectErr: true,
+		},
+		{
+			name:      "NegativeProcessorReconciliationInterval_Invalid",
+			input:     Config{ProcessorReconciliationInterval: -1},
+			expectErr: true,
+		},
+		{
+			name:      "NegativeEnqueueChannelBufferSize_Invalid",
+			input:     Config{EnqueueChannelBufferSize: -1},
+			expectErr: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			originalInput := tc.input.deepCopy()
+			validatedCfg, err := newConfig(tc.input)
+
+			if tc.expectErr {
+				require.Error(t, err, "expected an error but got nil")
+				assert.Nil(t, validatedCfg, "validatedCfg should be nil on error")
+			} else {
+				require.NoError(t, err, "expected no error but got: %v", err)
+				require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success")
+				assert.Equal(t, tc.expectedCfg, *validatedCfg, "validatedCfg should match expected config")
+
+				// Ensure the original config is not mutated.
+				assert.Equal(t, *originalInput, tc.input, "input config should not be mutated")
+			}
+		})
+	}
+}
+
+func TestConfig_DeepCopy(t *testing.T) {
+	t.Parallel()
+
+	t.Run("ShouldReturnNil_ForNilReceiver", func(t *testing.T) {
+		t.Parallel()
+		var nilConfig *Config
+		assert.Nil(t, nilConfig.deepCopy(), "Deep copy of a nil config should be nil")
+	})
+
+	t.Run("ShouldCreateIdenticalButSeparateObject", func(t *testing.T) {
+		t.Parallel()
+		original := &Config{
+			DefaultRequestTTL:               1 * time.Second,
+			ExpiryCleanupInterval:           2 * time.Second,
+			ProcessorReconciliationInterval: 3 * time.Second,
+			EnqueueChannelBufferSize:        4,
+		}
+		clone := original.deepCopy()
+
+		require.NotSame(t, original, clone, "Clone should be a new object in memory")
+		assert.Equal(t, *original, *clone, "Cloned object should have identical values")
+
+		// Modify the clone and ensure the original is unchanged.
+		clone.DefaultRequestTTL = 99 * time.Second
+		assert.NotEqual(t, original.DefaultRequestTTL, clone.DefaultRequestTTL,
+			"Original should not be mutated after clone is changed")
+	})
+}
