Skip to content

Commit 453c164

Browse files
authored
refactor: Improve Flow Control queue contracts for clarity and correctness (#1836)
* refactor: Simplify SafeQueue interface This commit refactors the `framework.SafeQueue` interface to remove error returns from methods where failure is not expected in normal operation (Add, PeekHead, PeekTail, Drain, Cleanup). The interface now relies on the following premises: - Queues are unbounded and in-memory. - OOM is a panic, not a handled error. - Internal callers are trusted not to provide nil items. - PeekHead/PeekTail return nil for empty queues. This change simplifies the interface and reduces unnecessary error checking on the hot path. This commit updates the queue implementations (ListQueue, MaxMinHeap), mocks, and all callers within the `pkg/epp/flowcontrol/framework/...` tree to conform to the new interface. A subsequent commit will address callers outside this tree. * refactor: Decouple ManagedQueue from SafeQueue This commit refactors the `ManagedQueue` contract after the preceding commit which altered the `SafeQueue` contract. Motivation: The preceding commit made the generic `framework.SafeQueue.Add` method infallible. This created a design conflict, as the higher-level `ManagedQueue` decorator requires a *fallible* `Add` operation to atomically reject requests when its parent shard is draining. This commit addresses this by refactoring `ManagedQueue` to favor composition over embedding: - `ManagedQueue` no longer embeds `SafeQueue`; it now contains it as a field, correctly modeling the "has-a" relationship. - The `ManagedQueue.Add` method now has its own explicit, fallible contract, returning `contracts.ErrShardDraining`. - The `Remove` and `Cleanup` operations follow the new, simpler `SafeQueue` contract. This change also includes the necessary updates to all callers in the `controller` and `registry` packages to conform to these new, more robust contracts as well as updating tests and mocks.
1 parent fd5ce6b commit 453c164

File tree

21 files changed

+239
-416
lines changed

21 files changed

+239
-416
lines changed

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ type MockManagedQueue struct {
149149
// RemoveFunc allows a test to completely override the default Remove behavior.
150150
RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error)
151151
// CleanupFunc allows a test to completely override the default Cleanup behavior.
152-
CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error)
152+
CleanupFunc func(predicate framework.PredicateFunc) []types.QueueItemAccessor
153153
// DrainFunc allows a test to completely override the default Drain behavior.
154-
DrainFunc func() ([]types.QueueItemAccessor, error)
154+
DrainFunc func() []types.QueueItemAccessor
155155

156156
// mu protects access to the internal `items` map.
157157
mu sync.Mutex
@@ -209,7 +209,7 @@ func (m *MockManagedQueue) Remove(handle types.QueueItemHandle) (types.QueueItem
209209
}
210210

211211
// Cleanup removes items matching a predicate. It checks for a test override before locking.
212-
func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
212+
func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor {
213213
if m.CleanupFunc != nil {
214214
return m.CleanupFunc(predicate)
215215
}
@@ -223,11 +223,11 @@ func (m *MockManagedQueue) Cleanup(predicate framework.PredicateFunc) ([]types.Q
223223
delete(m.items, handle)
224224
}
225225
}
226-
return removed, nil
226+
return removed
227227
}
228228

229229
// Drain removes all items from the queue. It checks for a test override before locking.
230-
func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) {
230+
func (m *MockManagedQueue) Drain() []types.QueueItemAccessor {
231231
if m.DrainFunc != nil {
232232
return m.DrainFunc()
233233
}
@@ -239,7 +239,7 @@ func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) {
239239
drained = append(drained, item)
240240
}
241241
m.items = make(map[types.QueueItemHandle]types.QueueItemAccessor)
242-
return drained, nil
242+
return drained
243243
}
244244

245245
func (m *MockManagedQueue) FlowKey() types.FlowKey { return m.FlowKeyV }
@@ -268,17 +268,17 @@ func (m *MockManagedQueue) ByteSize() uint64 {
268268
}
269269

