109 changes: 52 additions & 57 deletions pkg/epp/flowcontrol/controller/controller.go
@@ -14,12 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package controller contains the implementation of the `FlowController` engine.
// Package controller contains the implementation of the FlowController engine.
//
// The FlowController is the central processing engine of the Flow Control system. It is a sharded, high-throughput
// The FlowController is the central processing engine of the Flow Control layer. It is a sharded, high-throughput
// component responsible for managing the lifecycle of all incoming requests. It achieves this by acting as a stateless
// supervisor that orchestrates a pool of stateful workers (`internal.ShardProcessor`), distributing incoming requests
// among them using a sophisticated load-balancing algorithm.
// supervisor that orchestrates a pool of stateful workers (ShardProcessors), distributing incoming requests among them.
package controller

import (
@@ -43,22 +42,20 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// registryClient defines the minimal interface that the `FlowController` needs to interact with the `FlowRegistry`.
// registryClient defines the minimal interface that the FlowController needs to interact with the FlowRegistry.
type registryClient interface {
contracts.FlowRegistryObserver
contracts.FlowRegistryDataPlane
}

// shardProcessor is the minimal internal interface that the `FlowController` requires from its workers.
// This abstraction allows for the injection of mock processors during testing.
// shardProcessor is the minimal internal interface that the FlowController requires from its workers.
type shardProcessor interface {
Run(ctx context.Context)
Submit(item *internal.FlowItem) error
SubmitOrBlock(ctx context.Context, item *internal.FlowItem) error
}

// shardProcessorFactory defines the signature for a function that creates a `shardProcessor`.
// This enables dependency injection for testing.
// shardProcessorFactory defines the signature for creating a shardProcessor.
type shardProcessorFactory func(
ctx context.Context,
shard contracts.RegistryShard,
Expand All @@ -74,15 +71,16 @@ var _ shardProcessor = &internal.ShardProcessor{}
// managedWorker holds the state for a single supervised worker.
type managedWorker struct {
processor shardProcessor
cancel context.CancelFunc
// cancel function for the worker-specific context. Used during shutdown and GC.
cancel context.CancelFunc
}

// FlowController is the central, high-throughput engine of the Flow Control system.
// It is designed as a stateless distributor that orchestrates a pool of stateful workers (`internal.ShardProcessor`),
// following a "supervisor-worker" pattern.
// FlowController is the central, high-throughput engine of the Flow Control layer.
// It is designed as a stateless distributor that orchestrates a pool of stateful workers (ShardProcessor), following a
// supervisor-worker pattern.
//
// The controller's `Run` loop executes periodically, acting as a garbage collector that keeps the pool of running
// workers synchronized with the dynamic shard topology of the `FlowRegistry`.
// The controller's run loop executes periodically, acting as a garbage collector that keeps the pool of running
// workers synchronized with the dynamic shard topology of the FlowRegistry.
//
// Request Lifecycle Management:
//
@@ -103,25 +101,26 @@ type FlowController struct {

// --- Lifecycle state ---

// parentCtx is the root context for the controller's lifecycle, established when `Run` is called.
// parentCtx is the root context for the controller's lifecycle, established when NewFlowController is called.
// It is the parent for all long-lived worker goroutines.
parentCtx context.Context

// --- Concurrent state ---

// workers is a highly concurrent map storing the `managedWorker` for each shard.
// workers is a highly concurrent map storing the managedWorker for each shard.
// It is the controller's source of truth for the worker pool.
// The key is the shard ID (`string`), and the value is a `*managedWorker`.
workers sync.Map
workers sync.Map // key: shard ID (string); value: *managedWorker

// wg waits for all worker goroutines to terminate during shutdown.
wg sync.WaitGroup
}

// flowControllerOption is a function that applies a configuration change to a `FlowController`.
// flowControllerOption is a function that applies a configuration change.
// test-only
type flowControllerOption func(*FlowController)

// NewFlowController creates a new `FlowController` instance.
// NewFlowController creates and starts a new FlowController instance.
// The provided context governs the lifecycle of the controller and all its workers.
func NewFlowController(
ctx context.Context,
config Config,
@@ -131,15 +130,14 @@ func NewFlowController(
opts ...flowControllerOption,
) (*FlowController, error) {
fc := &FlowController{
config: *config.deepCopy(),
config: config,
registry: registry,
saturationDetector: sd,
clock: clock.RealClock{},
logger: logger.WithName("flow-controller"),
parentCtx: ctx,
}

// Use the real shard processor implementation by default.
fc.shardProcessorFactory = func(
ctx context.Context,
shard contracts.RegistryShard,
@@ -167,9 +165,9 @@ return fc, nil
return fc, nil
}

// run starts the `FlowController`'s main reconciliation loop.
// run starts the FlowController's main reconciliation loop (supervisor loop).
// This loop is responsible for garbage collecting workers whose shards no longer exist in the registry.
// This method blocks until the provided context is cancelled and ALL worker goroutines have fully terminated.
// This method blocks until the provided context is cancelled and all worker goroutines have fully terminated.
func (fc *FlowController) run(ctx context.Context) {
fc.logger.Info("Starting FlowController reconciliation loop.")
defer fc.logger.Info("FlowController reconciliation loop stopped.")
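The body of the reconciliation loop is not shown in this hunk. As a rough, illustrative sketch only of the behavior described above (periodic reconciliation until the context is cancelled, then a graceful shutdown), it presumably resembles the following; the ticker interval name, the exact loop structure, and the assumption that the injected clock exposes a NewTicker method are all assumptions, not the actual code:

	ticker := fc.clock.NewTicker(reconcileInterval) // interval constant name is assumed
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Context cancelled: stop all workers and wait for them to terminate.
			fc.shutdown()
			return
		case <-ticker.C():
			// Keep the worker pool in sync with the registry's shard topology.
			fc.reconcileProcessors()
		}
	}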
@@ -194,23 +192,19 @@ func (fc *FlowController) run(ctx context.Context) {
// # Design Rationale: The Synchronous Model
//
// This blocking model is deliberately chosen for its simplicity and robustness, especially in the context of Envoy
// External Processing (`ext_proc`), which operates on a stream-based protocol.
// External Processing (ext_proc), which operates on a stream-based protocol.
//
// - `ext_proc` Alignment: A single goroutine typically manages the stream for a given HTTP request.
// `EnqueueAndWait` fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a
// - ext_proc Alignment: A single goroutine typically manages the stream for a given HTTP request.
// EnqueueAndWait fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a
// definitive outcome to act upon.
// - Simplified State Management: The state of a "waiting" request is implicitly managed by the blocked goroutine's
// stack and its `context.Context`. The system only needs to signal this specific goroutine to unblock it.
// - Direct Backpressure: If queues are full, `EnqueueAndWait` returns an error immediately, providing direct
// stack and its Context. The system only needs to signal this specific goroutine to unblock it.
// - Direct Backpressure: If queues are full, EnqueueAndWait returns an error immediately, providing direct
// backpressure to the caller.
func (fc *FlowController) EnqueueAndWait(
ctx context.Context,
req types.FlowControlRequest,
) (types.QueueOutcome, error) {
if req == nil {
return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
}

flowKey := req.FlowKey()
fairnessID := flowKey.ID
priority := strconv.Itoa(flowKey.Priority)
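As a caller-side illustration of the synchronous model described above (not part of this change): a request-handling goroutine blocks on EnqueueAndWait and acts on the single terminal outcome it returns. The handler function, the error wrapping, and the assumption of an external caller importing this controller package (plus fmt) are for illustration only.

	func handleIncomingRequest(ctx context.Context, fc *controller.FlowController, req types.FlowControlRequest) error {
		outcome, err := fc.EnqueueAndWait(ctx, req)
		if err != nil {
			// The request was rejected or evicted; err wraps a sentinel such as
			// types.ErrRejected, and outcome records the terminal state.
			return fmt.Errorf("request not dispatched (outcome %v): %w", outcome, err)
		}
		// A nil error means the request was dispatched; continue serving it on this goroutine.
		return nil
	}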
@@ -365,17 +359,22 @@ type candidate struct {
// of that flow's queue, from least to most loaded.
func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) {
var candidates []candidate

// Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the
// duration of the shard selection process, preventing races with concurrent shard topology changes.
err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
shards := conn.ActiveShards()
candidates = make([]candidate, len(shards))
for i, shard := range shards {
candidates = make([]candidate, 0, len(shards))
for _, shard := range shards {
worker := fc.getOrStartWorker(shard)
mq, err := shard.ManagedQueue(key)
if err != nil {
panic(fmt.Sprintf("invariant violation: ManagedQueue for leased flow %s failed on shard %s: %v",
key, shard.ID(), err))
fc.logger.Error(err,
"Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.",
"flowKey", key, "shardID", shard.ID())
continue
}
candidates[i] = candidate{worker.processor, shard.ID(), mq.ByteSize()}
candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()})
}
return nil
})
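For context, a minimal sketch of how a caller such as distributeRequest might consume these candidates, ordered least to most loaded as the comment above describes. The field names on candidate, the variable item, and the submission strategy are assumptions; the actual distributeRequest body is not shown in this hunk.

	// Illustrative fragment only (assumed field names; not the actual distributeRequest body).
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].byteSize < candidates[j].byteSize // least loaded first
	})
	for _, c := range candidates {
		if err := c.processor.Submit(item); err == nil {
			return nil // non-blocking hand-off succeeded; the worker now owns the item
		}
	}
	// Every non-blocking attempt failed; a fallback might use SubmitOrBlock or
	// reject the request with a capacity error.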
@@ -435,15 +434,15 @@ func (fc *FlowController) distributeRequest(
}

// getOrStartWorker implements the lazy-loading and startup of shard processors.
// It attempts to retrieve an existing worker for a shard. If one doesn't exist, it constructs a new worker and attempts
// to register it atomically. The worker's processor goroutine is only started *after* it has successfully been
// registered, preventing race conditions where multiple goroutines create and start the same worker.
// It ensures that exactly one worker goroutine is started for each shard, using atomic operations
// (sync.Map.LoadOrStore). The worker's processor goroutine is only started after it has successfully been registered,
// preventing race conditions where multiple goroutines create and start the same worker.
func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker {
if w, ok := fc.workers.Load(shard.ID()); ok {
return w.(*managedWorker)
}

// Construct a new worker, but do not start its processor goroutine yet.
// Construct a new worker, but do not start its goroutine yet.
processorCtx, cancel := context.WithCancel(fc.parentCtx)
processor := fc.shardProcessorFactory(
processorCtx,
@@ -459,18 +458,17 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker {
cancel: cancel,
}

// Atomically load or store. This is the critical step for preventing race conditions.
// Atomically load or store. This is the critical synchronization step.
actual, loaded := fc.workers.LoadOrStore(shard.ID(), newWorker)
if loaded {
// Another goroutine beat us to it. The `newWorker` we created was not stored.
// We must cancel the context we created for it to prevent a leak, but we do not need to do anything else, as its
// processor was never started.
// We must cancel the context we created to prevent a leak.
cancel()
return actual.(*managedWorker)
}

// We won the race. The `newWorker` was successfully stored.
// Now, and only now, do we start the processor's long-running goroutine.
// We won the race. The newWorker was stored. Now, start the processor's long-running goroutine.
fc.logger.V(logutil.DEFAULT).Info("Starting new ShardProcessor worker.", "shardID", shard.ID())
fc.wg.Add(1)
go func() {
defer fc.wg.Done()
@@ -481,24 +479,22 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker {
}

// reconcileProcessors is the supervisor's core garbage collection loop.
// It fetches the current list of Active shards from the registry and removes any workers whose corresponding shards
// have been fully drained and garbage collected by the registry.
// It identifies and stops workers whose corresponding shards have been removed from the registry.
func (fc *FlowController) reconcileProcessors() {
stats := fc.registry.ShardStats()
shards := make(map[string]struct{}, len(stats)) // `map[shardID] -> isActive`
shards := make(map[string]struct{}, len(stats)) // map[shardID] -> isActive
for _, s := range stats {
shards[s.ID] = struct{}{}
}

fc.workers.Range(func(key, value any) bool {
shardID := key.(string)
worker := value.(*managedWorker)

// GC check: Is the shard no longer in the registry at all?
if _, exists := shards[shardID]; !exists {
fc.logger.Info("Stale worker detected for GC'd shard, shutting down.", "shardID", shardID)
worker.cancel()
fc.workers.Delete(shardID)
fc.logger.V(logutil.DEFAULT).Info("Stale worker detected for GC'd shard, initiating shutdown.",
"shardID", shardID)
worker.cancel() // Cancel the worker's context, initiating the Processor's graceful shutdown sequence.
fc.workers.Delete(shardID) // Delete from the map so no new requests are routed to it.
}
return true
})
@@ -515,7 +511,6 @@ func (fc *FlowController) shutdown() {
worker.cancel()
return true
})

fc.wg.Wait()
fc.logger.Info("All shard processors have shut down.")
}
24 changes: 7 additions & 17 deletions pkg/epp/flowcontrol/controller/controller_test.go
@@ -317,17 +317,6 @@ func TestFlowController_EnqueueAndWait(t *testing.T) {
t.Run("Rejections", func(t *testing.T) {
t.Parallel()

t.Run("OnNilRequest", func(t *testing.T) {
t.Parallel()
h := newUnitHarness(t, t.Context(), Config{}, nil)

outcome, err := h.fc.EnqueueAndWait(context.Background(), nil)
require.Error(t, err, "EnqueueAndWait must reject a nil request")
assert.Equal(t, "request cannot be nil", err.Error(), "error message must be specific")
assert.Equal(t, types.QueueOutcomeRejectedOther, outcome,
"outcome should be QueueOutcomeRejectedOther for invalid inputs")
})

t.Run("OnReqCtxExpiredBeforeDistribution", func(t *testing.T) {
t.Parallel()
// Test that if the request context provided to EnqueueAndWait is already expired, it returns immediately.
@@ -418,14 +407,13 @@ func TestFlowController_EnqueueAndWait(t *testing.T) {
"outcome should be QueueOutcomeRejectedOther for transient registry errors")
})

// This test validates the documented invariant handling in distributeRequest.
t.Run("PanicsOnManagedQueueError", func(t *testing.T) {
t.Run("OnManagedQueueError", func(t *testing.T) {
t.Parallel()
mockRegistry := &mockRegistryClient{}
h := newUnitHarness(t, t.Context(), Config{}, mockRegistry)

// Create a faulty shard that successfully leases the flow but fails to return the
// ManagedQueue.
// ManagedQueue. This shard should be considered unavailable.
faultyShard := &mocks.MockRegistryShard{
IDFunc: func() string { return "faulty-shard" },
ManagedQueueFunc: func(_ types.FlowKey) (contracts.ManagedQueue, error) {
@@ -440,9 +428,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) {
}

req := newTestRequest(defaultFlowKey)
assert.Panics(t, func() {
_, _ = h.fc.EnqueueAndWait(context.Background(), req)
}, "EnqueueAndWait must panic when a registry implementation violates the ManagedQueue contract")
outcome, err := h.fc.EnqueueAndWait(context.Background(), req)
require.Error(t, err, "EnqueueAndWait must reject requests if no shards are available")
assert.ErrorIs(t, err, types.ErrRejected, "error should wrap ErrRejected")
assert.Equal(t, types.QueueOutcomeRejectedCapacity, outcome,
"outcome should be QueueOutcomeRejectedCapacity when no shards exist for the flow")
})
})

49 changes: 37 additions & 12 deletions pkg/epp/flowcontrol/controller/doc.go
@@ -14,23 +14,48 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package controller contains the implementation of the `FlowController` engine.
// Package controller contains the implementation of the FlowController engine.
//
// # Overview
//
// The `FlowController` is the central processing engine of the Flow Control system. It acts as a stateless supervisor
// that orchestrates a pool of stateful workers (`internal.ShardProcessor`), managing the lifecycle of all incoming
// requests from initial submission to a terminal outcome (dispatch, rejection, or eviction).
// The FlowController is the central processing engine of the Flow Control layer. It acts as a stateless supervisor that
// orchestrates a pool of stateful workers (internal.ShardProcessor), managing the lifecycle of all incoming requests
// from initial submission to a terminal outcome (dispatch, rejection, or eviction).
//
// # Architecture: Supervisor-Worker Pattern
//
// This package implements a classic "supervisor-worker" pattern to achieve high throughput and dynamic scalability.
// This package implements a supervisor-worker pattern to achieve high throughput and dynamic scalability.
//
// - The `FlowController` (Supervisor): The public-facing API of the system. Its primary responsibilities are to
// execute a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of
// the worker pool, ensuring it stays synchronized with the underlying shard topology defined by the
// `contracts.FlowRegistry`.
// - The `internal.ShardProcessor` (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle
// of requests on a single shard. The supervisor manages a pool of these workers, one for each
// `contracts.RegistryShard`.
// - The FlowController (Supervisor): The public-facing API of the system. Its primary responsibilities are to execute
// a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of the worker
// pool, ensuring it stays synchronized with the underlying shard topology defined by the contracts.FlowRegistry.
// - The internal.ShardProcessor (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle of
// requests on a single shard. The supervisor manages a pool of these workers, one for each contracts.RegistryShard.
//
// # Concurrency Model
//
// The FlowController is designed to be highly concurrent and thread-safe. It acts primarily as a stateless distributor.
//
// - EnqueueAndWait: Can be called concurrently by many goroutines.
// - Worker Management: Uses a sync.Map (workers) for concurrent access and lazy initialization of workers.
// - Supervision: A single background goroutine (run) manages the worker pool lifecycle (garbage collection).
//
// It achieves high throughput by minimizing shared state and relying on the internal ShardProcessors to handle state
// mutations serially (using an actor model).
//
// # Request Lifecycle and Ownership
//
// A request (represented internally as a FlowItem) has a lifecycle managed cooperatively by the Controller and a
// Processor. Defining ownership is critical for ensuring an item is finalized exactly once.
//
// 1. Submission (Controller): The Controller attempts to hand off the item to a Processor.
// 2. Handoff:
// - Success: Ownership transfers to the Processor, which is now responsible for Finalization.
// - Failure: Ownership remains with the Controller, which must Finalize the item.
// 3. Processing (Processor): The Processor enqueues, manages, and eventually dispatches or rejects the item.
// 4. Finalization: The terminal outcome is set. This can happen:
// - Synchronously: The Processor determines the outcome (e.g., Dispatch, Capacity Rejection).
// - Asynchronously: The Controller observes the request's Context expiry (TTL/Cancellation) and calls Finalize.
//
// The FlowItem uses atomic operations to safely coordinate the Finalization state across goroutines.
package controller
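To make the exactly-once finalization guarantee concrete, here is a minimal sketch of the coordination described above. All names are assumptions; the real internal.FlowItem is not part of this diff, and imports of sync/atomic and the types package are assumed.

	// Sketch of exactly-once finalization coordinated with an atomic flag.
	type flowItemSketch struct {
		finalized atomic.Bool
		done      chan struct{} // closed exactly once, unblocking EnqueueAndWait
		outcome   types.QueueOutcome
		err       error
	}

	func (i *flowItemSketch) finalize(outcome types.QueueOutcome, err error) {
		// Only the first caller (Processor or Controller) wins the CAS and records
		// the terminal outcome; any later call is a no-op.
		if !i.finalized.CompareAndSwap(false, true) {
			return
		}
		i.outcome = outcome
		i.err = err
		close(i.done)
	}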