refactor(registry): Replace event-driven GC with a lease-based lifecycle #1476
Changes from 1 commit
```diff
@@ -30,74 +30,72 @@ import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
-// shardCallbacks groups the callback functions that a `registryShard` uses to communicate with its parent registry.
-type shardCallbacks struct {
-	propagateStatsDelta propagateStatsDeltaFunc
-	signalQueueState    func(shardID string, key types.FlowKey, signal queueStateSignal)
-	signalShardState    signalShardStateFunc
-}
-
-// priorityBand holds all the `managedQueues` and configuration for a single priority level within a shard.
+// priorityBand holds all `managedQueues` and configuration for a single priority level within a shard.
 type priorityBand struct {
-	// config holds the partitioned config for this specific band within this shard.
-	config ShardPriorityBandConfig
+	// --- Immutable (set at construction) ---
+	config                  ShardPriorityBandConfig
+	interFlowDispatchPolicy framework.InterFlowDispatchPolicy
+
+	// --- State Protected by the parent shard's `mu` ---
 
 	// queues holds all `managedQueue` instances within this band, keyed by their logical `ID` string.
 	// The priority is implicit from the parent `priorityBand`.
 	queues map[string]*managedQueue
 
-	// Band-level statistics. Updated atomically via lock-free propagation.
+	// --- Concurrent-Safe State (Atomics) ---
+
+	// Band-level statistics, updated via lock-free propagation from child queues.
 	byteSize atomic.Int64
 	len      atomic.Int64
-
-	// Cached policy instance for this band, created at initialization.
-	interFlowDispatchPolicy framework.InterFlowDispatchPolicy
 }
 
 // registryShard implements the `contracts.RegistryShard` interface.
 //
 // # Role: The Data Plane Slice
 //
-// It represents a single, concurrent-safe slice of the registry's total state. It provides a read-optimized view for a
-// `controller.FlowController` worker.
+// It represents a single, concurrent-safe slice of the registry's total state, acting as an independent, parallel
+// execution unit. It provides a read-optimized view for a `controller.FlowController` worker, partitioning the overall
+// system state to enable horizontal scalability.
 //
-// # Concurrency: `RWMutex` and Atomics
+// # Concurrency Model: `RWMutex` for Topology, Atomics for Stats
 //
-// The `registryShard` balances read performance with write safety:
+// The `registryShard` balances read performance with write safety using a hybrid model:
 //
-//   - `sync.RWMutex` (mu): Protects the shard's internal maps (`priorityBands`) during administrative operations.
-//
-//   - Atomics (Stats): Aggregated statistics (`totalByteSize`, `totalLen`) use atomics for lock-free updates during
-//     delta propagation.
-//
-//   - Atomic Lifecycle (Status): The lifecycle state is managed via an atomic `status` enum.
+//   - `mu (sync.RWMutex)`: Protects the shard's internal topology (the maps of priority bands and queues) during
+//     administrative operations like flow registration, garbage collection, and configuration updates.
+//     Read locks are used on the hot path to look up queues, while write locks are used for infrequent structural
+//     changes.
+//   - Atomics: Aggregated statistics (`totalByteSize`, `totalLen`, etc.) and the `isDraining` flag use atomic
+//     operations, allowing for high-frequency, lock-free updates and reads of the shard's status and load, which is
+//     critical for the performance of the data path and statistics propagation.
 type registryShard struct {
+	// --- Immutable Identity & Dependencies (set at construction) ---
 	id     string
 	logger logr.Logger
 
-	// config holds the partitioned configuration for this shard, derived from the `FlowRegistry`'s global `Config`.
-	// It contains only the settings and capacity limits relevant to this specific shard.
-	config *ShardConfig
-
-	// status tracks the lifecycle state of the shard (Active, Draining, Drained).
-	// It is stored as an `int32` for atomic operations.
-	status atomic.Int32 // `componentStatus`
-
-	// parentCallbacks provides the communication channels back to the parent registry.
-	parentCallbacks shardCallbacks
+	// onStatsDelta is the callback used to propagate statistics changes up to the parent registry.
+	onStatsDelta propagateStatsDeltaFunc
+	// orderedPriorityLevels is a cached, sorted list of priority levels.
+	orderedPriorityLevels []uint
+
+	// --- State Protected by `mu` ---
 
-	// mu protects the shard's internal maps (`priorityBands`).
+	// mu protects the shard's internal topology (`priorityBands`) and `config`.
+	// TODO: This is a priority inversion issue. Administrative operations (e.g., GC) for a low-priority flow block all
+	// data path operations for high-priority flows on this shard. We should replace `s.mu` with granular per-band
+	// locks. This is safe since the priority band map structure is immutable at initialization.
 	mu sync.RWMutex
 
-	// priorityBands is the primary lookup table for all managed queues on this shard, organized by `priority`.
+	// config holds the partitioned configuration for this shard, derived from the `FlowRegistry`'s global `Config`.
+	config *ShardConfig
+	// priorityBands is the primary lookup table for all managed queues on this shard.
 	priorityBands map[uint]*priorityBand
 
-	// orderedPriorityLevels is a cached, sorted list of `priority` levels.
-	// It is populated at initialization to avoid repeated map key iteration and sorting during the dispatch loop,
-	// ensuring a deterministic, ordered traversal from highest to lowest priority.
-	orderedPriorityLevels []uint
+	// --- Concurrent-Safe State (Atomics) ---
+
+	// isDraining indicates if the shard is gracefully shutting down.
+	isDraining atomic.Bool
 
-	// Shard-level statistics. Updated atomically via lock-free propagation.
+	// Shard-level statistics, updated via lock-free propagation from child queues.
 	totalByteSize atomic.Int64
 	totalLen      atomic.Int64
 }
```

Review thread on the `s.mu` TODO:

> Will tackle this in a fast-followup PR once the flow control system is wired up and benchmarked.

> A per-band lock makes sense, but it is not clear to me if this should be a "fast" follow; once we have the system working end-to-end, I recommend listing all followup items and then prioritizing them.
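The hybrid model described in the new doc comment is the heart of this layout change. Below is a minimal, self-contained sketch of the same pattern using toy types (not the PR's actual ones): topology behind an `RWMutex`, stats and lifecycle behind atomics.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// shard is a toy reduction of registryShard's three state classes.
type shard struct {
	// --- Immutable (set at construction) ---
	id string

	// --- Topology: protected by mu ---
	mu     sync.RWMutex
	queues map[string]int

	// --- Stats & lifecycle: lock-free atomics ---
	totalLen   atomic.Int64
	isDraining atomic.Bool
}

func (s *shard) lookup(id string) (int, bool) {
	s.mu.RLock() // hot path takes only a read lock
	defer s.mu.RUnlock()
	v, ok := s.queues[id]
	return v, ok
}

func (s *shard) deleteQueue(id string) {
	s.mu.Lock() // write lock only for infrequent structural changes
	defer s.mu.Unlock()
	delete(s.queues, id)
}

func main() {
	s := &shard{id: "shard-1", queues: map[string]int{"flow-a": 1}}
	s.totalLen.Add(1)        // stats update: no lock taken
	s.isDraining.Store(true) // lifecycle flip: a single atomic store
	fmt.Println(s.lookup("flow-a"))
	fmt.Println(s.totalLen.Load(), s.isDraining.Load())
	s.deleteQueue("flow-a")
}
```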
```diff
@@ -109,18 +107,17 @@ func newShard(
 	id string,
 	config *ShardConfig,
 	logger logr.Logger,
-	parentCallbacks shardCallbacks,
+	onStatsDelta propagateStatsDeltaFunc,
 	interFlowFactory interFlowDispatchPolicyFactory,
 ) (*registryShard, error) {
 	shardLogger := logger.WithName("registry-shard").WithValues("shardID", id)
 	s := &registryShard{
-		id:              id,
-		logger:          shardLogger,
-		config:          config,
-		parentCallbacks: parentCallbacks,
-		priorityBands:   make(map[uint]*priorityBand, len(config.PriorityBands)),
+		id:            id,
+		logger:        shardLogger,
+		config:        config,
+		onStatsDelta:  onStatsDelta,
+		priorityBands: make(map[uint]*priorityBand, len(config.PriorityBands)),
 	}
-	s.status.Store(int32(componentStatusActive))
 
 	for _, bandConfig := range config.PriorityBands {
 		interPolicy, err := interFlowFactory(bandConfig.InterFlowDispatchPolicy)
```
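The `parentCallbacks shardCallbacks` parameter is gone; the shard now takes only the single stats callback. Assuming `propagateStatsDeltaFunc` is declared as below (its signature is inferred from `propagateStatsDelta` later in this diff), the registry can wire up a shard with a plain method value such as `registry.propagateStatsDelta` — one function instead of a struct of callbacks.

```go
package registry

// Assumed declaration of the flattened callback type; the signature mirrors
// (*registryShard).propagateStatsDelta(priority uint, lenDelta, byteSizeDelta int64).
type propagateStatsDeltaFunc func(priority uint, lenDelta, byteSizeDelta int64)
```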
```diff
@@ -147,16 +144,25 @@ func newShard(
 func (s *registryShard) ID() string { return s.id }
 
 // IsActive returns true if the shard is active and accepting new requests.
 // This is used by the `controller.FlowController` to determine if it should use this shard for new enqueue operations.
 // This is a lock-free read, making it efficient for the hot path.
 func (s *registryShard) IsActive() bool {
-	return componentStatus(s.status.Load()) == componentStatusActive
+	return !s.isDraining.Load()
 }
 
 // ManagedQueue retrieves a specific `contracts.ManagedQueue` instance from this shard.
 func (s *registryShard) ManagedQueue(key types.FlowKey) (contracts.ManagedQueue, error) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	return s.managedQueueLocked(key)
+
+	band, ok := s.priorityBands[key.Priority]
+	if !ok {
+		return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrPriorityBandNotFound)
+	}
+	mq, ok := band.queues[key.ID]
+	if !ok {
+		return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrFlowInstanceNotFound)
+	}
+	return mq, nil
 }
 
 // IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy`.
```
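Taken together, `IsActive` and `ManagedQueue` give a `controller.FlowController` worker a cheap two-step hot path: a lock-free draining check, then a read-locked queue lookup. A hypothetical sketch against a local stand-in interface (the real `contracts.RegistryShard` has more methods than shown here):

```go
package main

import "fmt"

// Local stand-in for the subset of contracts.RegistryShard used here
// (assumed shape; the real interface lives in the contracts package).
type RegistryShard interface {
	ID() string
	IsActive() bool
}

// pickShard returns the first Active shard, mimicking how a worker
// might skip Draining shards when placing new work.
func pickShard(shards []RegistryShard) (RegistryShard, bool) {
	for _, s := range shards {
		if s.IsActive() { // lock-free read; cheap enough for the hot path
			return s, true
		}
	}
	return nil, false
}

type fakeShard struct {
	id       string
	draining bool
}

func (f *fakeShard) ID() string     { return f.id }
func (f *fakeShard) IsActive() bool { return !f.draining }

func main() {
	shards := []RegistryShard{
		&fakeShard{id: "shard-1", draining: true},
		&fakeShard{id: "shard-2"},
	}
	if s, ok := pickShard(shards); ok {
		fmt.Println("enqueue on", s.ID()) // enqueue on shard-2
	}
}
```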
```diff
@@ -172,13 +178,14 @@ func (s *registryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.In
 	if !ok {
 		return nil, fmt.Errorf("failed to get intra-flow policy for flow %q: %w", key, contracts.ErrFlowInstanceNotFound)
 	}
-	// The policy is stored on the `managedQueue`.
+	// The policy is stored on the `managedQueue` and is immutable after creation.
 	return mq.dispatchPolicy, nil
 }
 
 // InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy`.
-// This read is lock-free as the policy instance is immutable after the shard is initialized.
 func (s *registryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) {
+	// This read is safe because the `priorityBands` map structure is immutable after initialization.
 	band, ok := s.priorityBands[priority]
 	if !ok {
 		return nil, fmt.Errorf("failed to get inter-flow policy for priority %d: %w",
```
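Both policy getters lean on the same property: the `priorityBands` map is never structurally mutated after `newShard`, so reading it without `mu` is race-free. A toy demonstration of that rule — safe only because nothing writes to the map after construction:

```go
package main

import "fmt"

type band struct{ name string }

func main() {
	// Built once; never resized or rewritten afterwards.
	bands := map[uint]*band{10: {name: "high"}, 20: {name: "low"}}

	done := make(chan string)
	for i := 0; i < 4; i++ {
		go func() { // concurrent readers need no mutex on an immutable map
			done <- bands[10].name
		}()
	}
	for i := 0; i < 4; i++ {
		fmt.Println(<-done)
	}
}
```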
```diff
@@ -201,7 +208,7 @@ func (s *registryShard) PriorityBandAccessor(priority uint) (framework.PriorityB
 }
 
 // AllOrderedPriorityLevels returns a cached, sorted slice of all configured priority levels for this shard.
 // The slice is sorted from highest to lowest priority (ascending numerical order).
 // This is a lock-free read.
 func (s *registryShard) AllOrderedPriorityLevels() []uint {
 	return s.orderedPriorityLevels
 }
```
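For reference, this is roughly how such a cached slice could be built once at construction; the helper below is illustrative, not code from this PR. Lower numeric values sort first, which is what "highest to lowest priority (ascending numerical order)" means.

```go
package main

import (
	"fmt"
	"sort"
)

// orderedLevels collects the configured priority levels and sorts them
// ascending, so the dispatch loop can walk highest priority first.
func orderedLevels(priorities map[uint]struct{}) []uint {
	levels := make([]uint, 0, len(priorities))
	for p := range priorities {
		levels = append(levels, p)
	}
	sort.Slice(levels, func(i, j int) bool { return levels[i] < levels[j] })
	return levels
}

func main() {
	bands := map[uint]struct{}{20: {}, 5: {}, 10: {}}
	fmt.Println(orderedLevels(bands)) // [5 10 20]: highest priority first
}
```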
```diff
@@ -241,8 +248,7 @@ func (s *registryShard) Stats() contracts.ShardStats {
 // --- Internal Administrative/Lifecycle Methods (called by `FlowRegistry`) ---
 
 // synchronizeFlow is the internal administrative method for creating a flow instance on this shard.
-// Since a flow instance (identified by its immutable `FlowKey`) cannot be updated yet, this function is a simple
-// "create if not exists" operation. It is idempotent.
+// It is an idempotent "create if not exists" operation.
 func (s *registryShard) synchronizeFlow(
 	spec types.FlowSpecification,
 	policy framework.IntraFlowDispatchPolicy,
```
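An idempotent "create if not exists" reduces to a guarded map insert: repeated calls with the same key leave exactly one instance behind. A trivial sketch of the shape (not the PR's code):

```go
package main

import "fmt"

// ensure inserts a value for key only if none exists yet; calling it
// again with the same key is a no-op, which is what makes it idempotent.
func ensure(m map[string]int, key string) {
	if _, ok := m[key]; ok {
		return // already present: no-op
	}
	m[key] = len(m) + 1
}

func main() {
	queues := map[string]int{}
	ensure(queues, "flow-a")
	ensure(queues, "flow-a") // second call changes nothing
	fmt.Println(len(queues)) // 1
}
```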
```diff
@@ -267,58 +273,31 @@ func (s *registryShard) synchronizeFlow(
 	s.logger.V(logging.TRACE).Info("Creating new queue for flow instance.",
 		"flowKey", key, "queueType", q.Name())
 
-	callbacks := managedQueueCallbacks{
-		propagateStatsDelta: s.propagateStatsDelta,
-		signalQueueState: func(key types.FlowKey, signal queueStateSignal) {
-			s.parentCallbacks.signalQueueState(s.id, key, signal)
-		},
-	}
-	mq := newManagedQueue(q, policy, spec.Key, s.logger, callbacks)
-
+	// Create a closure that captures the shard's `isDraining` atomic field.
+	// This provides the queue with a way to check the shard's status without creating a tight coupling or circular
+	// dependency.
+	isDrainingFunc := func() bool {
+		return s.isDraining.Load()
+	}
+
+	mq := newManagedQueue(q, policy, spec.Key, s.logger, s.propagateStatsDelta, isDrainingFunc)
 	band.queues[key.ID] = mq
 }
 
-// garbageCollectLocked removes a queue instance from the shard.
-// This must be called under the shard's write lock.
-func (s *registryShard) garbageCollectLocked(key types.FlowKey) {
-	s.logger.Info("Garbage collecting queue instance.", "flowKey", key, "flowID", key.ID, "priority", key.Priority)
+// deleteFlow removes a queue instance from the shard.
+func (s *registryShard) deleteFlow(key types.FlowKey) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.logger.Info("Deleting queue instance.", "flowKey", key, "flowID", key.ID, "priority", key.Priority)
 	if band, ok := s.priorityBands[key.Priority]; ok {
 		delete(band.queues, key.ID)
 	}
 }
 
 // markAsDraining transitions the shard to a Draining state. This method is lock-free.
 func (s *registryShard) markAsDraining() {
-	// Attempt to transition from Active to Draining atomically.
-	if s.status.CompareAndSwap(int32(componentStatusActive), int32(componentStatusDraining)) {
-		s.logger.V(logging.DEBUG).Info("Shard status changed",
-			"from", componentStatusActive,
-			"to", componentStatusDraining,
-		)
-	}
-
-	// Check if the shard is *already* empty when marked as draining. If so, immediately attempt the transition to
-	// Drained to ensure timely GC. This handles the race where the shard becomes empty just before or during being
-	// marked Draining.
-	if s.totalLen.Load() == 0 {
-		// Attempt to transition from Draining to Drained atomically.
-		if s.status.CompareAndSwap(int32(componentStatusDraining), int32(componentStatusDrained)) {
-			s.parentCallbacks.signalShardState(s.id, shardStateSignalBecameDrained)
-		}
-	}
-}
-
-// managedQueueLocked retrieves a specific `contracts.ManagedQueue` instance from this shard.
-// This must be called under the shard's read lock.
-func (s *registryShard) managedQueueLocked(key types.FlowKey) (*managedQueue, error) {
-	band, ok := s.priorityBands[key.Priority]
-	if !ok {
-		return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrPriorityBandNotFound)
-	}
-	mq, ok := band.queues[key.ID]
-	if !ok {
-		return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrFlowInstanceNotFound)
-	}
-	return mq, nil
+	s.isDraining.Store(true)
+	s.logger.V(logging.DEBUG).Info("Shard marked as Draining")
 }
 
 // updateConfig atomically replaces the shard's configuration. This is used during scaling events to re-partition
```
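The `isDrainingFunc` closure is the key decoupling trick in this hunk: the queue receives a `func() bool` view of the shard's draining flag instead of a reference back to the shard itself. A self-contained toy version of the pattern:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// The queue depends only on a func() bool, not on the shard type,
// which avoids tight coupling or a circular dependency between the two.
type queue struct {
	isDraining func() bool
}

func (q *queue) admit() bool {
	return !q.isDraining() // reject new work once the owning shard is draining
}

func main() {
	var draining atomic.Bool
	q := &queue{isDraining: func() bool { return draining.Load() }}
	fmt.Println(q.admit()) // true
	draining.Store(true)
	fmt.Println(q.admit()) // false
}
```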
```diff
@@ -333,7 +312,6 @@ func (s *registryShard) updateConfig(newConfig *ShardConfig) {
 		newBandConfig, err := newConfig.getBandConfig(priority)
 		if err != nil {
 			// An invariant was violated: a priority exists in the shard but not in the new config.
-			// This should be impossible if the registry's logic is correct.
 			panic(fmt.Errorf("invariant violation: priority band (%d) missing in new shard configuration during update: %w",
 				priority, err))
 		}
```
```diff
@@ -342,14 +320,13 @@ func (s *registryShard) updateConfig(newConfig *ShardConfig) {
 	s.logger.Info("Shard configuration updated")
 }
 
-// --- Internal Callback Methods ---
+// --- Internal Callback ---
 
 // propagateStatsDelta is the single point of entry for all statistics changes within the shard.
-// It updates the relevant band's stats, the shard's total stats, and handles the shard's lifecycle signaling before
-// propagating the delta to the parent registry.
-// It uses atomic operations to maintain high performance under concurrent updates from multiple shards.
-// As a result, its counters are eventually consistent and may be transiently inaccurate during high-contention races.
+// It atomically updates the relevant band's stats, the shard's total stats, and propagates the delta to the parent
+// registry.
 func (s *registryShard) propagateStatsDelta(priority uint, lenDelta, byteSizeDelta int64) {
 	// This read is safe because the `priorityBands` map structure is immutable after initialization.
 	band, ok := s.priorityBands[priority]
 	if !ok {
 		// This should be impossible if the `managedQueue` calling this is correctly registered.
```
```diff
@@ -359,24 +336,11 @@ func (s *registryShard) propagateStatsDelta(priority uint, lenDelta, byteSizeDel
 
 	band.len.Add(lenDelta)
 	band.byteSize.Add(byteSizeDelta)
-	newTotalLen := s.totalLen.Add(lenDelta)
+	s.totalLen.Add(lenDelta)
 	s.totalByteSize.Add(byteSizeDelta)
 
-	// Following the strict bottom-up signaling pattern, we evaluate and signal our own state change *before* propagating
-	// the statistics to the parent registry.
-	s.evaluateDrainingState(newTotalLen)
-	s.parentCallbacks.propagateStatsDelta(priority, lenDelta, byteSizeDelta)
-}
-
-// evaluateDrainingState checks if the shard has transitioned to the Drained state and signals the parent.
-func (s *registryShard) evaluateDrainingState(currentLen int64) {
-	if currentLen == 0 {
-		// Attempt transition from Draining to Drained atomically.
-		// This acts as the exactly-once latch. If it succeeds, this goroutine is solely responsible for signaling.
-		if s.status.CompareAndSwap(int32(componentStatusDraining), int32(componentStatusDrained)) {
-			s.parentCallbacks.signalShardState(s.id, shardStateSignalBecameDrained)
-		}
-	}
+	// Propagate the delta up to the parent registry. This propagation is lock-free and eventually consistent.
+	s.onStatsDelta(priority, lenDelta, byteSizeDelta)
 }
 
 // --- `priorityBandAccessor` ---
```
Review comment on the `onStatsDelta` field:

> Flattened (and renamed) from the now-removed `parentCallbacks`.
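With `parentCallbacks` flattened away, the simplified `propagateStatsDelta` is a pure chain of atomic adds: queue-level deltas bubble up through the band, the shard, and finally the parent registry via `onStatsDelta`, with no locks and no lifecycle signaling along the way. A runnable toy model of that chain (toy types, not the PR's):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type registry struct{ totalLen atomic.Int64 }

func (r *registry) onStatsDelta(priority uint, lenDelta, byteSizeDelta int64) {
	r.totalLen.Add(lenDelta) // registry-level aggregate: atomic, no lock
}

type shard struct {
	totalLen     atomic.Int64
	onStatsDelta func(priority uint, lenDelta, byteSizeDelta int64)
}

func (s *shard) propagateStatsDelta(priority uint, lenDelta, byteSizeDelta int64) {
	s.totalLen.Add(lenDelta)                          // shard-level aggregate
	s.onStatsDelta(priority, lenDelta, byteSizeDelta) // forward to the parent
}

func main() {
	r := &registry{}
	s := &shard{onStatsDelta: r.onStatsDelta}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // concurrent updates from many goroutines
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.propagateStatsDelta(0, 1, 128)
		}()
	}
	wg.Wait()
	fmt.Println(s.totalLen.Load(), r.totalLen.Load()) // 100 100
}
```

The counters are eventually consistent: a reader may observe the shard's total before the registry's has caught up, which is the trade-off the doc comment accepts in exchange for a lock-free data path.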