
scheduler: fragment queue and querier pick-up coordination #6968


Open · rubywtl wants to merge 1 commit into master from scheduler/logicalplan_fragment_coordination

Conversation

@rubywtl (Contributor) commented Aug 13, 2025

What this PR does:
This PR introduces a Fragmenter interface that splits logical query plans into fragments when distributed execution is enabled. The Fragmenter appends metadata to each fragment for tracking, which the scheduler then uses to route fragments to appropriate queriers. The scheduler maintains a mapping between fragments and querier addresses to track fragment locations across the distributed system.
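
For context, a minimal sketch of the shape this could take; all names and signatures below are illustrative assumptions, not the PR's actual API:

```go
// Illustrative sketch only; the real types live in the PR.
package distributed_execution

// LogicalPlan stands in for the logical query plan type.
type LogicalPlan interface{}

// Fragmenter splits a logical query plan into fragments when distributed
// execution is enabled.
type Fragmenter interface {
	Fragment(queryID uint64, plan LogicalPlan) ([]Fragment, error)
}

// Fragment is one schedulable piece of the plan, together with the metadata
// the scheduler uses to route it to a querier and track where it runs.
type Fragment struct {
	QueryID    uint64
	FragmentID uint64
	Plan       LogicalPlan
}
```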

Which issue(s) this PR fixes:

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

@@ -414,7 +416,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {

t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
ipAddr, err := ring.GetInstanceAddr(t.Cfg.Alertmanager.ShardingRing.InstanceAddr, t.Cfg.Alertmanager.ShardingRing.InstanceInterfaceNames, util_log.Logger)
Contributor

Why use the alertmanager config here?

@rubywtl (Contributor Author) commented Aug 14, 2025

The gRPC params I need are under the RingConfig struct, which is called ShardingRing here, but it doesn't exist under the querier config.

[update] I will add a new field (ring configs) for the querier 👍

Contributor

Umm, I don't think we want to add a Ring for the querier. We just need the configuration for the addresses, interfaces, etc.
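
If it helps, a config-only sketch along the lines being suggested, mirroring the existing instance address/interface options; the struct name, field names, and yaml tags are assumptions:

```go
package querier

// DistributedExecConfig is a hypothetical example of carrying only the
// address/interface settings the querier needs, without a full ring.
type DistributedExecConfig struct {
	InstanceAddr           string   `yaml:"instance_addr"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
}
```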

defer f.mu.Unlock()

keysToDelete := make([]distributed_execution.FragmentKey, 0)
for key := range f.mappings {
Contributor

Looking at the methods you have, would it be easier to change mappings from map[distributed_execution.FragmentKey]string to map[uint64]map[uint64]string?

You can then find the inner map with just a lookup
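
A sketch of the nested-map alternative, assuming the two uint64 keys are the query ID and the fragment ID:

```go
package main

import "fmt"

func main() {
	// Keyed first by query ID, then by fragment ID.
	mappings := map[uint64]map[uint64]string{}

	var queryID, fragmentID uint64 = 1, 3
	querierAddr := "10.0.0.5:9095" // hypothetical querier address

	// Insert.
	if _, ok := mappings[queryID]; !ok {
		mappings[queryID] = map[uint64]string{}
	}
	mappings[queryID][fragmentID] = querierAddr

	// All fragments of a query are reachable with a single lookup.
	fmt.Println(mappings[queryID])

	// Cleaning up a finished query is one delete, no scan over all keys.
	delete(mappings, queryID)
}
```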

@rubywtl (Contributor Author) commented Aug 14, 2025

True, but I made the FragmentKey struct so that it is easier to maintain (for example, if we ever want to change the types of the IDs or add more fields, we don't have to go through the codebase to fix it), and the code is easier to understand (more literal). This fragment key type is also reused by remote nodes and by child-root execution accesses to the result cache in future PRs.
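
For illustration, a composite-key sketch along these lines; field and method names are assumptions:

```go
package distributed_execution

// FragmentKey identifies one fragment of one query. If the ID types ever
// change or a field is added, only this definition changes.
type FragmentKey struct {
	QueryID    uint64
	FragmentID uint64
}

// fragmentTable is the scheduler-side mapping from fragment to the address
// of the querier that picked it up.
type fragmentTable struct {
	mappings map[FragmentKey]string
}

func (t *fragmentTable) assign(key FragmentKey, querierAddr string) {
	t.mappings[key] = querierAddr
}

func (t *fragmentTable) lookup(key FragmentKey) (string, bool) {
	addr, ok := t.mappings[key]
	return addr, ok
}
```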

@rubywtl force-pushed the scheduler/logicalplan_fragment_coordination branch from ac9f641 to 4f75333 on August 14, 2025 23:15
@rubywtl force-pushed the scheduler/logicalplan_fragment_coordination branch from 4f75333 to 942674c on August 14, 2025 23:46