Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions pkg/epp/flowcontrol/contracts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
)

// FlowRegistry is the complete interface for the global flow control plane.
// It composes the client-facing data path interface and the administrative interface. A concrete implementation of this
// interface is the single source of truth for all flow control state.
// It composes all role-based interfaces. A concrete implementation of this interface is the single source of truth for
// all flow control state.
//
// # Conformance: Implementations MUST be goroutine-safe.
//
Expand All @@ -48,22 +48,21 @@ import (
// 2. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active
// shards.
type FlowRegistry interface {
FlowRegistryClient
FlowRegistryAdmin
FlowRegistryObserver
FlowRegistryDataPlane
}

// FlowRegistryAdmin defines the administrative interface for the global control plane.
type FlowRegistryAdmin interface {
// Stats returns globally aggregated statistics for the entire `FlowRegistry`.
// FlowRegistryObserver defines the read-only, observation interface for the registry.
type FlowRegistryObserver interface {
// Stats returns a near-consistent snapshot of globally aggregated statistics for the entire `FlowRegistry`.
Stats() AggregateStats

// ShardStats returns a slice of statistics, one for each internal shard.
// ShardStats returns a near-consistent slice of statistics snapshots, one for each `RegistryShard`.
ShardStats() []ShardStats
}

// FlowRegistryClient defines the primary, client-facing interface for the registry.
// This is the interface that the `controller.FlowController`'s data path depends upon.
type FlowRegistryClient interface {
// FlowRegistryDataPlane defines the high-throughput, request-path interface for the registry.
type FlowRegistryDataPlane interface {
// WithConnection manages a scoped, leased session for a given flow.
// It is the primary and sole entry point for interacting with the data path.
//
Expand All @@ -90,9 +89,8 @@ type FlowRegistryClient interface {
// Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs
// safely while the flow is guaranteed to be protected from garbage collection.
type ActiveFlowConnection interface {
// Shards returns a stable snapshot of accessors for all internal state shards (both Active and Draining).
// Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard from this slice.
Shards() []RegistryShard
// ActiveShards returns a stable snapshot of accessors for all Active internal state shards.
ActiveShards() []RegistryShard
}

// RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state.
Expand Down Expand Up @@ -139,7 +137,7 @@ type RegistryShard interface {
// `controller.FlowController` worker's dispatch loop.
AllOrderedPriorityLevels() []int

// Stats returns a snapshot of the statistics for this specific shard.
// Stats returns a near-consistent snapshot of the shard's state.
Stats() ShardStats
}

Expand All @@ -162,6 +160,7 @@ type ManagedQueue interface {
}

// AggregateStats holds globally aggregated statistics for the entire `FlowRegistry`.
// It is a read-only data object representing a near-consistent snapshot of the registry's state.
type AggregateStats struct {
// TotalCapacityBytes is the globally configured maximum total byte size limit across all priority bands and shards.
TotalCapacityBytes uint64
Expand All @@ -173,8 +172,15 @@ type AggregateStats struct {
PerPriorityBandStats map[int]PriorityBandStats
}

// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
// ShardStats holds statistics and identifying information for a `RegistryShard` within the `FlowRegistry`.
// It is a read-only data object representing a near-consistent snapshot of the shard's state.
type ShardStats struct {
// ID is the unique, stable identifier for this shard.
ID string
// IsActive indicates if the shard was accepting new work at the time this stats snapshot was generated.
// A value of `false` means the shard is in the process of being gracefully drained.
// Due to the concurrent nature of the system, this state could change immediately after the snapshot is taken.
IsActive bool
// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
// shard. Its value represents the globally configured limit for the `FlowRegistry` partitioned for this shard.
// The `controller.FlowController` enforces this limit in addition to any per-band capacity limits.
Expand All @@ -192,6 +198,7 @@ type ShardStats struct {
}

// PriorityBandStats holds aggregated statistics for a single priority band.
// It is a read-only data object representing a near-consistent snapshot of the priority band's state.
type PriorityBandStats struct {
// Priority is the numerical priority level this struct describes.
Priority int
Expand Down
110 changes: 110 additions & 0 deletions pkg/epp/flowcontrol/controller/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"fmt"
"time"
)

const (
// defaultExpiryCleanupInterval is the default frequency for scanning for expired items.
defaultExpiryCleanupInterval = 1 * time.Second
// defaultProcessorReconciliationInterval is the default frequency for the supervisor loop.
defaultProcessorReconciliationInterval = 5 * time.Second
// defaultEnqueueChannelBufferSize is the default size of a worker's incoming request buffer.
defaultEnqueueChannelBufferSize = 100
)

// Config holds the configuration for the `FlowController`.
type Config struct {
// DefaultRequestTTL is the default Time-To-Live applied to requests that do not
// specify their own TTL hint.
// Optional: If zero, no TTL is applied by default and we rely solely on request context cancellation.
DefaultRequestTTL time.Duration

// ExpiryCleanupInterval is the interval at which each shard processor scans its queues for expired items.
// Optional: Defaults to `defaultExpiryCleanupInterval` (1 second).
ExpiryCleanupInterval time.Duration

// ProcessorReconciliationInterval is the frequency at which the `FlowController`'s supervisor loop garbage collects
// stale workers.
// Optional: Defaults to `defaultProcessorReconciliationInterval` (5 seconds).
ProcessorReconciliationInterval time.Duration

// EnqueueChannelBufferSize is the size of the buffered channel that accepts incoming requests for each shard
// processor. This buffer acts as a shock absorber, decoupling the high-frequency distributor from the processor's
// serial execution loop and allowing the system to handle short bursts of traffic without blocking.
// Optional: Defaults to `defaultEnqueueChannelBufferSize` (100).
EnqueueChannelBufferSize int
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this buffer overflows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If running with data parallelism (more than one shard), the distribution algorithm will select the best candidate and attempt a non-blocking send. If the candidate buffer is full, it immediately falls back to the next best candidate and so on. If every worker's buffer is full, then we change from a non-blocking to a blocking send on the best candidate.

As the type comment mentions, it is simply a shock absorber for transient request bursts. The goal is to get the request enqueued (in the actual queue structure) as quickly as possible. If the buffer is full, it's fine, we just try to find a less congested worker even if it is not the "best" candidate for our data parallelism strategy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfswain Hopefully this clears it up. If you don't find the config documentation sufficient here, I can update it to better incorporate these details. Else, if no change is needed, can you resolve this thread?

}

// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
// This is the required constructor for creating a new configuration.
// It does not mutate the input `cfg`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OOC: Why not? Can we just log what the value was before it was defaulted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// It does not mutate the input cfg.

We could do this. I prefer this being a pure function though. It is easier to test and less prone to unexpected side effects.


Also, I am changing this in a followup PR and removing the requirement for this constructor to be invoked (this becomes a test-only convenience utility). In registry/config.go and controller/config.go I will be exposing a Config.ValidateAndApplyDefaults() *Config method that the caller is expected to invoke before passing the config structs to the FlowRegistry and FlowController respectively.

Would you like me to absorb this refactoring (for at least the controller package into this PR) instead of a followup?

Copy link
Contributor Author

@LukeAVanDrie LukeAVanDrie Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep this PR focused squarely on the controller implementation itself, I'd prefer to tackle the config changes in the immediate follow-up PR I have staged.

In that PR, I implement the plan I mentioned: removing the newConfig constructor in favor of exposing a public ValidateAndApplyDefaults() method on the Config struct itself. This will make the EPP runner responsible for calling it, which is a cleaner, more explicit approach.

Does that sound like a reasonable path forward? If so, I'll resolve this thread and proceed with the follow-up PR after this one merges.

func newConfig(cfg Config) (*Config, error) {
newCfg := cfg.deepCopy()
if err := newCfg.validateAndApplyDefaults(); err != nil {
return nil, err
}
return newCfg, nil
}

// validateAndApplyDefaults checks the global configuration for validity and then mutates the receiver to populate any
// empty fields with system defaults.
func (c *Config) validateAndApplyDefaults() error {
// --- Validation ---
if c.DefaultRequestTTL < 0 {
return fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", c.DefaultRequestTTL)
}
if c.ExpiryCleanupInterval < 0 {
return fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", c.ExpiryCleanupInterval)
}
if c.ProcessorReconciliationInterval < 0 {
return fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
c.ProcessorReconciliationInterval)
}
if c.EnqueueChannelBufferSize < 0 {
return fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", c.EnqueueChannelBufferSize)
}

// --- Defaulting ---
if c.ExpiryCleanupInterval == 0 {
c.ExpiryCleanupInterval = defaultExpiryCleanupInterval
}
if c.ProcessorReconciliationInterval == 0 {
c.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
}
if c.EnqueueChannelBufferSize == 0 {
c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
}
return nil
}

// deepCopy creates a deep copy of the `Config` object.
func (c *Config) deepCopy() *Config {
if c == nil {
return nil
}
newCfg := &Config{
DefaultRequestTTL: c.DefaultRequestTTL,
ExpiryCleanupInterval: c.ExpiryCleanupInterval,
ProcessorReconciliationInterval: c.ProcessorReconciliationInterval,
EnqueueChannelBufferSize: c.EnqueueChannelBufferSize,
}
return newCfg
}
135 changes: 135 additions & 0 deletions pkg/epp/flowcontrol/controller/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewConfig(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
input Config
expectErr bool
expectedCfg Config
shouldDefault bool
}{
{
name: "ValidConfig_NoChanges",
input: Config{
DefaultRequestTTL: 10 * time.Second,
ExpiryCleanupInterval: 2 * time.Second,
ProcessorReconciliationInterval: 10 * time.Second,
EnqueueChannelBufferSize: 200,
},
expectErr: false,
expectedCfg: Config{
DefaultRequestTTL: 10 * time.Second,
ExpiryCleanupInterval: 2 * time.Second,
ProcessorReconciliationInterval: 10 * time.Second,
EnqueueChannelBufferSize: 200,
},
},
{
name: "EmptyConfig_ShouldApplyDefaults",
input: Config{},
expectErr: false,
expectedCfg: Config{
DefaultRequestTTL: 0,
ExpiryCleanupInterval: defaultExpiryCleanupInterval,
ProcessorReconciliationInterval: defaultProcessorReconciliationInterval,
EnqueueChannelBufferSize: defaultEnqueueChannelBufferSize,
},
shouldDefault: true,
},
{
name: "NegativeDefaultRequestTTL_Invalid",
input: Config{DefaultRequestTTL: -1},
expectErr: true,
},
{
name: "NegativeExpiryCleanupInterval_Invalid",
input: Config{ExpiryCleanupInterval: -1},
expectErr: true,
},
{
name: "NegativeProcessorReconciliationInterval_Invalid",
input: Config{ProcessorReconciliationInterval: -1},
expectErr: true,
},
{
name: "NegativeEnqueueChannelBufferSize_Invalid",
input: Config{EnqueueChannelBufferSize: -1},
expectErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
originalInput := tc.input.deepCopy()
validatedCfg, err := newConfig(tc.input)

if tc.expectErr {
require.Error(t, err, "expected an error but got nil")
assert.Nil(t, validatedCfg, "validatedCfg should be nil on error")
} else {
require.NoError(t, err, "expected no error but got: %v", err)
require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success")
assert.Equal(t, tc.expectedCfg, *validatedCfg, "validatedCfg should match expected config")

// Ensure the original config is not mutated.
assert.Equal(t, *originalInput, tc.input, "input config should not be mutated")
}
})
}
}

func TestConfig_DeepCopy(t *testing.T) {
t.Parallel()

t.Run("ShouldReturnNil_ForNilReceiver", func(t *testing.T) {
t.Parallel()
var nilConfig *Config
assert.Nil(t, nilConfig.deepCopy(), "Deep copy of a nil config should be nil")
})

t.Run("ShouldCreateIdenticalButSeparateObject", func(t *testing.T) {
t.Parallel()
original := &Config{
DefaultRequestTTL: 1 * time.Second,
ExpiryCleanupInterval: 2 * time.Second,
ProcessorReconciliationInterval: 3 * time.Second,
EnqueueChannelBufferSize: 4,
}
clone := original.deepCopy()

require.NotSame(t, original, clone, "Clone should be a new object in memory")
assert.Equal(t, *original, *clone, "Cloned object should have identical values")

// Modify the clone and ensure the original is unchanged.
clone.DefaultRequestTTL = 99 * time.Second
assert.NotEqual(t, original.DefaultRequestTTL, clone.DefaultRequestTTL,
"Original should not be mutated after clone is changed")
})
}
Loading