270270
// PeekHead returns the first item found in the mock queue. Note: map iteration order is not guaranteed.
271-
func (m *MockManagedQueue) PeekHead() (types.QueueItemAccessor, error) {
271+
func (m *MockManagedQueue) PeekHead() types.QueueItemAccessor {
272272
m.mu.Lock()
273273
defer m.mu.Unlock()
274274
m.init()
275275
for _, item := range m.items {
276-
return item, nil // Return first item found
276+
return item // Return first item found
277277
}
278-
return nil, nil // Queue is empty
278+
return nil // Queue is empty
279279
}
280280

281281
// PeekTail is not implemented for this mock.
282-
func (m *MockManagedQueue) PeekTail() (types.QueueItemAccessor, error) {
283-
return nil, nil
282+
func (m *MockManagedQueue) PeekTail() types.QueueItemAccessor {
283+
return nil
284284
}

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,24 @@ type RegistryShard interface {
142142
}
143143

144144
// ManagedQueue defines the interface for a flow's queue on a specific shard.
145-
// It acts as a stateful decorator around an underlying `framework.SafeQueue`, augmenting it with statistics tracking.
145+
// It acts as a stateful decorator that *use an underlying framework.SafeQueue, augmenting it with statistics tracking
146+
// and lifecycle awareness (e.g., rejecting adds when a shard is draining).
146147
//
147-
// # Conformance
148-
//
149-
// - Implementations MUST be goroutine-safe.
150-
// - All mutating methods MUST ensure that the underlying queue state and the public statistics (`Len`, `ByteSize`)
151-
// are updated as a single atomic transaction.
152-
// - The `Add` method MUST return an error wrapping `ErrShardDraining` if the queue instance belongs to a parent shard
153-
// that is no longer Active.
148+
// Conformance: Implementations MUST be goroutine-safe.
154149
type ManagedQueue interface {
155-
framework.SafeQueue
150+
// Add attempts to enqueue an item, performing an atomic check on the parent shard's lifecycle state before adding
151+
// the item to the underlying queue.
152+
// Returns ErrShardDraining if the parent shard is no longer Active.
153+
Add(item types.QueueItemAccessor) error
154+
155+
// Remove atomically finds and removes an item from the underlying queue using its handle.
156+
Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error)
157+
158+
// Cleanup removes all items from the underlying queue that satisfy the predicate.
159+
Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor
160+
161+
// Drain removes all items from the underlying queue.
162+
Drain() []types.QueueItemAccessor
156163

157164
// FlowQueueAccessor returns a read-only, flow-aware accessor for this queue, used by policy plugins.
158165
// Conformance: This method MUST NOT return nil.

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]can
374374
"flowKey", key, "shardID", shard.ID())
375375
continue
376376
}
377-
candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()})
377+
candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()})
378378
}
379379
return nil
380380
})

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,7 @@ func setupRegistryForConcurrency(t *testing.T, numShards int, flowKey types.Flow
11281128
IntraFlowDispatchPolicyFunc: func(_ types.FlowKey) (framework.IntraFlowDispatchPolicy, error) {
11291129
return &frameworkmocks.MockIntraFlowDispatchPolicy{
11301130
SelectItemFunc: func(qa framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
1131-
return qa.PeekHead()
1131+
return qa.PeekHead(), nil
11321132
},
11331133
}, nil
11341134
},

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,7 @@ func (sp *ShardProcessor) sweepFinalizedItems() {
408408
predicate := func(itemAcc types.QueueItemAccessor) bool {
409409
return itemAcc.(*FlowItem).FinalState() != nil
410410
}
411-
removedItems, err := managedQ.Cleanup(predicate)
412-
if err != nil {
413-
logger.Error(err, "Error during ManagedQueue Cleanup", "flowKey", key)
414-
}
411+
removedItems := managedQ.Cleanup(predicate)
415412
logger.V(logutil.DEBUG).Info("Swept finalized items and released capacity.",
416413
"flowKey", key, "count", len(removedItems))
417414
}
@@ -449,10 +446,7 @@ func (sp *ShardProcessor) shutdown() {
449446
func (sp *ShardProcessor) evictAll() {
450447
processFn := func(managedQ contracts.ManagedQueue, logger logr.Logger) {
451448
key := managedQ.FlowQueueAccessor().FlowKey()
452-
removedItems, err := managedQ.Drain()
453-
if err != nil {
454-
logger.Error(err, "Error during ManagedQueue Drain", "flowKey", key)
455-
}
449+
removedItems := managedQ.Drain()
456450

457451
outcome := types.QueueOutcomeEvictedOther
458452
errShutdown := fmt.Errorf("%w: %w", types.ErrEvicted, types.ErrFlowControllerNotRunning)

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ func (h *testHarness) intraFlowDispatchPolicy(types.FlowKey) (framework.IntraFlo
282282

283283
// Otherwise, use a default implementation that selects the head of the queue.
284284
policy.SelectItemFunc = func(fqa framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
285-
return fqa.PeekHead()
285+
return fqa.PeekHead(), nil
286286
}
287287
return policy, nil
288288
}

pkg/epp/flowcontrol/framework/errors.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ import (
2626
// and might be handled or wrapped by the `contracts.FlowRegistry`'s `contracts.ManagedQueue` or the
2727
// `controller.FlowController`.
2828
var (
29-
// ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`.
30-
ErrNilQueueItem = errors.New("queue item cannot be nil")
31-
32-
// ErrQueueEmpty indicates an attempt to perform an operation on an empty `SafeQueue` that requires one or more items
33-
// (e.g., calling `SafeQueue.PeekHead()`).
34-
ErrQueueEmpty = errors.New("queue is empty")
35-
3629
// ErrInvalidQueueItemHandle indicates that a `types.QueueItemHandle` provided to a `SafeQueue` operation (e.g.,
3730
// `SafeQueue.Remove()`) is not valid for that queue, has been invalidated, or does not correspond to an actual item
3831
// in the queue.

pkg/epp/flowcontrol/framework/mocks/mocks.go

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ type MockFlowQueueAccessor struct {
3838
LenV int
3939
ByteSizeV uint64
4040
PeekHeadV types.QueueItemAccessor
41-
PeekHeadErrV error
4241
PeekTailV types.QueueItemAccessor
43-
PeekTailErrV error
4442
FlowKeyV types.FlowKey
4543
ComparatorV framework.ItemComparator
4644
CapabilitiesV []framework.QueueCapability
@@ -53,12 +51,12 @@ func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { ret
5351
func (m *MockFlowQueueAccessor) FlowKey() types.FlowKey { return m.FlowKeyV }
5452
func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV }
5553

56-
func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) {
57-
return m.PeekHeadV, m.PeekHeadErrV
54+
func (m *MockFlowQueueAccessor) PeekHead() types.QueueItemAccessor {
55+
return m.PeekHeadV
5856
}
5957

60-
func (m *MockFlowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) {
61-
return m.PeekTailV, m.PeekTailErrV
58+
func (m *MockFlowQueueAccessor) PeekTail() types.QueueItemAccessor {
59+
return m.PeekTailV
6260
}
6361

6462
var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
@@ -108,33 +106,30 @@ type MockSafeQueue struct {
108106
LenV int
109107
ByteSizeV uint64
110108
PeekHeadV types.QueueItemAccessor
111-
PeekHeadErrV error
112109
PeekTailV types.QueueItemAccessor
113-
PeekTailErrV error
114-
AddFunc func(item types.QueueItemAccessor) error
110+
AddFunc func(item types.QueueItemAccessor)
115111
RemoveFunc func(handle types.QueueItemHandle) (types.QueueItemAccessor, error)
116-
CleanupFunc func(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error)
117-
DrainFunc func() ([]types.QueueItemAccessor, error)
112+
CleanupFunc func(predicate framework.PredicateFunc) []types.QueueItemAccessor
113+
DrainFunc func() []types.QueueItemAccessor
118114
}
119115

120116
func (m *MockSafeQueue) Name() string { return m.NameV }
121117
func (m *MockSafeQueue) Capabilities() []framework.QueueCapability { return m.CapabilitiesV }
122118
func (m *MockSafeQueue) Len() int { return m.LenV }
123119
func (m *MockSafeQueue) ByteSize() uint64 { return m.ByteSizeV }
124120

125-
func (m *MockSafeQueue) PeekHead() (types.QueueItemAccessor, error) {
126-
return m.PeekHeadV, m.PeekHeadErrV
121+
func (m *MockSafeQueue) PeekHead() types.QueueItemAccessor {
122+
return m.PeekHeadV
127123
}
128124

129-
func (m *MockSafeQueue) PeekTail() (types.QueueItemAccessor, error) {
130-
return m.PeekTailV, m.PeekTailErrV
125+
func (m *MockSafeQueue) PeekTail() types.QueueItemAccessor {
126+
return m.PeekTailV
131127
}
132128

133-
func (m *MockSafeQueue) Add(item types.QueueItemAccessor) error {
129+
func (m *MockSafeQueue) Add(item types.QueueItemAccessor) {
134130
if m.AddFunc != nil {
135-
return m.AddFunc(item)
131+
m.AddFunc(item)
136132
}
137-
return nil
138133
}
139134

140135
func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, error) {
@@ -144,18 +139,18 @@ func (m *MockSafeQueue) Remove(handle types.QueueItemHandle) (types.QueueItemAcc
144139
return nil, nil
145140
}
146141

147-
func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
142+
func (m *MockSafeQueue) Cleanup(predicate framework.PredicateFunc) []types.QueueItemAccessor {
148143
if m.CleanupFunc != nil {
149144
return m.CleanupFunc(predicate)
150145
}
151-
return nil, nil
146+
return nil
152147
}
153148

154-
func (m *MockSafeQueue) Drain() ([]types.QueueItemAccessor, error) {
149+
func (m *MockSafeQueue) Drain() []types.QueueItemAccessor {
155150
if m.DrainFunc != nil {
156151
return m.DrainFunc()
157152
}
158-
return nil, nil
153+
return nil
159154
}
160155

161156
var _ framework.SafeQueue = &MockSafeQueue{}

pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func (p *bestHead) SelectQueue(band framework.PriorityBandAccessor) (framework.F
6868
return true
6969
}
7070

71-
item, err := queue.PeekHead()
72-
if err != nil || item == nil {
71+
item := queue.PeekHead()
72+
if item == nil {
7373
return true
7474
}
7575

pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package besthead
1818

1919
import (
20-
"errors"
2120
"testing"
2221
"time"
2322

@@ -105,10 +104,10 @@ func TestBestHead_SelectQueue(t *testing.T) {
105104
ComparatorV: newTestComparator(),
106105
}
107106
queueEmpty := &frameworkmocks.MockFlowQueueAccessor{
108-
LenV: 0,
109-
PeekHeadErrV: framework.ErrQueueEmpty,
110-
FlowKeyV: types.FlowKey{ID: "flowEmpty"},
111-
ComparatorV: newTestComparator(),
107+
LenV: 0,
108+
PeekHeadV: nil,
109+
FlowKeyV: types.FlowKey{ID: "flowEmpty"},
110+
ComparatorV: newTestComparator(),
112111
}
113112

114113
testCases := []struct {
@@ -151,19 +150,6 @@ func TestBestHead_SelectQueue(t *testing.T) {
151150
),
152151
expectedErr: framework.ErrIncompatiblePriorityType,
153152
},
154-
{
155-
name: "QueuePeekHeadErrors",
156-
band: newTestBand(
157-
&frameworkmocks.MockFlowQueueAccessor{
158-
LenV: 1,
159-
PeekHeadErrV: errors.New("peek error"),
160-
FlowKeyV: flow1Key,
161-
ComparatorV: newTestComparator(),
162-
},
163-
queue2,
164-
),
165-
expectedQueueID: flow2ID,
166-
},
167153
{
168154
name: "QueueComparatorIsNil",
169155
band: newTestBand(
@@ -195,10 +181,10 @@ func TestBestHead_SelectQueue(t *testing.T) {
195181
band: newTestBand(
196182
queueEmpty,
197183
&frameworkmocks.MockFlowQueueAccessor{
198-
LenV: 0,
199-
PeekHeadErrV: framework.ErrQueueEmpty,
200-
FlowKeyV: types.FlowKey{ID: "flowEmpty2"},
201-
ComparatorV: newTestComparator(),
184+
LenV: 0,
185+
PeekHeadV: nil,
186+
FlowKeyV: types.FlowKey{ID: "flowEmpty2"},
187+
ComparatorV: newTestComparator(),
202188
},
203189
),
204190
},

0 commit comments

Comments
 (0)