Commit e77cab5
refactor: Adapt shard to new managedQueue contract
Updates the `registryShard` to work with the new, signal-free `managedQueue`. The shard's responsibility is simplified; it no longer needs to handle or propagate lifecycle signals from its queues. Its draining logic is now based on directly checking its aggregate length, removing the need for the `BecameDrained` signal.
1 parent ef56ab4 commit e77cab5
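To make the new contract concrete, here is a minimal, self-contained sketch of the pattern this commit adopts. The type and function names below are illustrative stand-ins, not the real registry types; the real call, `newManagedQueue(q, policy, spec.Key, s.logger, s.propagateStatsDelta, isDrainingFunc)`, appears in the diff below. The point is the shape: each queue receives a stats callback and a plain `func() bool` closure over the shard's atomic draining flag, instead of a callbacks struct carrying lifecycle signals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Illustrative stand-in; only the shape relevant to this commit is kept.
type propagateStatsDeltaFunc func(priority uint, lenDelta, byteSizeDelta int64)

// queue models the new, signal-free managedQueue contract: a stats callback
// plus a draining check, with no lifecycle signals to emit.
type queue struct {
	onStatsDelta propagateStatsDeltaFunc
	isDraining   func() bool
}

// shard models the registryShard's simplified lifecycle: one atomic flag and
// lock-free aggregate counters.
type shard struct {
	isDraining atomic.Bool
	totalLen   atomic.Int64
}

// newQueue hands the queue a closure over the shard's atomic flag, so the
// queue can observe shard status without holding a reference to the shard.
func (s *shard) newQueue() *queue {
	return &queue{
		onStatsDelta: func(priority uint, lenDelta, byteSizeDelta int64) {
			s.totalLen.Add(lenDelta) // aggregate stats, lock-free
		},
		isDraining: func() bool { return s.isDraining.Load() },
	}
}

func main() {
	s := &shard{}
	q := s.newQueue()

	q.onStatsDelta(0, 1, 128)      // enqueue: aggregate length becomes 1
	s.isDraining.Store(true)       // markAsDraining: a plain flag, no CAS state machine
	fmt.Println(q.isDraining())    // true
	q.onStatsDelta(0, -1, -128)    // dispatch: aggregate length returns to 0
	fmt.Println(s.totalLen.Load()) // 0: the shard is observably drained
}
```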


pkg/epp/flowcontrol/registry/shard.go

Lines changed: 82 additions & 118 deletions
@@ -30,74 +30,72 @@ import (
   "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
-// shardCallbacks groups the callback functions that a `registryShard` uses to communicate with its parent registry.
-type shardCallbacks struct {
-    propagateStatsDelta propagateStatsDeltaFunc
-    signalQueueState    func(shardID string, key types.FlowKey, signal queueStateSignal)
-    signalShardState    signalShardStateFunc
-}
-
-// priorityBand holds all the `managedQueues` and configuration for a single priority level within a shard.
+// priorityBand holds all `managedQueues` and configuration for a single priority level within a shard.
 type priorityBand struct {
-    // config holds the partitioned config for this specific band within this shard.
-    config ShardPriorityBandConfig
+    // --- Immutable (set at construction) ---
+    config                  ShardPriorityBandConfig
+    interFlowDispatchPolicy framework.InterFlowDispatchPolicy
+
+    // --- State Protected by the parent shard's `mu` ---
 
     // queues holds all `managedQueue` instances within this band, keyed by their logical `ID` string.
     // The priority is implicit from the parent `priorityBand`.
     queues map[string]*managedQueue
 
-    // Band-level statistics. Updated atomically via lock-free propagation.
+    // --- Concurrent-Safe State (Atomics) ---
+
+    // Band-level statistics, updated via lock-free propagation from child queues.
     byteSize atomic.Int64
     len      atomic.Int64
-
-    // Cached policy instance for this band, created at initialization.
-    interFlowDispatchPolicy framework.InterFlowDispatchPolicy
 }
 
 // registryShard implements the `contracts.RegistryShard` interface.
 //
 // # Role: The Data Plane Slice
 //
-// It represents a single, concurrent-safe slice of the registry's total state. It provides a read-optimized view for a
-// `controller.FlowController` worker.
+// It represents a single, concurrent-safe slice of the registry's total state, acting as an independent, parallel
+// execution unit. It provides a read-optimized view for a `controller.FlowController` worker, partitioning the overall
+// system state to enable horizontal scalability.
 //
-// # Concurrency: `RWMutex` and Atomics
+// # Concurrency Model: `RWMutex` for Topology, Atomics for Stats
 //
-// The `registryShard` balances read performance with write safety:
+// The `registryShard` balances read performance with write safety using a hybrid model:
 //
-//   - `sync.RWMutex` (mu): Protects the shard's internal maps (`priorityBands`) during administrative operations.
-//
-//   - Atomics (Stats): Aggregated statistics (`totalByteSize`, `totalLen`) use atomics for lock-free updates during
-//     delta propagation.
-//
-//   - Atomic Lifecycle (Status): The lifecycle state is managed via an atomic `status` enum.
+//   - `mu (sync.RWMutex)`: Protects the shard's internal topology (the maps of priority bands and queues) during
+//     administrative operations like flow registration, garbage collection, and configuration updates.
+//     Read locks are used on the hot path to look up queues, while write locks are used for infrequent structural
+//     changes.
+//   - Atomics: Aggregated statistics (`totalByteSize`, `totalLen`, etc.) and the `isDraining` flag use atomic
+//     operations, allowing for high-frequency, lock-free updates and reads of the shard's status and load, which is
+//     critical for the performance of the data path and statistics propagation.
 type registryShard struct {
+    // --- Immutable Identity & Dependencies (set at construction) ---
     id     string
     logger logr.Logger
 
-    // config holds the partitioned configuration for this shard, derived from the `FlowRegistry`'s global `Config`.
-    // It contains only the settings and capacity limits relevant to this specific shard.
-    config *ShardConfig
-
-    // status tracks the lifecycle state of the shard (Active, Draining, Drained).
-    // It is stored as an `int32` for atomic operations.
-    status atomic.Int32 // `componentStatus`
+    // onStatsDelta is the callback used to propagate statistics changes up to the parent registry.
+    onStatsDelta propagateStatsDeltaFunc
+    // orderedPriorityLevels is a cached, sorted list of priority levels.
+    orderedPriorityLevels []uint
 
-    // parentCallbacks provides the communication channels back to the parent registry.
-    parentCallbacks shardCallbacks
+    // --- State Protected by `mu` ---
 
-    // mu protects the shard's internal maps (`priorityBands`).
+    // mu protects the shard's internal topology (`priorityBands`) and `config`.
+    // TODO: This is a priority inversion issue. Administrative operations (e.g., GC) for a low-priority flow block all
+    // data path operations for high-priority flows on this shard. We should replace `s.mu` with granular per-band
+    // locks. This is safe since the priority band map structure is immutable at initialization.
     mu sync.RWMutex
-
-    // priorityBands is the primary lookup table for all managed queues on this shard, organized by `priority`.
+    // config holds the partitioned configuration for this shard, derived from the `FlowRegistry`'s global `Config`.
+    config *ShardConfig
+    // priorityBands is the primary lookup table for all managed queues on this shard.
     priorityBands map[uint]*priorityBand
 
-    // orderedPriorityLevels is a cached, sorted list of `priority` levels.
-    // It is populated at initialization to avoid repeated map key iteration and sorting during the dispatch loop,
-    // ensuring a deterministic, ordered traversal from highest to lowest priority.
-    orderedPriorityLevels []uint
+    // --- Concurrent-Safe State (Atomics) ---
+
+    // isDraining indicates if the shard is gracefully shutting down.
+    isDraining atomic.Bool
 
-    // Shard-level statistics. Updated atomically via lock-free propagation.
+    // Shard-level statistics, updated via lock-free propagation from child queues.
     totalByteSize atomic.Int64
     totalLen      atomic.Int64
 }
@@ -109,18 +107,17 @@ func newShard(
     id string,
     config *ShardConfig,
     logger logr.Logger,
-    parentCallbacks shardCallbacks,
+    onStatsDelta propagateStatsDeltaFunc,
     interFlowFactory interFlowDispatchPolicyFactory,
 ) (*registryShard, error) {
     shardLogger := logger.WithName("registry-shard").WithValues("shardID", id)
     s := &registryShard{
-        id:              id,
-        logger:          shardLogger,
-        config:          config,
-        parentCallbacks: parentCallbacks,
-        priorityBands:   make(map[uint]*priorityBand, len(config.PriorityBands)),
+        id:            id,
+        logger:        shardLogger,
+        config:        config,
+        onStatsDelta:  onStatsDelta,
+        priorityBands: make(map[uint]*priorityBand, len(config.PriorityBands)),
     }
-    s.status.Store(int32(componentStatusActive))
 
     for _, bandConfig := range config.PriorityBands {
         interPolicy, err := interFlowFactory(bandConfig.InterFlowDispatchPolicy)
@@ -147,16 +144,25 @@ func newShard(
 func (s *registryShard) ID() string { return s.id }
 
 // IsActive returns true if the shard is active and accepting new requests.
-// This is used by the `controller.FlowController` to determine if it should use this shard for new enqueue operations.
+// This is a lock-free read, making it efficient for the hot path.
 func (s *registryShard) IsActive() bool {
-    return componentStatus(s.status.Load()) == componentStatusActive
+    return !s.isDraining.Load()
 }
 
 // ManagedQueue retrieves a specific `contracts.ManagedQueue` instance from this shard.
 func (s *registryShard) ManagedQueue(key types.FlowKey) (contracts.ManagedQueue, error) {
     s.mu.RLock()
     defer s.mu.RUnlock()
-    return s.managedQueueLocked(key)
+
+    band, ok := s.priorityBands[key.Priority]
+    if !ok {
+        return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrPriorityBandNotFound)
+    }
+    mq, ok := band.queues[key.ID]
+    if !ok {
+        return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrFlowInstanceNotFound)
+    }
+    return mq, nil
 }
 
 // IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy`.
@@ -172,13 +178,14 @@ func (s *registryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.In
     if !ok {
         return nil, fmt.Errorf("failed to get intra-flow policy for flow %q: %w", key, contracts.ErrFlowInstanceNotFound)
     }
-    // The policy is stored on the `managedQueue`.
+    // The policy is stored on the `managedQueue` and is immutable after creation.
     return mq.dispatchPolicy, nil
 }
 
 // InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy`.
 // This read is lock-free as the policy instance is immutable after the shard is initialized.
 func (s *registryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) {
+    // This read is safe because the `priorityBands` map structure is immutable after initialization.
     band, ok := s.priorityBands[priority]
     if !ok {
         return nil, fmt.Errorf("failed to get inter-flow policy for priority %d: %w",
@@ -201,7 +208,7 @@ func (s *registryShard) PriorityBandAccessor(priority uint) (framework.PriorityB
 }
 
 // AllOrderedPriorityLevels returns a cached, sorted slice of all configured priority levels for this shard.
-// The slice is sorted from highest to lowest priority (ascending numerical order).
+// This is a lock-free read.
 func (s *registryShard) AllOrderedPriorityLevels() []uint {
     return s.orderedPriorityLevels
 }
@@ -241,8 +248,7 @@ func (s *registryShard) Stats() contracts.ShardStats {
 // --- Internal Administrative/Lifecycle Methods (called by `FlowRegistry`) ---
 
 // synchronizeFlow is the internal administrative method for creating a flow instance on this shard.
-// Since a flow instance (identified by its immutable `FlowKey`) cannot be updated yet, this function is a simple
-// "create if not exists" operation. It is idempotent.
+// It is an idempotent "create if not exists" operation.
 func (s *registryShard) synchronizeFlow(
     spec types.FlowSpecification,
     policy framework.IntraFlowDispatchPolicy,
@@ -267,58 +273,31 @@ func (s *registryShard) synchronizeFlow(
     s.logger.V(logging.TRACE).Info("Creating new queue for flow instance.",
         "flowKey", key, "queueType", q.Name())
 
-    callbacks := managedQueueCallbacks{
-        propagateStatsDelta: s.propagateStatsDelta,
-        signalQueueState: func(key types.FlowKey, signal queueStateSignal) {
-            s.parentCallbacks.signalQueueState(s.id, key, signal)
-        },
+    // Create a closure that captures the shard's `isDraining` atomic field.
+    // This provides the queue with a way to check the shard's status without creating a tight coupling or circular
+    // dependency.
+    isDrainingFunc := func() bool {
+        return s.isDraining.Load()
     }
-    mq := newManagedQueue(q, policy, spec.Key, s.logger, callbacks)
+
+    mq := newManagedQueue(q, policy, spec.Key, s.logger, s.propagateStatsDelta, isDrainingFunc)
     band.queues[key.ID] = mq
 }
 
-// garbageCollectLocked removes a queue instance from the shard.
-// This must be called under the shard's write lock.
-func (s *registryShard) garbageCollectLocked(key types.FlowKey) {
-    s.logger.Info("Garbage collecting queue instance.", "flowKey", key, "flowID", key.ID, "priority", key.Priority)
+// deleteFlow removes a queue instance from the shard.
+func (s *registryShard) deleteFlow(key types.FlowKey) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    s.logger.Info("Deleting queue instance.", "flowKey", key, "flowID", key.ID, "priority", key.Priority)
     if band, ok := s.priorityBands[key.Priority]; ok {
         delete(band.queues, key.ID)
     }
 }
 
 // markAsDraining transitions the shard to a Draining state. This method is lock-free.
 func (s *registryShard) markAsDraining() {
-    // Attempt to transition from Active to Draining atomically.
-    if s.status.CompareAndSwap(int32(componentStatusActive), int32(componentStatusDraining)) {
-        s.logger.V(logging.DEBUG).Info("Shard status changed",
-            "from", componentStatusActive,
-            "to", componentStatusDraining,
-        )
-    }
-
-    // Check if the shard is *already* empty when marked as draining. If so, immediately attempt the transition to
-    // Drained to ensure timely GC. This handles the race where the shard becomes empty just before or during being
-    // marked Draining.
-    if s.totalLen.Load() == 0 {
-        // Attempt to transition from Draining to Drained atomically.
-        if s.status.CompareAndSwap(int32(componentStatusDraining), int32(componentStatusDrained)) {
-            s.parentCallbacks.signalShardState(s.id, shardStateSignalBecameDrained)
-        }
-    }
-}
-
-// managedQueueLocked retrieves a specific `contracts.ManagedQueue` instance from this shard.
-// This must be called under the shard's read lock.
-func (s *registryShard) managedQueueLocked(key types.FlowKey) (*managedQueue, error) {
-    band, ok := s.priorityBands[key.Priority]
-    if !ok {
-        return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrPriorityBandNotFound)
-    }
-    mq, ok := band.queues[key.ID]
-    if !ok {
-        return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrFlowInstanceNotFound)
-    }
-    return mq, nil
+    s.isDraining.Store(true)
+    s.logger.V(logging.DEBUG).Info("Shard marked as Draining")
 }
 
 // updateConfig atomically replaces the shard's configuration. This is used during scaling events to re-partition
@@ -333,7 +312,6 @@ func (s *registryShard) updateConfig(newConfig *ShardConfig) {
         newBandConfig, err := newConfig.getBandConfig(priority)
         if err != nil {
             // An invariant was violated: a priority exists in the shard but not in the new config.
-            // This should be impossible if the registry's logic is correct.
             panic(fmt.Errorf("invariant violation: priority band (%d) missing in new shard configuration during update: %w",
                 priority, err))
         }
@@ -342,14 +320,13 @@ func (s *registryShard) updateConfig(newConfig *ShardConfig) {
     s.logger.Info("Shard configuration updated")
 }
 
-// --- Internal Callback Methods ---
+// --- Internal Callback ---
 
 // propagateStatsDelta is the single point of entry for all statistics changes within the shard.
-// It updates the relevant band's stats, the shard's total stats, and handles the shard's lifecycle signaling before
-// propagating the delta to the parent registry.
-// It uses atomic operations to maintain high performance under concurrent updates from multiple shards.
-// As a result, its counters are eventually consistent and may be transiently inaccurate during high-contention races.
+// It atomically updates the relevant band's stats, the shard's total stats, and propagates the delta to the parent
+// registry.
 func (s *registryShard) propagateStatsDelta(priority uint, lenDelta, byteSizeDelta int64) {
+    // This read is safe because the `priorityBands` map structure is immutable after initialization.
     band, ok := s.priorityBands[priority]
     if !ok {
         // This should be impossible if the `managedQueue` calling this is correctly registered.
@@ -359,24 +336,11 @@ func (s *registryShard) propagateStatsDelta(priority uint, lenDelta, byteSizeDel
 
     band.len.Add(lenDelta)
     band.byteSize.Add(byteSizeDelta)
-    newTotalLen := s.totalLen.Add(lenDelta)
+    s.totalLen.Add(lenDelta)
     s.totalByteSize.Add(byteSizeDelta)
 
-    // Following the strict bottom-up signaling pattern, we evaluate and signal our own state change *before* propagating
-    // the statistics to the parent registry.
-    s.evaluateDrainingState(newTotalLen)
-    s.parentCallbacks.propagateStatsDelta(priority, lenDelta, byteSizeDelta)
-}
-
-// evaluateDrainingState checks if the shard has transitioned to the Drained state and signals the parent.
-func (s *registryShard) evaluateDrainingState(currentLen int64) {
-    if currentLen == 0 {
-        // Attempt transition from Draining to Drained atomically.
-        // This acts as the exactly-once latch. If it succeeds, this goroutine is solely responsible for signaling.
-        if s.status.CompareAndSwap(int32(componentStatusDraining), int32(componentStatusDrained)) {
-            s.parentCallbacks.signalShardState(s.id, shardStateSignalBecameDrained)
-        }
-    }
+    // Propagate the delta up to the parent registry. This propagation is lock-free and eventually consistent.
+    s.onStatsDelta(priority, lenDelta, byteSizeDelta)
 }
 
 // --- `priorityBandAccessor` ---
