Skip to content

Commit d3b24cb

Browse files
authored
Introduce quorum-based chunk deduplication strategy (#68)
2 parents 759d333 + ee7c342 commit d3b24cb

File tree

4 files changed

+252
-7
lines changed

4 files changed

+252
-7
lines changed

cmd/thanos/query.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ func registerQuery(app *extkingpin.App) {
129129

130130
enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
131131
Default("false").Bool()
132+
enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
133+
Default("false").Bool()
132134

133135
instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
134136

@@ -378,6 +380,7 @@ func registerQuery(app *extkingpin.App) {
378380
*tenantLabel,
379381
*enableGroupReplicaPartialStrategy,
380382
*enableDedupMerge,
383+
*enableQuorumChunkDedup,
381384
)
382385
})
383386
}
@@ -462,6 +465,7 @@ func runQuery(
462465
tenantLabel string,
463466
groupReplicaPartialResponseStrategy bool,
464467
enableDedupMerge bool,
468+
enableQuorumChunkDedup bool,
465469
) error {
466470
if alertQueryURL == "" {
467471
lastColon := strings.LastIndex(httpBindAddr, ":")
@@ -536,6 +540,7 @@ func runQuery(
536540
options := []store.ProxyStoreOption{
537541
store.WithTSDBSelector(tsdbSelector),
538542
store.WithProxyStoreDebugLogging(debugLogging),
543+
store.WithQuorumChunkDedup(enableQuorumChunkDedup),
539544
}
540545

541546
var (

pkg/store/proxy.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type ProxyStore struct {
9494
retrievalStrategy RetrievalStrategy
9595
debugLogging bool
9696
tsdbSelector *TSDBSelector
97+
quorumChunkDedup bool
9798
}
9899

99100
type proxyStoreMetrics struct {
@@ -127,6 +128,12 @@ func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
127128
}
128129
}
129130

131+
func WithQuorumChunkDedup(enable bool) ProxyStoreOption {
132+
return func(s *ProxyStore) {
133+
s.quorumChunkDedup = enable
134+
}
135+
}
136+
130137
// WithTSDBSelector sets the TSDB selector for the proxy.
131138
func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
132139
return func(s *ProxyStore) {
@@ -449,6 +456,9 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
449456
level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))
450457

451458
respHeap := NewResponseDeduplicator(NewProxyResponseLoserTree(storeResponses...))
459+
if s.quorumChunkDedup {
460+
respHeap.quorumChunkDedup = true
461+
}
452462
for respHeap.Next() {
453463
resp := respHeap.At()
454464

pkg/store/proxy_merge.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ type responseDeduplicator struct {
3434
bufferedResp []*storepb.SeriesResponse
3535
buffRespI int
3636

37-
prev *storepb.SeriesResponse
38-
ok bool
37+
prev *storepb.SeriesResponse
38+
ok bool
39+
quorumChunkDedup bool
3940
}
4041

4142
// NewResponseDeduplicator returns a wrapper around a loser tree that merges duplicated series messages into one.
@@ -73,7 +74,7 @@ func (d *responseDeduplicator) Next() bool {
7374
d.ok = d.h.Next()
7475
if !d.ok {
7576
if len(d.bufferedSameSeries) > 0 {
76-
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries))
77+
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries, d.quorumChunkDedup))
7778
}
7879
return len(d.bufferedResp) > 0
7980
}
@@ -101,15 +102,16 @@ func (d *responseDeduplicator) Next() bool {
101102
continue
102103
}
103104

104-
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries))
105+
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries, d.quorumChunkDedup))
105106
d.prev = s
106107

107108
return true
108109
}
109110
}
110111

111-
func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb.SeriesResponse {
112+
func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse, quorum bool) *storepb.SeriesResponse {
112113
chunkDedupMap := map[uint64]*storepb.AggrChunk{}
114+
chunckCountMap := map[uint64]int{}
113115

114116
for _, s := range series {
115117
for _, chk := range s.GetSeries().Chunks {
@@ -127,7 +129,10 @@ func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb
127129
if _, ok := chunkDedupMap[hash]; !ok {
128130
chk := chk
129131
chunkDedupMap[hash] = &chk
132+
chunckCountMap[hash] = 1
130133
break
134+
} else {
135+
chunckCountMap[hash]++
131136
}
132137
}
133138
}
@@ -139,8 +144,24 @@ func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb
139144
}
140145

