1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
* [BUGFIX] Scheduler: Fix memory leak by properly cleaning up query fragment registry. #7148

## 1.20.1 2025-12-03

24 changes: 24 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -462,6 +462,30 @@ func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, query
			req.ctxCancel()
		}
		delete(s.pendingRequests, key)

		// Clean up queryFragmentRegistry for this specific fragment
		if fragmentIDs, ok := s.queryFragmentRegistry[querykey]; ok {
			// Fast path: if there's only one fragment and it's the one we're deleting,
			// just delete the entire entry without allocating a new slice
			if len(fragmentIDs) == 1 && fragmentIDs[0] == fragmentID {
				delete(s.queryFragmentRegistry, querykey)
			} else {
				// Slow path: remove this fragmentID from the slice
				newFragmentIDs := make([]uint64, 0, len(fragmentIDs)-1)
				for _, fid := range fragmentIDs {
					if fid != fragmentID {
						newFragmentIDs = append(newFragmentIDs, fid)
					}
				}

				// If no more fragments remain, delete the entire entry
				if len(newFragmentIDs) == 0 {
					delete(s.queryFragmentRegistry, querykey)
				} else {
					s.queryFragmentRegistry[querykey] = newFragmentIDs
				}
			}
		}
	}
}

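The new block above is the whole fix: remove one fragment ID from the slice stored under the query's key, and drop the map entry once the slice is empty, so completed queries no longer pin registry entries. The same pattern, isolated from the scheduler's types and locking, looks roughly like the sketch below; fragmentRegistry, removeFragment, and the string key are hypothetical stand-ins for this illustration, not code from the PR.

package main

import "fmt"

// fragmentRegistry is a simplified stand-in for the scheduler's
// queryFragmentRegistry: it maps a query to its outstanding fragment IDs.
type fragmentRegistry map[string][]uint64

// removeFragment drops fragmentID from the entry for key and deletes the
// entry entirely once no fragments remain, mirroring the fast/slow paths above.
func (r fragmentRegistry) removeFragment(key string, fragmentID uint64) {
	ids, ok := r[key]
	if !ok {
		return
	}
	// Fast path: a single fragment that is the one being removed.
	if len(ids) == 1 && ids[0] == fragmentID {
		delete(r, key)
		return
	}
	// Slow path: rebuild the slice without fragmentID.
	kept := make([]uint64, 0, len(ids)-1)
	for _, id := range ids {
		if id != fragmentID {
			kept = append(kept, id)
		}
	}
	if len(kept) == 0 {
		delete(r, key)
	} else {
		r[key] = kept
	}
}

func main() {
	reg := fragmentRegistry{"frontend1/100": {0, 1, 2}}
	reg.removeFragment("frontend1/100", 1)
	fmt.Println(reg["frontend1/100"]) // [0 2]
	reg.removeFragment("frontend1/100", 0)
	reg.removeFragment("frontend1/100", 2)
	fmt.Println(len(reg)) // 0: the entry disappears with its last fragment
}

Without that final delete, the map would retain one stale slice per query for the lifetime of the scheduler, which is the leak the changelog entry describes.
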
183 changes: 183 additions & 0 deletions pkg/scheduler/scheduler_test.go
@@ -697,3 +697,186 @@ func (f *frontendMock) getRequest(queryID uint64) *httpgrpc.HTTPResponse {

	return f.resp[queryID]
}

// TestQueryFragmentRegistryCleanupSingleFragment verifies that queryFragmentRegistry
// is properly cleaned up when a single fragment completes (non-fragmenting mode).
func TestQueryFragmentRegistryCleanupSingleFragment(t *testing.T) {
	cfg := Config{}
	flagext.DefaultValues(&cfg)
	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, false)
	require.NoError(t, err)

	frontendAddr := "frontend1"
	queryID := uint64(100)
	fragmentID := uint64(0)

	// Simulate enqueue adding to both maps
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	req := &schedulerRequest{
		frontendAddress: frontendAddr,
		userID: "user1",
		queryID: queryID,
		ctx: ctx,
		ctxCancel: cancel,
	}

	s.pendingRequestsMu.Lock()
	queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
	s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
	s.pendingRequestsMu.Unlock()

	// Verify both entries exist
	s.pendingRequestsMu.Lock()
	require.Len(t, s.queryFragmentRegistry[queryKey], 1)
	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID})
	s.pendingRequestsMu.Unlock()

	// Simulate request completion (cancelAll=false)
	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false)

	// Verify cleanup: both pendingRequests AND queryFragmentRegistry should be cleaned up
	s.pendingRequestsMu.Lock()
	_, registryExists := s.queryFragmentRegistry[queryKey]
	require.False(t, registryExists, "queryFragmentRegistry should be cleaned up when last fragment completes")
	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID}, "pendingRequests should be cleaned up")
	s.pendingRequestsMu.Unlock()
}
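
The tests key both maps the same way: queryFragmentRegistry by a per-query key and pendingRequests by that key plus a fragment ID. The actual definitions live elsewhere in pkg/scheduler; from the composite literals used here they presumably look roughly like this hypothetical sketch:

// Hypothetical shapes inferred from the test code above; the real
// definitions in pkg/scheduler may differ in naming and detail.
type queryKey struct {
	frontendAddr string
	queryID      uint64
}

type requestKey struct {
	queryKey   queryKey
	fragmentID uint64
}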

// TestQueryFragmentRegistryCleanupMultipleFragments verifies that queryFragmentRegistry
// properly removes only the completed fragment and keeps others when multiple fragments exist.
func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
	cfg := Config{}
	flagext.DefaultValues(&cfg)
	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, true)
	require.NoError(t, err)

	frontendAddr := "frontend1"
	queryID := uint64(100)
	fragmentID1 := uint64(0)
	fragmentID2 := uint64(1)
	fragmentID3 := uint64(2)

	// Simulate multiple fragments for the same query
	ctx1, cancel1 := context.WithCancel(context.Background())
	defer cancel1()
	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()
	ctx3, cancel3 := context.WithCancel(context.Background())
	defer cancel3()

	req1 := &schedulerRequest{
		frontendAddress: frontendAddr,
		userID: "user1",
		queryID: queryID,
		ctx: ctx1,
		ctxCancel: cancel1,
	}
	req2 := &schedulerRequest{
		frontendAddress: frontendAddr,
		userID: "user1",
		queryID: queryID,
		ctx: ctx2,
		ctxCancel: cancel2,
	}
	req3 := &schedulerRequest{
		frontendAddress: frontendAddr,
		userID: "user1",
		queryID: queryID,
		ctx: ctx3,
		ctxCancel: cancel3,
	}

	s.pendingRequestsMu.Lock()
	queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
	s.queryFragmentRegistry[queryKey] = []uint64{fragmentID1, fragmentID2, fragmentID3}
	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID1}] = req1
	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID2}] = req2
	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID3}] = req3
	s.pendingRequestsMu.Unlock()

	// Verify all three fragments exist
	s.pendingRequestsMu.Lock()
	require.Len(t, s.queryFragmentRegistry[queryKey], 3)
	require.Len(t, s.pendingRequests, 3)
	s.pendingRequestsMu.Unlock()

	// Fragment 1 completes
	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID1, false)

	// Verify fragment 1 removed, but fragments 2 and 3 remain
	s.pendingRequestsMu.Lock()
	require.Len(t, s.queryFragmentRegistry[queryKey], 2, "should have 2 fragments remaining")
	require.ElementsMatch(t, []uint64{fragmentID2, fragmentID3}, s.queryFragmentRegistry[queryKey])
	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID1})
	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
	s.pendingRequestsMu.Unlock()

	// Fragment 2 completes
	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID2, false)

	// Verify fragment 2 removed, only fragment 3 remains
	s.pendingRequestsMu.Lock()
	require.Len(t, s.queryFragmentRegistry[queryKey], 1, "should have 1 fragment remaining")
	require.Equal(t, []uint64{fragmentID3}, s.queryFragmentRegistry[queryKey])
	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
	s.pendingRequestsMu.Unlock()

	// Fragment 3 completes (last fragment)
	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID3, false)

	// Verify all cleaned up
	s.pendingRequestsMu.Lock()
	_, registryExists := s.queryFragmentRegistry[queryKey]
	require.False(t, registryExists, "queryFragmentRegistry should be deleted when last fragment completes")
	require.Empty(t, s.pendingRequests, "all pendingRequests should be cleaned up")
	s.pendingRequestsMu.Unlock()
}

// TestQueryFragmentRegistryNoLeak verifies that repeated request completions
// don't cause queryFragmentRegistry to grow unbounded.
func TestQueryFragmentRegistryNoLeak(t *testing.T) {
	cfg := Config{}
	flagext.DefaultValues(&cfg)
	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, false)
	require.NoError(t, err)

	frontendAddr := "frontend1"

	// Simulate 100 requests completing normally
	for i := range 100 {
		queryID := uint64(i)
		fragmentID := uint64(0)

		ctx, cancel := context.WithCancel(context.Background())
		req := &schedulerRequest{
			frontendAddress: frontendAddr,
			userID: "user1",
			queryID: queryID,
			ctx: ctx,
			ctxCancel: cancel,
		}

		// Add to registry and pending requests
		s.pendingRequestsMu.Lock()
		queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
		s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
		s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
		s.pendingRequestsMu.Unlock()

		// Complete the request
		s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false)

		cancel()
	}

	// Verify no leak: registry should be empty
	s.pendingRequestsMu.Lock()
	require.Empty(t, s.queryFragmentRegistry, "queryFragmentRegistry should be empty after all requests complete")
	require.Empty(t, s.pendingRequests, "pendingRequests should be empty after all requests complete")
	s.pendingRequestsMu.Unlock()
}
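
Assuming the standard Mimir module layout, the three new tests can be run on their own with something like:

go test ./pkg/scheduler/ -run 'TestQueryFragmentRegistry' -v

The -run pattern matches all three test names above, so this exercises the single-fragment, multi-fragment, and repeated-completion paths in one pass.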