From 06a9f45a3670f7f5abe298e3d4af39d77c3ccd87 Mon Sep 17 00:00:00 2001
From: yeya24
Date: Mon, 8 Dec 2025 21:58:32 -0800
Subject: [PATCH 1/3] fix query scheduler memory leak by cleaning up fragment registry properly

Signed-off-by: yeya24
---
 pkg/scheduler/scheduler.go      |  24 +++++
 pkg/scheduler/scheduler_test.go | 183 ++++++++++++++++++++++++++++++++
 2 files changed, 207 insertions(+)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index cbe73c505fe..a5256d08a48 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -462,6 +462,30 @@ func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, query
 			req.ctxCancel()
 		}
 		delete(s.pendingRequests, key)
+
+		// Clean up queryFragmentRegistry for this specific fragment
+		if fragmentIDs, ok := s.queryFragmentRegistry[querykey]; ok {
+			// Fast path: if there's only one fragment and it's the one we're deleting,
+			// just delete the entire entry without allocating a new slice
+			if len(fragmentIDs) == 1 && fragmentIDs[0] == fragmentID {
+				delete(s.queryFragmentRegistry, querykey)
+			} else {
+				// Slow path: remove this fragmentID from the slice
+				newFragmentIDs := make([]uint64, 0, len(fragmentIDs)-1)
+				for _, fid := range fragmentIDs {
+					if fid != fragmentID {
+						newFragmentIDs = append(newFragmentIDs, fid)
+					}
+				}
+
+				// If no more fragments remain, delete the entire entry
+				if len(newFragmentIDs) == 0 {
+					delete(s.queryFragmentRegistry, querykey)
+				} else {
+					s.queryFragmentRegistry[querykey] = newFragmentIDs
+				}
+			}
+		}
 	}
 }
 
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index aa246f1d16a..30501ba9a98 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -697,3 +697,186 @@ func (f *frontendMock) getRequest(queryID uint64) *httpgrpc.HTTPResponse {
 
 	return f.resp[queryID]
 }
