Skip to content

Commit 0c63617

Browse files
authored
Improve block plan creation for standalone binary mode (#3325)
* Improve block plan handling for standalone binary mode * Adapt tests to change in block plan struct * Add check before querying store gateways
1 parent 8348529 commit 0c63617

File tree

4 files changed

+79
-45
lines changed

4 files changed

+79
-45
lines changed

pkg/querier/analyze_query.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ func (q *Querier) AnalyzeQuery(ctx context.Context, req *connect.Request[querier
3434
}
3535
addBlockStatsToQueryScope(blockStatsFromReplicas, ingesterQueryScope)
3636

37-
blockStatsFromReplicas, err = q.getBlockStatsFromStoreGateways(ctx, plan, storeGatewayQueryScope.blockIds)
38-
if err != nil {
39-
return nil, err
37+
if q.storeGatewayQuerier != nil {
38+
blockStatsFromReplicas, err = q.getBlockStatsFromStoreGateways(ctx, plan, storeGatewayQueryScope.blockIds)
39+
if err != nil {
40+
return nil, err
41+
}
42+
addBlockStatsToQueryScope(blockStatsFromReplicas, storeGatewayQueryScope)
4043
}
41-
addBlockStatsToQueryScope(blockStatsFromReplicas, storeGatewayQueryScope)
4244

4345
queriedSeries, err := q.getQueriedSeriesCount(ctx, req.Msg)
4446
if err != nil {
@@ -66,14 +68,16 @@ func getDataFromPlan(plan blockPlan) (ingesterQueryScope *queryScope, storeGatew
6668
deduplicationNeeded = false
6769
for _, planEntry := range plan {
6870
deduplicationNeeded = deduplicationNeeded || planEntry.Deduplication
69-
if planEntry.InstanceType == ingesterInstance {
70-
ingesterQueryScope.ComponentCount += 1
71-
ingesterQueryScope.BlockCount += uint64(len(planEntry.Ulids))
72-
ingesterQueryScope.blockIds = append(ingesterQueryScope.blockIds, planEntry.Ulids...)
73-
} else {
74-
storeGatewayQueryScope.ComponentCount += 1
75-
storeGatewayQueryScope.BlockCount += uint64(len(planEntry.Ulids))
76-
storeGatewayQueryScope.blockIds = append(storeGatewayQueryScope.blockIds, planEntry.Ulids...)
71+
for _, t := range planEntry.InstanceTypes {
72+
if t == ingesterInstance {
73+
ingesterQueryScope.ComponentCount += 1
74+
ingesterQueryScope.BlockCount += uint64(len(planEntry.Ulids))
75+
ingesterQueryScope.blockIds = append(ingesterQueryScope.blockIds, planEntry.Ulids...)
76+
} else {
77+
storeGatewayQueryScope.ComponentCount += 1
78+
storeGatewayQueryScope.BlockCount += uint64(len(planEntry.Ulids))
79+
storeGatewayQueryScope.blockIds = append(storeGatewayQueryScope.blockIds, planEntry.Ulids...)
80+
}
7781
}
7882
}
7983
return ingesterQueryScope, storeGatewayQueryScope, deduplicationNeeded

pkg/querier/analyze_query_test.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ func Test_getDataFromPlan(t *testing.T) {
4242
name: "plan with ingesters only",
4343
plan: blockPlan{
4444
"replica 1": &blockPlanEntry{
45-
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
46-
InstanceType: ingesterInstance,
45+
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
46+
InstanceTypes: []instanceType{ingesterInstance},
4747
},
4848
"replica 2": &blockPlanEntry{
49-
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
50-
InstanceType: ingesterInstance,
49+
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
50+
InstanceTypes: []instanceType{ingesterInstance},
5151
},
5252
},
5353
verifyIngesterQueryScope: func(t *testing.T, scope *queryScope) {
@@ -66,16 +66,16 @@ func Test_getDataFromPlan(t *testing.T) {
6666
name: "plan with ingesters and store gateways",
6767
plan: blockPlan{
6868
"replica 1": &blockPlanEntry{
69-
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
70-
InstanceType: ingesterInstance,
69+
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
70+
InstanceTypes: []instanceType{ingesterInstance},
7171
},
7272
"replica 2": &blockPlanEntry{
73-
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
74-
InstanceType: ingesterInstance,
73+
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
74+
InstanceTypes: []instanceType{ingesterInstance},
7575
},
7676
"replica 3": &blockPlanEntry{
77-
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block E", "block F"}, Deduplication: true},
78-
InstanceType: storeGatewayInstance,
77+
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block E", "block F"}, Deduplication: true},
78+
InstanceTypes: []instanceType{storeGatewayInstance},
7979
},
8080
},
8181
verifyIngesterQueryScope: func(t *testing.T, scope *queryScope) {
@@ -94,6 +94,30 @@ func Test_getDataFromPlan(t *testing.T) {
9494
},
9595
wantDeduplicationNeeded: true,
9696
},
97+
{
98+
name: "plan with a single replica with dual instance types (standalone binary)",
99+
plan: blockPlan{
100+
"replica 1": &blockPlanEntry{
101+
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A"}, Deduplication: true},
102+
InstanceTypes: []instanceType{ingesterInstance, storeGatewayInstance},
103+
},
104+
},
105+
verifyIngesterQueryScope: func(t *testing.T, scope *queryScope) {
106+
require.Equal(t, uint64(1), scope.ComponentCount)
107+
require.Equal(t, uint64(1), scope.BlockCount)
108+
for _, block := range []string{"block A"} {
109+
require.True(t, slices.Contains(scope.blockIds, block))
110+
}
111+
},
112+
verifyStoreGatewayQueryScope: func(t *testing.T, scope *queryScope) {
113+
require.Equal(t, uint64(1), scope.ComponentCount)
114+
require.Equal(t, uint64(1), scope.BlockCount)
115+
for _, block := range []string{"block A"} {
116+
require.True(t, slices.Contains(scope.blockIds, block))
117+
}
118+
},
119+
wantDeduplicationNeeded: true,
120+
},
97121
}
98122
for _, tt := range tests {
99123
t.Run(tt.name, func(t *testing.T) {

pkg/querier/querier_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1350,7 +1350,7 @@ func Test_splitQueryToStores(t *testing.T) {
13501350
start: model.TimeFromUnixNano(int64(30 * time.Minute)),
13511351
end: model.TimeFromUnixNano(int64(45*time.Minute) + int64(3*time.Hour)),
13521352
queryStoreAfter: 30 * time.Minute,
1353-
plan: blockPlan{"replica-a": &blockPlanEntry{InstanceType: ingesterInstance, BlockHints: &ingestv1.BlockHints{Ulids: []string{"block-a", "block-b"}}}},
1353+
plan: blockPlan{"replica-a": &blockPlanEntry{InstanceTypes: []instanceType{ingesterInstance}, BlockHints: &ingestv1.BlockHints{Ulids: []string{"block-a", "block-b"}}}},
13541354

13551355
expected: storeQueries{
13561356
queryStoreAfter: 0,

pkg/querier/replication.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -144,26 +144,25 @@ const (
144144
// map of block ID to replicas containing the block, when empty replicas, the
145145
// block is already contained by a higher compaction level block in full.
146146
type replicasPerBlockID struct {
147-
m map[string][]string
148-
meta map[string]*typesv1.BlockInfo
149-
instanceType map[string]instanceType
150-
logger log.Logger
147+
m map[string][]string
148+
meta map[string]*typesv1.BlockInfo
149+
instanceTypes map[string][]instanceType
150+
logger log.Logger
151151
}
152152

153153
func newReplicasPerBlockID(logger log.Logger) *replicasPerBlockID {
154154
return &replicasPerBlockID{
155-
m: make(map[string][]string),
156-
meta: make(map[string]*typesv1.BlockInfo),
157-
instanceType: make(map[string]instanceType),
158-
logger: logger,
155+
m: make(map[string][]string),
156+
meta: make(map[string]*typesv1.BlockInfo),
157+
instanceTypes: make(map[string][]instanceType),
158+
logger: logger,
159159
}
160160
}
161161

162162
func (r *replicasPerBlockID) add(result []ResponseFromReplica[[]*typesv1.BlockInfo], t instanceType) {
163163
for _, replica := range result {
164-
// mark the replica's instance type
165-
// TODO: Figure out if that breaks in single binary mode
166-
r.instanceType[replica.addr] = t
164+
// mark the replica's instance types (in single binary we can have the same replica have multiple types)
165+
r.instanceTypes[replica.addr] = append(r.instanceTypes[replica.addr], t)
167166

168167
for _, block := range replica.response {
169168
// add block to map
@@ -322,7 +321,7 @@ func (r *replicasPerBlockID) pruneSupersededBlocks(sharded bool) error {
322321

323322
type blockPlanEntry struct {
324323
*ingestv1.BlockHints
325-
InstanceType instanceType
324+
InstanceTypes []instanceType
326325
}
327326

328327
type blockPlan map[string]*blockPlanEntry
@@ -397,11 +396,16 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*blockPla
397396

398397
// only get store gateways replicas
399398
sgReplicas := lo.Filter(replicas, func(replica string, _ int) bool {
400-
t, ok := r.instanceType[replica]
399+
instanceTypes, ok := r.instanceTypes[replica]
401400
if !ok {
402401
return false
403402
}
404-
return t == storeGatewayInstance
403+
for _, t := range instanceTypes {
404+
if t == storeGatewayInstance {
405+
return true
406+
}
407+
}
408+
return false
405409
})
406410

407411
if len(sgReplicas) > 0 {
@@ -423,8 +427,8 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*blockPla
423427
p, exists := plan[selectedReplica]
424428
if !exists {
425429
p = &blockPlanEntry{
426-
BlockHints: &ingestv1.BlockHints{},
427-
InstanceType: r.instanceType[selectedReplica],
430+
BlockHints: &ingestv1.BlockHints{},
431+
InstanceTypes: r.instanceTypes[selectedReplica],
428432
}
429433
plan[selectedReplica] = p
430434
}
@@ -443,15 +447,17 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*blockPla
443447

444448
var plannedIngesterBlocks, plannedStoreGatewayBlocks int
445449
for replica, blocks := range plan {
446-
t, ok := r.instanceType[replica]
450+
instanceTypes, ok := r.instanceTypes[replica]
447451
if !ok {
448452
continue
449453
}
450-
if t == storeGatewayInstance {
451-
plannedStoreGatewayBlocks += len(blocks.Ulids)
452-
}
453-
if t == ingesterInstance {
454-
plannedIngesterBlocks += len(blocks.Ulids)
454+
for _, t := range instanceTypes {
455+
if t == storeGatewayInstance {
456+
plannedStoreGatewayBlocks += len(blocks.Ulids)
457+
}
458+
if t == ingesterInstance {
459+
plannedIngesterBlocks += len(blocks.Ulids)
460+
}
455461
}
456462
}
457463

0 commit comments

Comments
 (0)