-
Notifications
You must be signed in to change notification settings - Fork 186
refactor(registry): Replace event-driven GC with a lease-based lifecycle #1476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
66d8e28
ef56ab4
e77cab5
614d7c5
da9490d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,104 +21,84 @@ import ( | |
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" | ||
| ) | ||
|
|
||
| // FlowRegistry is the complete interface for the global control plane. An implementation of this interface is the single | ||
| // source of truth for all flow control state and configuration. | ||
| // FlowRegistry is the complete interface for the global flow control plane. | ||
| // It composes the client-facing data path interface and the administrative interface. A concrete implementation of this | ||
| // interface is the single source of truth for all flow control state. | ||
| // | ||
| // # Conformance | ||
| // # Conformance: Implementations MUST be goroutine-safe. | ||
| // | ||
| // # Flow Lifecycle | ||
| // | ||
| // A flow instance, identified by its immutable `types.FlowKey`, has a lease-based lifecycle managed by this interface. | ||
| // Any implementation MUST adhere to this lifecycle: | ||
| // | ||
| // All methods MUST be goroutine-safe. Implementations are expected to perform complex updates (e.g., | ||
| // `RegisterOrUpdateFlow`) atomically. | ||
| // 1. Lease Acquisition: A client calls Connect to acquire a lease. This signals that the flow is in use and protects | ||
| // it from garbage collection. If the flow does not exist, it is created Just-In-Time (JIT). | ||
| // 2. Active State: A flow is "Active" as long as its lease count is greater than zero. | ||
| // 3. Lease Release: The client MUST call `Close()` on the returned `FlowConnection` to release the lease. | ||
| // When the lease count drops to zero, the flow becomes "Idle". | ||
| // 4. Garbage Collection: The implementation MUST automatically garbage collect a flow after it has remained | ||
| // continuously Idle for a configurable duration. | ||
| // | ||
| // # System Invariants | ||
| // | ||
| // Concrete implementations MUST uphold the following invariants: | ||
| // | ||
| // 1. Shard Consistency: All configured priority bands and registered flow instances must exist on every Active shard. | ||
| // Plugin instance types must be consistent for a given flow across all shards. | ||
| // 2. Flow Instance Uniqueness: Each unique `types.FlowKey` (`ID` + `Priority`) corresponds to exactly one managed flow | ||
| // instance. | ||
| // 3. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active | ||
| // 2. Capacity Partitioning: Global and per-band capacity limits must be uniformly partitioned across all Active | ||
| // shards. | ||
| // | ||
| // # Flow Lifecycle | ||
| // | ||
| // A flow instance (identified by its immutable `FlowKey`) has a simple lifecycle: | ||
| // | ||
| // - Registered: Known to the `FlowRegistry` via `RegisterOrUpdateFlow`. | ||
| // - Idle: Queues are empty across all Active and Draining shards. | ||
| // - Garbage Collected (Unregistered): The registry automatically garbage collects flows after they have remained Idle | ||
| // for a configurable duration. | ||
| // | ||
| // # Shard Lifecycle | ||
| // | ||
| // When a shard is decommissioned, it is marked inactive (`IsActive() == false`) to prevent new enqueues. The shard | ||
| // continues to drain and is deleted only after it is empty. | ||
| type FlowRegistry interface { | ||
| FlowRegistryClient | ||
| FlowRegistryAdmin | ||
| ShardProvider | ||
| } | ||
|
|
||
| // FlowRegistryAdmin defines the administrative interface for the global control plane. | ||
| // | ||
| // # Dynamic Update Strategies | ||
| // | ||
| // The contract specifies behaviors for handling dynamic updates, prioritizing stability and correctness: | ||
| // | ||
| // - Immutable Flow Identity (`types.FlowKey`): The system treats the `FlowKey` (`ID` + `Priority`) as the immutable | ||
| // identifier. Changing the priority of traffic requires registering a new `FlowKey`. The old flow instance is | ||
| // automatically garbage collected when Idle. This design eliminates complex priority migration logic. | ||
| // | ||
| // - Graceful Draining (Shard Scale-Down): Decommissioned shards enter a Draining state. They stop accepting new | ||
| // requests but continue to be processed for dispatch until empty. | ||
| // | ||
| // - Self-Balancing (Shard Scale-Up): When new shards are added, the `controller.FlowController`'s distribution logic | ||
| // naturally utilizes them, funneling new requests to the less-loaded shards. Existing queued items are not | ||
| // migrated. | ||
| type FlowRegistryAdmin interface { | ||
| // RegisterOrUpdateFlow handles the registration of a new flow instance or the update of an existing instance's | ||
| // specification (for the same `types.FlowKey`). The operation is atomic across all shards. | ||
| // | ||
| // Since the `FlowKey` (including `Priority`) is immutable, this method cannot change a flow's priority. | ||
| // To change priority, the caller should simply register the new `FlowKey`; the old flow instance will be | ||
| // automatically garbage collected when it becomes Idle. | ||
| // | ||
| // Returns errors wrapping `ErrFlowIDEmpty`, `ErrPriorityBandNotFound`, or internal errors if plugin instantiation | ||
| // fails. | ||
| RegisterOrUpdateFlow(spec types.FlowSpecification) error | ||
|
|
||
| // UpdateShardCount dynamically adjusts the number of internal state shards. | ||
| // | ||
| // The implementation MUST atomically re-partition capacity allocations across all active shards. | ||
| // Returns an error wrapping `ErrInvalidShardCount` if `n` is not positive. | ||
| UpdateShardCount(n int) error | ||
|
|
||
| // Stats returns globally aggregated statistics for the entire `FlowRegistry`. | ||
| Stats() AggregateStats | ||
|
|
||
| // ShardStats returns a slice of statistics, one for each internal shard. This provides visibility for debugging and | ||
| // monitoring per-shard behavior (e.g., identifying hot or stuck shards). | ||
| // ShardStats returns a slice of statistics, one for each internal shard. | ||
| ShardStats() []ShardStats | ||
| } | ||
|
|
||
| // ShardProvider defines the interface for discovering available shards. | ||
| // | ||
| // A "shard" is an internal, parallel execution unit that allows the `controller.FlowController`'s core dispatch logic | ||
| // to be parallelized, preventing a CPU bottleneck at high request rates. The `FlowRegistry`'s state is sharded to | ||
| // support this parallelism by reducing lock contention. | ||
| // FlowRegistryClient defines the primary, client-facing interface for the registry. | ||
| // This is the interface that the `controller.FlowController`'s data path depends upon. | ||
| type FlowRegistryClient interface { | ||
| // WithConnection manages a scoped, leased session for a given flow. | ||
| // It is the primary and sole entry point for interacting with the data path. | ||
| // | ||
| // This method handles the entire lifecycle of a flow connection: | ||
| // 1. Just-In-Time (JIT) Registration: If the flow for the given `types.FlowKey` does not exist, it is created and | ||
| // registered automatically. | ||
| // 2. Lease Acquisition: It acquires a lifecycle lease, protecting the flow from garbage collection. | ||
| // 3. Callback Execution: It invokes the provided function `fn`, passing in a temporary `ActiveFlowConnection` handle. | ||
| // 4. Guaranteed Lease Release: It ensures the lease is safely released when the callback function returns. | ||
| // | ||
| // This functional, callback-based approach makes resource leaks impossible, as the caller is not responsible for | ||
| // manually closing the connection. | ||
| // | ||
| // Errors returned by the callback `fn` are propagated up. | ||
| // Returns `ErrFlowIDEmpty` if the provided key has an empty ID. | ||
| WithConnection(key types.FlowKey, fn func(conn ActiveFlowConnection) error) error | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The alternative here is:
e.g., Also, this method should eventually accept and respect caller context. I am not doing this in this PR as this will need to be done comprehensively across the flow control module contracts. As of right now, none of our function have unbounded blocking, so this is more for contract hardening and tracing than correctness at the moment. I will do this in a separate PR. |
||
| } | ||
|
|
||
| // ActiveFlowConnection represents a handle to a temporary, leased session on a flow. | ||
| // It provides a safe, scoped entry point to the registry's sharded data plane. | ||
| // | ||
| // Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard to avoid sending requests to a | ||
| // Draining shard. | ||
| type ShardProvider interface { | ||
| // Shards returns a slice of accessors, one for each internal state shard (Active and Draining). | ||
| // Callers should not modify the returned slice. | ||
| // An `ActiveFlowConnection` instance is only valid for the duration of the `WithConnection` callback from which it was | ||
| // received. Callers MUST NOT store a reference to this object or use it after the callback returns. | ||
| // Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs | ||
| // safely while the flow is guaranteed to be protected from garbage collection. | ||
| type ActiveFlowConnection interface { | ||
| // Shards returns a stable snapshot of accessors for all internal state shards (both Active and Draining). | ||
| // Consumers MUST check `RegistryShard.IsActive()` before routing new work to a shard from this slice. | ||
| Shards() []RegistryShard | ||
| } | ||
|
|
||
| // RegistryShard defines the interface for accessing a specific slice (shard) of the `FlowRegistry's` state. | ||
| // It provides a concurrent-safe view for `controller.FlowController` workers. | ||
| // | ||
| // # Conformance | ||
| // RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state. | ||
| // A shard acts as an independent, parallel execution unit, allowing the system's dispatch logic to scale horizontally. | ||
| // | ||
| // All methods MUST be goroutine-safe. | ||
| // # Conformance: Implementations MUST be goroutine-safe. | ||
| type RegistryShard interface { | ||
| // ID returns a unique identifier for this shard, which must remain stable for the shard's lifetime. | ||
| ID() string | ||
|
|
@@ -163,14 +143,16 @@ type RegistryShard interface { | |
| Stats() ShardStats | ||
| } | ||
|
|
||
| // ManagedQueue defines the interface for a flow's queue instance on a specific shard. | ||
| // It acts as a stateful decorator around an underlying `framework.SafeQueue`. | ||
| // ManagedQueue defines the interface for a flow's queue on a specific shard. | ||
| // It acts as a stateful decorator around an underlying `framework.SafeQueue`, augmenting it with statistics tracking. | ||
| // | ||
| // # Conformance | ||
| // | ||
| // - All methods MUST be goroutine-safe. | ||
| // - All mutating methods (`Add()`, `Remove()`, etc.) MUST ensure that the underlying queue state and the statistics | ||
| // (`Len`, `ByteSize`) are updated atomically relative to each other. | ||
| // - Implementations MUST be goroutine-safe. | ||
| // - All mutating methods MUST ensure that the underlying queue state and the public statistics (`Len`, `ByteSize`) | ||
| // are updated as a single atomic transaction. | ||
| // - The `Add` method MUST return an error wrapping `ErrShardDraining` if the queue instance belongs to a parent shard | ||
| // that is no longer Active. | ||
| type ManagedQueue interface { | ||
| framework.SafeQueue | ||
|
|
||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can simply be in |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| Copyright 2025 The Kubernetes Authors. | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package registry | ||
|
|
||
| import ( | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" | ||
| ) | ||
|
|
||
| // connection is the concrete, un-exported implementation of the `contracts.ActiveFlowConnection` interface. | ||
| // It is a temporary handle created for the duration of a single `WithConnection` call. | ||
| type connection struct { | ||
| registry *FlowRegistry | ||
| key types.FlowKey | ||
| } | ||
|
|
||
| var _ contracts.ActiveFlowConnection = &connection{} | ||
|
|
||
| // Shards returns a stable snapshot of accessors for all internal state shards. | ||
| func (c *connection) Shards() []contracts.RegistryShard { | ||
| c.registry.mu.RLock() | ||
| defer c.registry.mu.RUnlock() | ||
|
|
||
| // Return a copy to ensure the caller cannot modify the registry's internal slice. | ||
| shardsCopy := make([]contracts.RegistryShard, len(c.registry.allShards)) | ||
| for i, s := range c.registry.allShards { | ||
| shardsCopy[i] = s | ||
| } | ||
| return shardsCopy | ||
| } |
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dramatically shortened for doc maintainability (plus a lot of it became obsolete with the new lease-based model). Following principle of first disclosure by moving details closer to the relevant types. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This becomes the responsibility of the
controller.FlowControllerwhich distributes requests across workers.