+
+// TestQueryFragmentRegistryCleanupSingleFragment verifies that queryFragmentRegistry
+// is properly cleaned up when a single fragment completes (non-fragmenting mode).
+func TestQueryFragmentRegistryCleanupSingleFragment(t *testing.T) {
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, false)
+	require.NoError(t, err)
+
+	frontendAddr := "frontend1"
+	queryID := uint64(100)
+	fragmentID := uint64(0)
+
+	// Simulate enqueue adding to both maps
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	req := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx,
+		ctxCancel:       cancel,
+	}
+
+	s.pendingRequestsMu.Lock()
+	queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
+	s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
+	s.pendingRequestsMu.Unlock()
+
+	// Verify both entries exist
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 1)
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID})
+	s.pendingRequestsMu.Unlock()
+
+	// Simulate request completion (cancelAll=false)
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false)
+
+	// Verify cleanup: both pendingRequests AND queryFragmentRegistry should be cleaned up
+	s.pendingRequestsMu.Lock()
+	_, registryExists := s.queryFragmentRegistry[queryKey]
+	require.False(t, registryExists, "queryFragmentRegistry should be cleaned up when last fragment completes")
+	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID}, "pendingRequests should be cleaned up")
+	s.pendingRequestsMu.Unlock()
+}
+
+// TestQueryFragmentRegistryCleanupMultipleFragments verifies that queryFragmentRegistry
+// properly removes only the completed fragment and keeps others when multiple fragments exist.
+func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, true)
+	require.NoError(t, err)
+
+	frontendAddr := "frontend1"
+	queryID := uint64(100)
+	fragmentID1 := uint64(0)
+	fragmentID2 := uint64(1)
+	fragmentID3 := uint64(2)
+
+	// Simulate multiple fragments for the same query
+	ctx1, cancel1 := context.WithCancel(context.Background())
+	defer cancel1()
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	defer cancel2()
+	ctx3, cancel3 := context.WithCancel(context.Background())
+	defer cancel3()
+
+	req1 := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx1,
+		ctxCancel:       cancel1,
+	}
+	req2 := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx2,
+		ctxCancel:       cancel2,
+	}
+	req3 := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx3,
+		ctxCancel:       cancel3,
+	}
+
+	s.pendingRequestsMu.Lock()
+	queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
+	s.queryFragmentRegistry[queryKey] = []uint64{fragmentID1, fragmentID2, fragmentID3}
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID1}] = req1
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID2}] = req2
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID3}] = req3
+	s.pendingRequestsMu.Unlock()
+
+	// Verify all three fragments exist
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 3)
+	require.Len(t, s.pendingRequests, 3)
+	s.pendingRequestsMu.Unlock()
+
+	// Fragment 1 completes
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID1, false)
+
+	// Verify fragment 1 removed, but fragments 2 and 3 remain
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 2, "should have 2 fragments remaining")
+	require.ElementsMatch(t, []uint64{fragmentID2, fragmentID3}, s.queryFragmentRegistry[queryKey])
+	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID1})
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
+	s.pendingRequestsMu.Unlock()
+
+	// Fragment 2 completes
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID2, false)
+
+	// Verify fragment 2 removed, only fragment 3 remains
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 1, "should have 1 fragment remaining")
+	require.Equal(t, []uint64{fragmentID3}, s.queryFragmentRegistry[queryKey])
+	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
+	s.pendingRequestsMu.Unlock()
+
+	// Fragment 3 completes (last fragment)
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID3, false)
+
+	// Verify all cleaned up
+	s.pendingRequestsMu.Lock()
+	_, registryExists := s.queryFragmentRegistry[queryKey]
+	require.False(t, registryExists, "queryFragmentRegistry should be deleted when last fragment completes")
+	require.Empty(t, s.pendingRequests, "all pendingRequests should be cleaned up")
be cleaned up") + s.pendingRequestsMu.Unlock() +} + +// TestQueryFragmentRegistryNoLeak verifies that repeated request completions +// don't cause queryFragmentRegistry to grow unbounded. +func TestQueryFragmentRegistryNoLeak(t *testing.T) { + cfg := Config{} + flagext.DefaultValues(&cfg) + s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, false) + require.NoError(t, err) + + frontendAddr := "frontend1" + + // Simulate 100 requests completing normally + for i := 0; i < 100; i++ { + queryID := uint64(i) + fragmentID := uint64(0) + + ctx, cancel := context.WithCancel(context.Background()) + req := &schedulerRequest{ + frontendAddress: frontendAddr, + userID: "user1", + queryID: queryID, + ctx: ctx, + ctxCancel: cancel, + } + + // Add to registry and pending requests + s.pendingRequestsMu.Lock() + queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID} + s.queryFragmentRegistry[queryKey] = []uint64{fragmentID} + s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req + s.pendingRequestsMu.Unlock() + + // Complete the request + s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false) + + cancel() + } + + // Verify no leak: registry should be empty + s.pendingRequestsMu.Lock() + require.Empty(t, s.queryFragmentRegistry, "queryFragmentRegistry should be empty after all requests complete") + require.Empty(t, s.pendingRequests, "pendingRequests should be empty after all requests complete") + s.pendingRequestsMu.Unlock() +} From c36d61210c013640001a5f69d3e7f7a06e942fa9 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 8 Dec 2025 22:05:08 -0800 Subject: [PATCH 2/3] changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6232398eacc..6e1dbb67d4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132 +* [BUGFIX] Scheduler: Fix memory leak by properly cleaning up query fragment registry. #7148 ## 1.20.1 2025-12-03 From 064dd775859397d8135150fbbb6b3d72a94a34f3 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 9 Dec 2025 08:37:54 -0800 Subject: [PATCH 3/3] fix lint Signed-off-by: yeya24 --- pkg/scheduler/scheduler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 30501ba9a98..57a017f94d5 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -848,7 +848,7 @@ func TestQueryFragmentRegistryNoLeak(t *testing.T) { frontendAddr := "frontend1" // Simulate 100 requests completing normally - for i := 0; i < 100; i++ { + for i := range 100 { queryID := uint64(i) fragmentID := uint64(0)