
Commit a864fd4

Fix query scheduler memory leak by cleaning up fragment registry (#7148)
1 parent e4a83be commit a864fd4

3 files changed: +208 −0 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@
 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
 * [BUGFIX] Ruler: Add XFunctions validation support. #7111
 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
+* [BUGFIX] Scheduler: Fix memory leak by properly cleaning up query fragment registry. #7148
 
 ## 1.20.1 2025-12-03

pkg/scheduler/scheduler.go

Lines changed: 24 additions & 0 deletions
@@ -462,6 +462,30 @@ func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, query
 			req.ctxCancel()
 		}
 		delete(s.pendingRequests, key)
+
+		// Clean up queryFragmentRegistry for this specific fragment
+		if fragmentIDs, ok := s.queryFragmentRegistry[querykey]; ok {
+			// Fast path: if there's only one fragment and it's the one we're deleting,
+			// just delete the entire entry without allocating a new slice
+			if len(fragmentIDs) == 1 && fragmentIDs[0] == fragmentID {
+				delete(s.queryFragmentRegistry, querykey)
+			} else {
+				// Slow path: remove this fragmentID from the slice
+				newFragmentIDs := make([]uint64, 0, len(fragmentIDs)-1)
+				for _, fid := range fragmentIDs {
+					if fid != fragmentID {
+						newFragmentIDs = append(newFragmentIDs, fid)
+					}
+				}
+
+				// If no more fragments remain, delete the entire entry
+				if len(newFragmentIDs) == 0 {
+					delete(s.queryFragmentRegistry, querykey)
+				} else {
+					s.queryFragmentRegistry[querykey] = newFragmentIDs
+				}
+			}
+		}
 	}
 }
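The commit does not show how the registry is declared, so the following is a minimal sketch of the scheduler state the hunk above manipulates, inferred from this diff and from the tests below. The type and field names follow how the test code uses them; the real definitions in pkg/scheduler/scheduler.go may differ.

// Sketch only: inferred from the diff and the tests, not the actual source.
type queryKey struct {
	frontendAddr string
	queryID      uint64
}

type requestKey struct {
	queryKey   queryKey
	fragmentID uint64
}

// Fields assumed on Scheduler, based on how the tests access them:
//
//	pendingRequestsMu     sync.Mutex                        // guards both maps below
//	pendingRequests       map[requestKey]*schedulerRequest  // one entry per in-flight fragment
//	queryFragmentRegistry map[queryKey][]uint64             // fragment IDs still pending for each query
//
// The leak being fixed: completing a request removed its pendingRequests entry
// (delete(s.pendingRequests, key) in the context lines above) but left the
// queryFragmentRegistry entry behind, so the registry grew by one key per query.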

pkg/scheduler/scheduler_test.go

Lines changed: 183 additions & 0 deletions
@@ -697,3 +697,186 @@ func (f *frontendMock) getRequest(queryID uint64) *httpgrpc.HTTPResponse {
 
 	return f.resp[queryID]
 }
+
+// TestQueryFragmentRegistryCleanupSingleFragment verifies that queryFragmentRegistry
+// is properly cleaned up when a single fragment completes (non-fragmenting mode).
+func TestQueryFragmentRegistryCleanupSingleFragment(t *testing.T) {
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, false)
+	require.NoError(t, err)
+
+	frontendAddr := "frontend1"
+	queryID := uint64(100)
+	fragmentID := uint64(0)
+
+	// Simulate enqueue adding to both maps
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	req := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx,
+		ctxCancel:       cancel,
+	}
+
+	s.pendingRequestsMu.Lock()
+	queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
+	s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
+	s.pendingRequestsMu.Unlock()
+
+	// Verify both entries exist
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 1)
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID})
+	s.pendingRequestsMu.Unlock()
+
+	// Simulate request completion (cancelAll=false)
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false)
+
+	// Verify cleanup: both pendingRequests AND queryFragmentRegistry should be cleaned up
+	s.pendingRequestsMu.Lock()
+	_, registryExists := s.queryFragmentRegistry[queryKey]
+	require.False(t, registryExists, "queryFragmentRegistry should be cleaned up when last fragment completes")
+	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID}, "pendingRequests should be cleaned up")
+	s.pendingRequestsMu.Unlock()
+}
+
+// TestQueryFragmentRegistryCleanupMultipleFragments verifies that queryFragmentRegistry
+// properly removes only the completed fragment and keeps others when multiple fragments exist.
+func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, true)
+	require.NoError(t, err)
+
+	frontendAddr := "frontend1"
+	queryID := uint64(100)
+	fragmentID1 := uint64(0)
+	fragmentID2 := uint64(1)
+	fragmentID3 := uint64(2)
+
+	// Simulate multiple fragments for the same query
+	ctx1, cancel1 := context.WithCancel(context.Background())
+	defer cancel1()
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	defer cancel2()
+	ctx3, cancel3 := context.WithCancel(context.Background())
+	defer cancel3()
+
+	req1 := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx1,
+		ctxCancel:       cancel1,
+	}
+	req2 := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx2,
+		ctxCancel:       cancel2,
+	}
+	req3 := &schedulerRequest{
+		frontendAddress: frontendAddr,
+		userID:          "user1",
+		queryID:         queryID,
+		ctx:             ctx3,
+		ctxCancel:       cancel3,
+	}
+
+	s.pendingRequestsMu.Lock()
+	queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
+	s.queryFragmentRegistry[queryKey] = []uint64{fragmentID1, fragmentID2, fragmentID3}
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID1}] = req1
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID2}] = req2
+	s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID3}] = req3
+	s.pendingRequestsMu.Unlock()
+
+	// Verify all three fragments exist
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 3)
+	require.Len(t, s.pendingRequests, 3)
+	s.pendingRequestsMu.Unlock()
+
+	// Fragment 1 completes
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID1, false)
+
+	// Verify fragment 1 removed, but fragments 2 and 3 remain
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 2, "should have 2 fragments remaining")
+	require.ElementsMatch(t, []uint64{fragmentID2, fragmentID3}, s.queryFragmentRegistry[queryKey])
+	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID1})
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
+	s.pendingRequestsMu.Unlock()
+
+	// Fragment 2 completes
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID2, false)
+
+	// Verify fragment 2 removed, only fragment 3 remains
+	s.pendingRequestsMu.Lock()
+	require.Len(t, s.queryFragmentRegistry[queryKey], 1, "should have 1 fragment remaining")
+	require.Equal(t, []uint64{fragmentID3}, s.queryFragmentRegistry[queryKey])
+	require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
+	require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
+	s.pendingRequestsMu.Unlock()
+
+	// Fragment 3 completes (last fragment)
+	s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID3, false)
+
+	// Verify all cleaned up
+	s.pendingRequestsMu.Lock()
+	_, registryExists := s.queryFragmentRegistry[queryKey]
+	require.False(t, registryExists, "queryFragmentRegistry should be deleted when last fragment completes")
+	require.Empty(t, s.pendingRequests, "all pendingRequests should be cleaned up")
+	s.pendingRequestsMu.Unlock()
+}
+
+// TestQueryFragmentRegistryNoLeak verifies that repeated request completions
+// don't cause queryFragmentRegistry to grow unbounded.
+func TestQueryFragmentRegistryNoLeak(t *testing.T) {
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), nil, false)
+	require.NoError(t, err)
+
+	frontendAddr := "frontend1"
+
+	// Simulate 100 requests completing normally
+	for i := range 100 {
+		queryID := uint64(i)
+		fragmentID := uint64(0)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		req := &schedulerRequest{
+			frontendAddress: frontendAddr,
+			userID:          "user1",
+			queryID:         queryID,
+			ctx:             ctx,
+			ctxCancel:       cancel,
+		}
+
+		// Add to registry and pending requests
+		s.pendingRequestsMu.Lock()
+		queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
+		s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
+		s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
+		s.pendingRequestsMu.Unlock()
+
+		// Complete the request
+		s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false)
+
+		cancel()
+	}
+
+	// Verify no leak: registry should be empty
+	s.pendingRequestsMu.Lock()
+	require.Empty(t, s.queryFragmentRegistry, "queryFragmentRegistry should be empty after all requests complete")
+	require.Empty(t, s.pendingRequests, "pendingRequests should be empty after all requests complete")
+	s.pendingRequestsMu.Unlock()
+}
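The three tests exercise the same invariant from different angles: when a fragment completes, its ID must leave queryFragmentRegistry along with its pendingRequests entry, and an emptied registry entry must be deleted rather than kept as an empty slice. As a possible follow-up, not part of this commit, a shared consistency helper could express that invariant once; the sketch below assumes it lives in the same test package and reuses the field names the tests above already rely on.

// assertRegistryConsistent is a hypothetical helper (not in this commit): every
// fragment ID listed in queryFragmentRegistry must have a matching pendingRequests
// entry, and no registry entry may hold an empty slice.
func assertRegistryConsistent(t *testing.T, s *Scheduler) {
	t.Helper()
	s.pendingRequestsMu.Lock()
	defer s.pendingRequestsMu.Unlock()
	for qk, fragmentIDs := range s.queryFragmentRegistry {
		require.NotEmpty(t, fragmentIDs, "registry entries must not hold empty fragment slices")
		for _, fid := range fragmentIDs {
			require.Contains(t, s.pendingRequests, requestKey{queryKey: qk, fragmentID: fid})
		}
	}
}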