141146
finalChunks := make([]storepb.AggrChunk, 0, len(chunkDedupMap))
142-
for _, chk := range chunkDedupMap {
143-
finalChunks = append(finalChunks, *chk)
147+
for hash, chk := range chunkDedupMap {
148+
if quorum {
149+
// NB: this is specific to Databricks' setup where each time series is written to at least 2 out of 3 replicas.
150+
// Each chunk should have 3 replicas in most cases, and 2 replicas in the worst acceptable cases.
151+
// Quorum-based deduplication is used to pick the majority value among 3 replicas.
152+
// If a chunk has only 2 identical replicas, there might be another chunk with corrupt data.
153+
// We forward both identical replicas to the later quorum-based deduplication step so they can outvote any corrupt third replica.
154+
if chunckCountMap[hash] >= 3 {
155+
// Most cases should hit this branch.
156+
finalChunks = append(finalChunks, *chk)
157+
} else {
158+
for i := 0; i < chunckCountMap[hash]; i++ {
159+
finalChunks = append(finalChunks, *chk)
160+
}
161+
}
162+
} else {
163+
finalChunks = append(finalChunks, *chk)
164+
}
144165
}
145166

146167
sort.Slice(finalChunks, func(i, j int) bool {

pkg/store/proxy_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2295,3 +2295,212 @@ func TestDedupRespHeap_Deduplication(t *testing.T) {
22952295
}
22962296

22972297
}
2298+
2299+
func TestDedupRespHeap_QuorumChunkDedup(t *testing.T) {
2300+
t.Parallel()
2301+
2302+
for _, tcase := range []struct {
2303+
responses []*storepb.SeriesResponse
2304+
testFn func(responses []*storepb.SeriesResponse, h *responseDeduplicator)
2305+
tname string
2306+
}{
2307+
{
2308+
tname: "edge case with only one response",
2309+
responses: []*storepb.SeriesResponse{
2310+
{
2311+
Result: &storepb.SeriesResponse_Series{
2312+
Series: &storepb.Series{
2313+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2314+
Chunks: []storepb.AggrChunk{
2315+
{
2316+
Raw: &storepb.Chunk{
2317+
Type: storepb.Chunk_XOR,
2318+
Data: []byte(`abcdefgh`),
2319+
},
2320+
},
2321+
},
2322+
},
2323+
},
2324+
},
2325+
},
2326+
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
2327+
testutil.Equals(t, true, h.Next())
2328+
resp := h.At()
2329+
testutil.Equals(t, responses[0], resp)
2330+
testutil.Equals(t, false, h.Next())
2331+
},
2332+
},
2333+
{
2334+
tname: "keep 2 identical series",
2335+
responses: []*storepb.SeriesResponse{
2336+
{
2337+
Result: &storepb.SeriesResponse_Series{
2338+
Series: &storepb.Series{
2339+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2340+
Chunks: []storepb.AggrChunk{
2341+
{
2342+
Raw: &storepb.Chunk{
2343+
Type: storepb.Chunk_XOR,
2344+
Data: []byte(`abcdefgh`),
2345+
},
2346+
},
2347+
},
2348+
},
2349+
},
2350+
},
2351+
{
2352+
Result: &storepb.SeriesResponse_Series{
2353+
Series: &storepb.Series{
2354+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2355+
Chunks: []storepb.AggrChunk{
2356+
{
2357+
Raw: &storepb.Chunk{
2358+
Type: storepb.Chunk_XOR,
2359+
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
2360+
Data: []byte(`abcdefgh`),
2361+
},
2362+
},
2363+
},
2364+
},
2365+
},
2366+
},
2367+
},
2368+
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
2369+
testutil.Equals(t, true, h.Next())
2370+
resp := h.At()
2371+
testutil.Equals(t, 2, len(resp.GetSeries().Chunks))
2372+
testutil.Equals(t, false, h.Next())
2373+
},
2374+
},
2375+
{
2376+
tname: "keep 2 identical series + a corrupted one",
2377+
responses: []*storepb.SeriesResponse{
2378+
{
2379+
Result: &storepb.SeriesResponse_Series{
2380+
Series: &storepb.Series{
2381+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2382+
Chunks: []storepb.AggrChunk{
2383+
{
2384+
Raw: &storepb.Chunk{
2385+
Type: storepb.Chunk_XOR,
2386+
Data: []byte(`abcdefgh`),
2387+
},
2388+
},
2389+
},
2390+
},
2391+
},
2392+
},
2393+
{
2394+
Result: &storepb.SeriesResponse_Series{
2395+
Series: &storepb.Series{
2396+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2397+
Chunks: []storepb.AggrChunk{
2398+
{
2399+
Raw: &storepb.Chunk{
2400+
Type: storepb.Chunk_XOR,
2401+
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
2402+
Data: []byte(`abcdefgh`),
2403+
},
2404+
},
2405+
},
2406+
},
2407+
},
2408+
},
2409+
{
2410+
Result: &storepb.SeriesResponse_Series{
2411+
Series: &storepb.Series{
2412+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2413+
Chunks: []storepb.AggrChunk{
2414+
{
2415+
Raw: &storepb.Chunk{
2416+
Type: storepb.Chunk_XOR,
2417+
Hash: xxhash.Sum64([]byte(`corrupted`)),
2418+
Data: []byte(`corrupted`),
2419+
},
2420+
},
2421+
},
2422+
},
2423+
},
2424+
},
2425+
},
2426+
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
2427+
testutil.Equals(t, true, h.Next())
2428+
resp := h.At()
2429+
testutil.Equals(t, 3, len(resp.GetSeries().Chunks))
2430+
testutil.Equals(t, false, h.Next())
2431+
},
2432+
},
2433+
{
2434+
tname: "dedup 3 identical series",
2435+
responses: []*storepb.SeriesResponse{
2436+
{
2437+
Result: &storepb.SeriesResponse_Series{
2438+
Series: &storepb.Series{
2439+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2440+
Chunks: []storepb.AggrChunk{
2441+
{
2442+
Raw: &storepb.Chunk{
2443+
Type: storepb.Chunk_XOR,
2444+
Data: []byte(`abcdefgh`),
2445+
},
2446+
},
2447+
},
2448+
},
2449+
},
2450+
},
2451+
{
2452+
Result: &storepb.SeriesResponse_Series{
2453+
Series: &storepb.Series{
2454+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2455+
Chunks: []storepb.AggrChunk{
2456+
{
2457+
Raw: &storepb.Chunk{
2458+
Type: storepb.Chunk_XOR,
2459+
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
2460+
Data: []byte(`abcdefgh`),
2461+
},
2462+
},
2463+
},
2464+
},
2465+
},
2466+
},
2467+
{
2468+
Result: &storepb.SeriesResponse_Series{
2469+
Series: &storepb.Series{
2470+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2471+
Chunks: []storepb.AggrChunk{
2472+
{
2473+
Raw: &storepb.Chunk{
2474+
Type: storepb.Chunk_XOR,
2475+
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
2476+
Data: []byte(`abcdefgh`),
2477+
},
2478+
},
2479+
},
2480+
},
2481+
},
2482+
},
2483+
},
2484+
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
2485+
testutil.Equals(t, true, h.Next())
2486+
resp := h.At()
2487+
testutil.Equals(t, responses[0], resp)
2488+
testutil.Equals(t, 1, len(resp.GetSeries().Chunks))
2489+
testutil.Equals(t, false, h.Next())
2490+
},
2491+
},
2492+
} {
2493+
t.Run(tcase.tname, func(t *testing.T) {
2494+
h := NewResponseDeduplicator(NewProxyResponseLoserTree(
2495+
&eagerRespSet{
2496+
closeSeries: func() {},
2497+
wg: &sync.WaitGroup{},
2498+
bufferedResponses: tcase.responses,
2499+
},
2500+
))
2501+
h.quorumChunkDedup = true
2502+
tcase.testFn(tcase.responses, h)
2503+
})
2504+
}
2505+
2506+
}

0 commit comments

Comments
 (0)