
Commit 1c69c7e

[ES-1314123] race condition for memorize tsdb client (#108)
2 parents 67f5336 + ea908b1

File tree

3 files changed: +58 -56 lines

docs/sharding.md

Lines changed: 1 addition & 1 deletion

@@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt

 # Relabelling

-Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
+Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.

 Currently, thanos only supports the following relabel actions:

pkg/receive/multitsdb.go

Lines changed: 15 additions & 55 deletions

@@ -62,12 +62,6 @@ type MultiTSDB struct {
 	hashFunc        metadata.HashFunc
 	hashringConfigs []HashringConfig

-	tsdbClients           []store.Client
-	tsdbClientsNeedUpdate bool
-
-	exemplarClients           map[string]*exemplars.TSDB
-	exemplarClientsNeedUpdate bool
-
 	metricNameFilterEnabled bool
 }

@@ -100,19 +94,17 @@ func NewMultiTSDB(
 	}

 	mt := &MultiTSDB{
-		dataDir:                   dataDir,
-		logger:                    log.With(l, "component", "multi-tsdb"),
-		reg:                       reg,
-		tsdbOpts:                  tsdbOpts,
-		mtx:                       &sync.RWMutex{},
-		tenants:                   map[string]*tenant{},
-		labels:                    labels,
-		tsdbClientsNeedUpdate:     true,
-		exemplarClientsNeedUpdate: true,
-		tenantLabelName:           tenantLabelName,
-		bucket:                    bucket,
-		allowOutOfOrderUpload:     allowOutOfOrderUpload,
-		hashFunc:                  hashFunc,
+		dataDir:               dataDir,
+		logger:                log.With(l, "component", "multi-tsdb"),
+		reg:                   reg,
+		tsdbOpts:              tsdbOpts,
+		mtx:                   &sync.RWMutex{},
+		tenants:               map[string]*tenant{},
+		labels:                labels,
+		tenantLabelName:       tenantLabelName,
+		bucket:                bucket,
+		allowOutOfOrderUpload: allowOutOfOrderUpload,
+		hashFunc:              hashFunc,
 	}

 	for _, option := range options {

@@ -435,8 +427,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {

 		level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
 		delete(t.tenants, tenantID)
-		t.tsdbClientsNeedUpdate = true
-		t.exemplarClientsNeedUpdate = true
 	}

 	return merr.Err()

@@ -598,17 +588,7 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {

 func (t *MultiTSDB) TSDBLocalClients() []store.Client {
 	t.mtx.RLock()
-	if !t.tsdbClientsNeedUpdate {
-		t.mtx.RUnlock()
-		return t.tsdbClients
-	}
-
-	t.mtx.RUnlock()
-	t.mtx.Lock()
-	defer t.mtx.Unlock()
-	if !t.tsdbClientsNeedUpdate {
-		return t.tsdbClients
-	}
+	defer t.mtx.RUnlock()

 	res := make([]store.Client, 0, len(t.tenants))
 	for _, tenant := range t.tenants {

@@ -618,25 +598,12 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {
 		}
 	}

-	t.tsdbClientsNeedUpdate = false
-	t.tsdbClients = res
-
-	return t.tsdbClients
+	return res
 }

 func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
 	t.mtx.RLock()
-	if !t.exemplarClientsNeedUpdate {
-		t.mtx.RUnlock()
-		return t.exemplarClients
-	}
-	t.mtx.RUnlock()
-	t.mtx.Lock()
-	defer t.mtx.Unlock()
-
-	if !t.exemplarClientsNeedUpdate {
-		return t.exemplarClients
-	}
+	defer t.mtx.RUnlock()

 	res := make(map[string]*exemplars.TSDB, len(t.tenants))
 	for k, tenant := range t.tenants {

@@ -645,10 +612,7 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
 			res[k] = e
 		}
 	}
-
-	t.exemplarClientsNeedUpdate = false
-	t.exemplarClients = res
-	return t.exemplarClients
+	return res
 }

 func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {

@@ -725,8 +689,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 	if err != nil {
 		t.mtx.Lock()
 		delete(t.tenants, tenantID)
-		t.tsdbClientsNeedUpdate = true
-		t.exemplarClientsNeedUpdate = true
 		t.mtx.Unlock()
 		return err
 	}

@@ -779,8 +741,6 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan

 	tenant = newTenant()
 	t.tenants[tenantID] = tenant
-	t.tsdbClientsNeedUpdate = true
-	t.exemplarClientsNeedUpdate = true
 	t.mtx.Unlock()

 	logger := log.With(t.logger, "tenant", tenantID)
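
Taken together, the hunks above remove the memoized client lists: TSDBLocalClients and TSDBExemplars previously cached their results behind the tsdbClientsNeedUpdate / exemplarClientsNeedUpdate flags and upgraded from the read lock to the write lock to rebuild the cache, while tenant creation, pruning, and the startTSDB error path invalidated the flags. After this change both methods simply hold the read lock for the whole call and rebuild the result every time, so there is no cache that can fall out of sync with t.tenants while tenants are being added concurrently. The sketch below is a minimal, self-contained illustration of that locking shape only; registry, add, and snapshot are invented names for the example and plain ints stand in for store.Client, so it is not the Thanos implementation.

package main

import (
	"fmt"
	"sync"
)

// registry is a toy stand-in for MultiTSDB: a map of tenants guarded by a
// single RWMutex. All names here are illustrative, not the Thanos types.
type registry struct {
	mtx     sync.RWMutex
	tenants map[string]int
}

// add mirrors the write side (tenant creation or pruning): the map is only
// mutated while the write lock is held.
func (r *registry) add(name string, v int) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.tenants[name] = v
}

// snapshot mirrors the fixed TSDBLocalClients/TSDBExemplars shape: no cached
// slice and no "needs update" flag, just hold the read lock for the whole
// call and build a fresh result each time. Readers never observe a
// half-updated view because writers hold the write lock for their mutation.
func (r *registry) snapshot() []int {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	res := make([]int, 0, len(r.tenants))
	for _, v := range r.tenants {
		res = append(res, v)
	}
	return res
}

func main() {
	r := &registry{tenants: map[string]int{}}

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		// Writers stand in for remote-write requests creating new tenants.
		go func(i int) {
			defer wg.Done()
			r.add(fmt.Sprintf("tenant-%d", i), i)
		}(i)
		// Readers stand in for store/exemplar APIs listing local clients.
		go r.snapshot()
	}
	wg.Wait()

	fmt.Println("tenants:", len(r.snapshot())) // always 50 once the writers finish
}

The visible trade-off is that every call allocates a fresh slice instead of returning a cached one; with the small number of tenants a receiver typically hosts, the diff implies that cost is acceptable in exchange for dropping the invalidation flags, though that reasoning is inferred rather than stated in the commit.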

pkg/receive/multitsdb_test.go

Lines changed: 42 additions & 0 deletions

@@ -5,6 +5,7 @@ package receive

 import (
 	"context"
+	"fmt"
 	"io"
 	"math"
 	"os"

@@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
 	testutil.Equals(t, 1, len(m.TSDBLocalClients()))
 }

+func TestMultiTSDBAddNewTenant(t *testing.T) {
+	t.Parallel()
+	const iterations = 10
+	// This test detects race conditions, so we run it multiple times to increase the chance of catching the issue.
+	for i := 0; i < iterations; i++ {
+		t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) {
+			dir := t.TempDir()
+			m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
+				&tsdb.Options{
+					MinBlockDuration:  (2 * time.Hour).Milliseconds(),
+					MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
+					RetentionDuration: (6 * time.Hour).Milliseconds(),
+				},
+				labels.FromStrings("replica", "test"),
+				"tenant_id",
+				objstore.NewInMemBucket(),
+				false,
+				metadata.NoneFunc,
+			)
+			defer func() { testutil.Ok(t, m.Close()) }()
+
+			concurrency := 50
+			var wg sync.WaitGroup
+			for i := 0; i < concurrency; i++ {
+				wg.Add(1)
+				// simulate remote write with new tenant concurrently
+				go func(i int) {
+					defer wg.Done()
+					testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10))))
+				}(i)
+				// simulate read request concurrently
+				go func() {
+					m.TSDBLocalClients()
+				}()
+			}
+			wg.Wait()
+			testutil.Equals(t, concurrency, len(m.TSDBLocalClients()))
+		})
+	}
+}
+
 func TestAlignedHeadFlush(t *testing.T) {
 	t.Parallel()

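
The new test is the regression check for the concurrent path above: fifty goroutines create tenants through appendSample while untracked reader goroutines call TSDBLocalClients, and once the writers finish the test asserts that every tenant is visible in the client list; the outer loop of ten subtests just raises the odds of hitting a bad interleaving. A test like this is most useful under the Go race detector. Assuming the repository's standard module layout, an invocation along these lines (a suggested command, not one taken from the commit) would run it with race checking enabled:

	go test -race -count=1 -run TestMultiTSDBAddNewTenant ./pkg/receive/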