Skip to content

Commit 671ac4c

Browse files
authored
refactor: Make composite FlowKey the canonical ID (#1340)
This commit establishes the composite `types.FlowKey` (ID + Priority) as the canonical identifier for a flow instance throughout the entire flow control system. This architectural shift aligns the implementation with the clarified domain model where a flow's priority is immutable. This change is the result of a key architectural decision to simplify the system's state management. The previous model, which treated priority as a mutable attribute, required a complex state machine to handle graceful updates. By making the `FlowKey` immutable, that complexity is no longer necessary. The key benefits of this new model are: 1. **Radical State Simplification:** The entire state machine for handling priority updates has been removed. The concepts of "active" vs. "draining" queues, generation counters, and queue reactivation logic are now obsolete. This makes the `FlowRegistry` and `ManagedQueue` implementations significantly simpler and more robust. 2. **Unambiguous API Contracts:** Interfaces are now clearer. Methods like `ManagedQueue(key types.FlowKey)` receive a single, self-contained identifier, eliminating the need to pass `flowID` and `priority` as separate parameters and removing any ambiguity about a flow's identity. 3. **Architectural Alignment:** The implementation now correctly and directly reflects the API's expectation that a flow instance is uniquely defined by its ID *and* its priority. **Architectural Consequence & Accepted Trade-off:** This commit knowingly accepts a significant trade-off: inter-flow fairness is now strictly siloed within each priority band. Under this model, the system will never be able to reason about holistic fairness for a single tenant across multiple priority levels (e.g., balancing `TenantA-High` against `TenantA-Medium`). Fairness policies now operate on `(flowID, priority)` pairs as distinct, unrelated entities. This trade-off is considered acceptable because the requirement for holistic, cross-priority fairness is currently undefined and out of scope for the system's immediate goals.
1 parent de0c7b3 commit 671ac4c

File tree

25 files changed

+602
-609
lines changed

25 files changed

+602
-609
lines changed

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ import (
4646
type MockRegistryShard struct {
4747
IDFunc func() string
4848
IsActiveFunc func() bool
49-
ActiveManagedQueueFunc func(flowID string) (contracts.ManagedQueue, error)
50-
ManagedQueueFunc func(flowID string, priority uint) (contracts.ManagedQueue, error)
51-
IntraFlowDispatchPolicyFunc func(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)
49+
ManagedQueueFunc func(key types.FlowKey) (contracts.ManagedQueue, error)
50+
IntraFlowDispatchPolicyFunc func(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error)
5251
InterFlowDispatchPolicyFunc func(priority uint) (framework.InterFlowDispatchPolicy, error)
5352
PriorityBandAccessorFunc func(priority uint) (framework.PriorityBandAccessor, error)
5453
AllOrderedPriorityLevelsFunc func() []uint
@@ -69,23 +68,16 @@ func (m *MockRegistryShard) IsActive() bool {
6968
return false
7069
}
7170

72-
func (m *MockRegistryShard) ActiveManagedQueue(flowID string) (contracts.ManagedQueue, error) {
73-
if m.ActiveManagedQueueFunc != nil {
74-
return m.ActiveManagedQueueFunc(flowID)
75-
}
76-
return nil, nil
77-
}
78-
79-
func (m *MockRegistryShard) ManagedQueue(flowID string, priority uint) (contracts.ManagedQueue, error) {
71+
func (m *MockRegistryShard) ManagedQueue(key types.FlowKey) (contracts.ManagedQueue, error) {
8072
if m.ManagedQueueFunc != nil {
81-
return m.ManagedQueueFunc(flowID, priority)
73+
return m.ManagedQueueFunc(key)
8274
}
8375
return nil, nil
8476
}
8577

86-
func (m *MockRegistryShard) IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) {
78+
func (m *MockRegistryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) {
8779
if m.IntraFlowDispatchPolicyFunc != nil {
88-
return m.IntraFlowDispatchPolicyFunc(flowID, priority)
80+
return m.IntraFlowDispatchPolicyFunc(key)
8981
}
9082
return nil, nil
9183
}
@@ -148,8 +140,8 @@ func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool {
148140
// 3. **Self-Wiring**: The `FlowQueueAccessor()` method returns the mock itself, ensuring the accessor is always
149141
// correctly connected to the queue's state without manual wiring in tests.
150142
type MockManagedQueue struct {
151-
// FlowSpecV defines the flow specification for this mock queue. It should be set by the test.
152-
FlowSpecV types.FlowSpecification
143+
// FlowKeyV defines the flow specification for this mock queue. It should be set by the test.
144+
FlowKeyV types.FlowKey
153145

154146
// AddFunc allows a test to completely override the default Add behavior.
155147
AddFunc func(item types.QueueItemAccessor) error
@@ -249,7 +241,7 @@ func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) {
249241
return drained, nil
250242
}
251243

252-
func (m *MockManagedQueue) FlowSpec() types.FlowSpecification { return m.FlowSpecV }
244+
func (m *MockManagedQueue) FlowKey() types.FlowKey { return m.FlowKeyV }
253245
func (m *MockManagedQueue) Name() string { return "" }
254246
func (m *MockManagedQueue) Capabilities() []framework.QueueCapability { return nil }
255247
func (m *MockManagedQueue) Comparator() framework.ItemComparator { return nil }

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ package contracts
2424

2525
import (
2626
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2728
)
2829

2930
// RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
30-
// specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its
31-
// dispatch operations by accessing queues and policies in a concurrent-safe manner.
31+
// specific slice (shard) of the `FlowRegistry's` state. It provides a concurrent-safe view of all flow instances, which
32+
// are uniquely identified by their composite `types.FlowKey`. It is the primary contract for performing dispatch
33+
// operations.
3234
//
3335
// # Conformance
3436
//
@@ -41,21 +43,19 @@ type RegistryShard interface {
4143
// being gracefully drained and should not be given new work.
4244
IsActive() bool
4345

44-
// ActiveManagedQueue returns the currently active `ManagedQueue` for a given flow on this shard. This is the queue to
45-
// which new requests for the flow should be enqueued.
46-
// Returns an error wrapping `ErrFlowInstanceNotFound` if no active instance exists for the given `flowID`.
47-
ActiveManagedQueue(flowID string) (ManagedQueue, error)
48-
49-
// ManagedQueue retrieves a specific (potentially draining) `ManagedQueue` instance from this shard. This allows a
50-
// worker to continue dispatching items from queues that are draining as part of a flow update.
51-
// Returns an error wrapping `ErrFlowInstanceNotFound` if no instance for the given flowID and priority exists.
52-
ManagedQueue(flowID string, priority uint) (ManagedQueue, error)
46+
// ManagedQueue retrieves the managed queue for the given, unique `types.FlowKey`. This is the primary method for
47+
// accessing a specific flow's queue for either enqueueing or dispatching requests.
48+
//
49+
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the key is not configured, or
50+
// `ErrFlowInstanceNotFound` if no instance exists for the given `key`.
51+
ManagedQueue(key types.FlowKey) (ManagedQueue, error)
5352

54-
// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard.
53+
// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard,
54+
// identified by its unique `FlowKey`.
5555
// The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if
56-
// none is specified on the flow itself.
56+
// none is specified for the flow.
5757
// Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist.
58-
IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)
58+
IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error)
5959

6060
// InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard.
6161
// The registry guarantees that a non-nil default policy is returned if none is configured for the band.
@@ -86,8 +86,6 @@ type RegistryShard interface {
8686
// # Conformance
8787
//
8888
// - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
89-
// - The `Add()` method MUST reject new items if the queue has been marked as "draining" by the `FlowRegistry`,
90-
// ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue.
9189
// - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
9290
// (e.g., queue length, byte size).
9391
type ManagedQueue interface {

pkg/epp/flowcontrol/controller/internal/filter.go

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ import (
2323

2424
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
2525
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2627
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2728
)
2829

2930
// BandFilter is a function that acts as a pre-policy gate. It takes a complete view of a priority band and returns a
30-
// subset of flow IDs that are considered viable candidates for a subsequent policy decision. It can also return a
31-
// boolean signal to pause the entire operation for the band.
31+
// potentially filtered `framework.PriorityBandAccessor` containing only the flows that are viable candidates for a
32+
// subsequent policy decision. It can also return a boolean signal to pause the entire operation for the band.
3233
//
3334
// This abstraction decouples the logic of determining *viability* (e.g., is a flow subject to backpressure?) from the
3435
// logic of *selection* (e.g., which of the viable flows is the fairest to pick next?). This separation simplifies the
@@ -39,13 +40,13 @@ import (
3940
// can be chained to apply different viability criteria. For example, a future filter could be developed to temporarily
4041
// exclude a "misbehaving" flow that is causing repeated errors, quarantining it from policy consideration.
4142
//
42-
// A nil `allowedFlows` map indicates that no filtering is necessary and all flows in the band are visible.
43-
// This provides a zero-allocation fast path for the common case where no flows are being filtered.
43+
// A nil returned `PriorityBandAccessor` indicates that no filtering was necessary and the original accessor should be
44+
// used. This provides a zero-allocation fast path for the common case where no flows are being filtered.
4445
type BandFilter func(
4546
ctx context.Context,
4647
band framework.PriorityBandAccessor,
4748
logger logr.Logger,
48-
) (allowedFlows map[string]struct{}, shouldPause bool)
49+
) (filteredBand framework.PriorityBandAccessor, shouldPause bool)
4950

5051
// NewSaturationFilter creates a `BandFilter` that uses the provided `contracts.SaturationDetector` to determine which
5152
// flows are dispatchable. This is the standard filter used in the production `FlowController` for the dispatch
@@ -55,15 +56,16 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter {
5556
ctx context.Context,
5657
band framework.PriorityBandAccessor,
5758
logger logr.Logger,
58-
) (map[string]struct{}, bool) {
59+
) (framework.PriorityBandAccessor, bool) {
5960
// Phase 1: Implement the current global saturation check.
6061
if sd.IsSaturated(ctx) {
6162
logger.V(logutil.VERBOSE).Info("System saturated, pausing dispatch for this shard.")
6263
return nil, true // Pause dispatching for all bands.
6364
}
6465

6566
// Phase 2 (Future): This is where per-flow saturation logic would go.
66-
// It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows.
67+
// It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows,
68+
// then return `newSubsetPriorityBandAccessor(band, allowedFlows)`.
6769
// For now, no per-flow filtering is done. Return nil to signal the fast path.
6870
return nil, false // Do not pause, and do not filter any flows.
6971
}
@@ -73,31 +75,28 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter {
7375
// It implements the `framework.PriorityBandAccessor` interface, ensuring that any policy operating on it will only
7476
// see the allowed flows, regardless of which accessor method is used. This provides correctness by construction.
7577
//
76-
// For performance, it pre-computes a slice of the allowed flow IDs at creation time, making subsequent calls to
77-
// `FlowIDs()` an O(1) operation with zero allocations.
78+
// For performance, it pre-computes a slice of the allowed flows at creation time, making subsequent calls to
79+
// `FlowKeys()` an O(1) operation with zero allocations.
7880
type subsetPriorityBandAccessor struct {
7981
originalAccessor framework.PriorityBandAccessor
80-
allowedFlows map[string]struct{}
81-
allowedFlowsSlice []string
82+
allowedFlows map[types.FlowKey]struct{}
83+
allowedFlowsSlice []types.FlowKey
8284
}
8385

8486
var _ framework.PriorityBandAccessor = &subsetPriorityBandAccessor{}
8587

8688
// newSubsetPriorityBandAccessor creates a new filtered view of a priority band.
87-
func newSubsetPriorityBandAccessor(
88-
original framework.PriorityBandAccessor,
89-
allowed map[string]struct{},
90-
) *subsetPriorityBandAccessor {
91-
// Pre-compute the slice of flow IDs for performance.
92-
ids := make([]string, 0, len(allowed))
93-
for id := range allowed {
94-
ids = append(ids, id)
89+
func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allowed []types.FlowKey) *subsetPriorityBandAccessor {
90+
// Pre-compute the map for efficient lookups in `Queue()` and `IterateQueues()`.
91+
allowedMap := make(map[types.FlowKey]struct{}, len(allowed))
92+
for _, k := range allowed {
93+
allowedMap[k] = struct{}{}
9594
}
9695

9796
return &subsetPriorityBandAccessor{
9897
originalAccessor: original,
99-
allowedFlows: allowed,
100-
allowedFlowsSlice: ids,
98+
allowedFlows: allowedMap,
99+
allowedFlowsSlice: allowed,
101100
}
102101
}
103102

@@ -111,19 +110,21 @@ func (s *subsetPriorityBandAccessor) PriorityName() string {
111110
return s.originalAccessor.PriorityName()
112111
}
113112

114-
// FlowIDs returns a slice of all flow IDs within this priority band that are in the allowed subset.
113+
// FlowKeys returns a slice of the composite `types.FlowKey`s for every flow instance currently active within this
114+
// priority band that are in the allowed subset.
115115
// This is an O(1) operation because the slice is pre-computed at creation.
116-
func (s *subsetPriorityBandAccessor) FlowIDs() []string {
116+
func (s *subsetPriorityBandAccessor) FlowKeys() []types.FlowKey {
117117
return s.allowedFlowsSlice
118118
}
119119

120-
// Queue returns a `framework.FlowQueueAccessor` for the specified `flowID` within this priority band, but only if it is
120+
// Queue returns a `framework.FlowQueueAccessor` for the specified `ID` within this priority band, but only if it is
121121
// in the allowed subset. This is an O(1) map lookup. If the flow is not in the allowed subset, it returns nil.
122-
func (s *subsetPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor {
123-
if _, ok := s.allowedFlows[flowID]; !ok {
122+
func (s *subsetPriorityBandAccessor) Queue(id string) framework.FlowQueueAccessor {
123+
key := types.FlowKey{ID: id, Priority: s.Priority()}
124+
if _, ok := s.allowedFlows[key]; !ok {
124125
return nil
125126
}
126-
return s.originalAccessor.Queue(flowID)
127+
return s.originalAccessor.Queue(id)
127128
}
128129

129130
// IterateQueues executes the given `callback` for each `framework.FlowQueueAccessor` in the allowed subset of this
@@ -132,7 +133,7 @@ func (s *subsetPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAcc
132133
// efficient than iterating over a pre-computed slice of IDs.
133134
func (s *subsetPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) {
134135
s.originalAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
135-
if _, ok := s.allowedFlows[queue.FlowSpec().ID]; ok {
136+
if _, ok := s.allowedFlows[queue.FlowKey()]; ok {
136137
// This queue is in the allowed set, so execute the callback.
137138
if !callback(queue) {
138139
return false // The callback requested to stop, so we stop the outer iteration too.

pkg/epp/flowcontrol/controller/internal/filter_test.go

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,22 @@ func TestNewSaturationFilter(t *testing.T) {
3535
t.Parallel()
3636

3737
testCases := []struct {
38-
name string
39-
isSaturated bool
40-
expectShouldPause bool
41-
expectAllowed map[string]struct{}
38+
name string
39+
isSaturated bool
40+
expectShouldPause bool
41+
expectFilteredBandNil bool
4242
}{
4343
{
44-
name: "should not pause or filter when system is not saturated",
45-
isSaturated: false,
46-
expectShouldPause: false,
47-
expectAllowed: nil, // nil map signals the fast path
44+
name: "should not pause or filter when system is not saturated",
45+
isSaturated: false,
46+
expectShouldPause: false,
47+
expectFilteredBandNil: true, // nil band signals the fast path
4848
},
4949
{
50-
name: "should pause when system is saturated",
51-
isSaturated: true,
52-
expectShouldPause: true,
53-
expectAllowed: nil,
50+
name: "should pause when system is saturated",
51+
isSaturated: true,
52+
expectShouldPause: true,
53+
expectFilteredBandNil: true,
5454
},
5555
}
5656

@@ -66,15 +66,15 @@ func TestNewSaturationFilter(t *testing.T) {
6666
mockBand := &frameworkmocks.MockPriorityBandAccessor{}
6767

6868
// --- ACT ---
69-
allowed, shouldPause := filter(context.Background(), mockBand, logr.Discard())
69+
filteredBand, shouldPause := filter(context.Background(), mockBand, logr.Discard())
7070

7171
// --- ASSERT ---
7272
assert.Equal(t, tc.expectShouldPause, shouldPause, "The filter's pause signal should match the expected value")
7373

74-
if tc.expectAllowed == nil {
75-
assert.Nil(t, allowed, "Expected allowed map to be nil for the fast path")
74+
if tc.expectFilteredBandNil {
75+
assert.Nil(t, filteredBand, "Expected filtered band to be nil")
7676
} else {
77-
assert.Equal(t, tc.expectAllowed, allowed, "The set of allowed flows should match the expected value")
77+
assert.NotNil(t, filteredBand, "Expected a non-nil filtered band")
7878
}
7979
})
8080
}
@@ -85,15 +85,19 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
8585

8686
// --- ARRANGE ---
8787
// Setup a mock original accessor that knows about three flows.
88-
mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-a"}}
89-
mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-b"}}
90-
mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-c"}}
88+
flowAKey := types.FlowKey{ID: "flow-a", Priority: 10}
89+
flowBKey := types.FlowKey{ID: "flow-b", Priority: 10}
90+
flowCKey := types.FlowKey{ID: "flow-c", Priority: 10}
91+
92+
mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowAKey}
93+
mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowBKey}
94+
mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowCKey}
9195

9296
originalAccessor := &frameworkmocks.MockPriorityBandAccessor{
9397
PriorityV: 10,
9498
PriorityNameV: "High",
95-
FlowIDsFunc: func() []string {
96-
return []string{"flow-a", "flow-b", "flow-c"}
99+
FlowKeysFunc: func() []types.FlowKey {
100+
return []types.FlowKey{flowAKey, flowBKey, flowCKey}
97101
},
98102
QueueFunc: func(id string) framework.FlowQueueAccessor {
99103
switch id {
@@ -118,10 +122,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
118122
}
119123

120124
// Create a subset view that only allows two of the flows.
121-
allowedFlows := map[string]struct{}{
122-
"flow-a": {},
123-
"flow-c": {},
124-
}
125+
allowedFlows := []types.FlowKey{flowAKey, flowCKey}
125126
subsetAccessor := newSubsetPriorityBandAccessor(originalAccessor, allowedFlows)
126127
require.NotNil(t, subsetAccessor, "newSubsetPriorityBandAccessor should not return nil")
127128

@@ -132,12 +133,14 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
132133
"PriorityName() should pass through from the original accessor")
133134
})
134135

135-
t.Run("should only return allowed flow IDs", func(t *testing.T) {
136+
t.Run("should only return allowed flow keys", func(t *testing.T) {
136137
t.Parallel()
137-
flowIDs := subsetAccessor.FlowIDs()
138+
flowKeys := subsetAccessor.FlowKeys()
138139
// Sort for consistent comparison, as the pre-computed slice order is not guaranteed.
139-
sort.Strings(flowIDs)
140-
assert.Equal(t, []string{"flow-a", "flow-c"}, flowIDs, "FlowIDs() should only return the allowed subset")
140+
sort.Slice(flowKeys, func(i, j int) bool {
141+
return flowKeys[i].ID < flowKeys[j].ID
142+
})
143+
assert.Equal(t, []types.FlowKey{flowAKey, flowCKey}, flowKeys, "FlowKeys() should only return the allowed subset")
141144
})
142145

143146
t.Run("should only return queues for allowed flows", func(t *testing.T) {
@@ -151,7 +154,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
151154
t.Parallel()
152155
var iterated []string
153156
subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
154-
iterated = append(iterated, queue.FlowSpec().ID)
157+
iterated = append(iterated, queue.FlowKey().ID)
155158
return true
156159
})
157160
// Sort for consistent comparison, as iteration order is not guaranteed.
@@ -163,7 +166,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
163166
t.Parallel()
164167
var iterated []string
165168
subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
166-
iterated = append(iterated, queue.FlowSpec().ID)
169+
iterated = append(iterated, queue.FlowKey().ID)
167170
return false // Exit after the first item.
168171
})
169172
assert.Len(t, iterated, 1, "Iteration should have stopped after one item")

pkg/epp/flowcontrol/controller/internal/item_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func TestItem(t *testing.T) {
4141

4242
t.Run("should have a non-finalized state upon creation", func(t *testing.T) {
4343
t.Parallel()
44-
req := typesmocks.NewMockFlowControlRequest(100, "req-1", "flow-a", context.Background())
44+
key := types.FlowKey{ID: "flow-a", Priority: 10}
45+
req := typesmocks.NewMockFlowControlRequest(100, "req-1", key, context.Background())
4546
item := NewItem(req, time.Minute, time.Now())
4647
require.NotNil(t, item, "NewItem should not return nil")
4748
outcome, err := item.FinalState()

0 commit comments

Comments
 (0)