Skip to content

Commit ca8da96

Browse files
authored
Introduce group_replica partial response strategy (#33)
2 parents 971761a + 3e96d0d commit ca8da96

File tree

15 files changed

+505
-85
lines changed

15 files changed

+505
-85
lines changed

cmd/thanos/query.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ const (
6767
promqlNegativeOffset = "promql-negative-offset"
6868
promqlAtModifier = "promql-at-modifier"
6969
queryPushdown = "query-pushdown"
70+
dnsPrefix = "dnssrv+"
7071
)
7172

7273
type queryMode string
@@ -188,6 +189,9 @@ func registerQuery(app *extkingpin.App) {
188189
enableQueryPartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified. --no-query.partial-response for disabling.").
189190
Default("true").Bool()
190191

192+
enableGroupReplicaPartialStrategy := cmd.Flag("query.group-replica-strategy", "Enable group-replica partial response strategy.").
193+
Default("false").Bool()
194+
191195
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
192196
Hidden().Default("true").Bool()
193197

@@ -369,6 +373,7 @@ func registerQuery(app *extkingpin.App) {
369373
*tenantCertField,
370374
*enforceTenancy,
371375
*tenantLabel,
376+
*enableGroupReplicaPartialStrategy,
372377
)
373378
})
374379
}
@@ -451,6 +456,7 @@ func runQuery(
451456
tenantCertField string,
452457
enforceTenancy bool,
453458
tenantLabel string,
459+
groupReplicaPartialResponseStrategy bool,
454460
) error {
455461
if alertQueryURL == "" {
456462
lastColon := strings.LastIndex(httpBindAddr, ":")
@@ -548,6 +554,8 @@ func runQuery(
548554
dialOpts,
549555
unhealthyStoreTimeout,
550556
endpointInfoTimeout,
557+
// ignoreErrors when group_replica partial response strategy is enabled.
558+
groupReplicaPartialResponseStrategy,
551559
queryConnMetricLabels...,
552560
)
553561

@@ -556,14 +564,26 @@ func runQuery(
556564
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
557565
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
558566
exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset)
567+
queryableCreator query.QueryableCreator
568+
)
569+
if groupReplicaPartialResponseStrategy {
570+
level.Info(logger).Log("msg", "Enabled group-replica partial response strategy")
571+
queryableCreator = query.NewQueryableCreatorWithGroupReplicaPartialResponseStrategy(
572+
logger,
573+
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
574+
proxy,
575+
maxConcurrentSelects,
576+
queryTimeout,
577+
)
578+
} else {
559579
queryableCreator = query.NewQueryableCreator(
560580
logger,
561581
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
562582
proxy,
563583
maxConcurrentSelects,
564584
queryTimeout,
565585
)
566-
)
586+
}
567587

568588
// Run File Service Discovery and update the store set when the files are modified.
569589
if fileSD != nil {
@@ -863,6 +883,7 @@ func prepareEndpointSet(
863883
dialOpts []grpc.DialOption,
864884
unhealthyStoreTimeout time.Duration,
865885
endpointInfoTimeout time.Duration,
886+
ignoreStoreErrors bool,
866887
queryConnMetricLabels ...string,
867888
) *query.EndpointSet {
868889
endpointSet := query.NewEndpointSet(
@@ -881,9 +902,13 @@ func prepareEndpointSet(
881902

882903
for _, dnsProvider := range dnsProviders {
883904
var tmpSpecs []*query.GRPCEndpointSpec
884-
885-
for _, addr := range dnsProvider.Addresses() {
886-
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false))
905+
for dnsName, addrs := range dnsProvider.AddressesWithDNS() {
906+
// The dns name is like "dnssrv+pantheon-db-rep0:10901" whose replica key is "pantheon-db-rep0".
907+
// TODO: have a more robust protocol to extract the replica key.
908+
replicaKey := strings.Split(strings.TrimPrefix(dnsName, dnsPrefix), ":")[0]
909+
for _, addr := range addrs {
910+
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpecWithReplicaKey(replicaKey, addr, false, ignoreStoreErrors))
911+
}
887912
}
888913
tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs)
889914
specs = append(specs, tmpSpecs...)

cmd/thanos/rule.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ func runRule(
439439
dialOpts,
440440
5*time.Minute,
441441
5*time.Second,
442+
false,
442443
)
443444

444445
// Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary.
@@ -911,7 +912,7 @@ func queryFuncCreator(
911912
var spanID string
912913

913914
switch partialResponseStrategy {
914-
case storepb.PartialResponseStrategy_WARN:
915+
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA:
915916
spanID = "/rule_instant_query HTTP[client]"
916917
case storepb.PartialResponseStrategy_ABORT:
917918
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"

pkg/discovery/dns/provider.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,14 @@ func (p *Provider) Addresses() []string {
164164
}
165165
return result
166166
}
167+
168+
func (p *Provider) AddressesWithDNS() map[string][]string {
169+
p.RLock()
170+
defer p.RUnlock()
171+
172+
result := make(map[string][]string, len(p.resolved))
173+
for dns, addrs := range p.resolved {
174+
result[dns] = addrs
175+
}
176+
return result
177+
}

pkg/promclient/promclient.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ func (p *QueryOptions) AddTo(values url.Values) error {
397397

398398
var partialResponseValue string
399399
switch p.PartialResponseStrategy {
400-
case storepb.PartialResponseStrategy_WARN:
400+
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA:
401401
partialResponseValue = strconv.FormatBool(true)
402402
case storepb.PartialResponseStrategy_ABORT:
403403
partialResponseValue = strconv.FormatBool(false)

0 commit comments

Comments
 (0)