Skip to content

Commit cdd22dc

Browse files
authored
fix tenant glob match bug (#128)
2 parents 034daed + 5e13b42 commit cdd22dc

File tree

7 files changed

+121
-50
lines changed

7 files changed

+121
-50
lines changed

.github/workflows/go.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ jobs:
147147
matrix:
148148
parallelism: [8]
149149
index: [0, 1, 2, 3, 4, 5, 6, 7]
150-
runs-on: ubuntu-latest
150+
runs-on: ubuntu-24.04
151151
name: Thanos end-to-end tests
152152
env:
153153
GOBIN: /tmp/.bin

pkg/extprom/http/instrument_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type ClientMetrics struct {
2323

2424
// NewClientMetrics creates a new instance of ClientMetrics.
2525
// It will also register the metrics with the included register.
26-
// This ClientMetrics should be re-used for diff clients with the same purpose.
26+
// This ClientMetrics should be reused for diff clients with the same purpose.
2727
// e.g. 1 ClientMetrics should be used for all the clients that talk to Alertmanager.
2828
func NewClientMetrics(reg prometheus.Registerer) *ClientMetrics {
2929
var m ClientMetrics

pkg/query/endpointset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
460460
if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
461461
stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset] > 0 {
462462

463-
level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket",
463+
level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advice as it will malform data in in the same bucket",
464464
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1))
465465
}
466466
stats[er.ComponentType().String()][extLset]++

pkg/receive/hashring.go

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -244,13 +244,42 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (En
244244
return c.endpoints[endpointIndex], nil
245245
}
246246

247+
type tenantSet map[string]tenantMatcher
248+
249+
func (t tenantSet) match(tenant string) (bool, error) {
250+
// Fast path for the common case of direct match.
251+
if mt, ok := t[tenant]; ok && isExactMatcher(mt) {
252+
return true, nil
253+
} else {
254+
for tenantPattern, matcherType := range t {
255+
switch matcherType {
256+
case TenantMatcherGlob:
257+
matches, err := filepath.Match(tenantPattern, tenant)
258+
if err != nil {
259+
return false, fmt.Errorf("error matching tenant pattern %s (tenant %s): %w", tenantPattern, tenant, err)
260+
}
261+
if matches {
262+
return true, nil
263+
}
264+
case TenantMatcherTypeExact:
265+
// Already checked above, skipping.
266+
fallthrough
267+
default:
268+
continue
269+
}
270+
271+
}
272+
}
273+
return false, nil
274+
}
275+
247276
// multiHashring represents a set of hashrings.
248277
// Which hashring to use for a tenant is determined
249278
// by the tenants field of the hashring configuration.
250279
type multiHashring struct {
251280
cache map[string]Hashring
252281
hashrings []Hashring
253-
tenantSets []map[string]tenantMatcher
282+
tenantSets []tenantSet
254283

255284
// We need a mutex to guard concurrent access
256285
// to the cache map, as this is both written to
@@ -283,28 +312,10 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (En
283312
if t == nil {
284313
found = true
285314
} else {
286-
// Fast path for the common case of direct match.
287-
if mt, ok := t[tenant]; ok && isExactMatcher(mt) {
288-
found = true
289-
} else {
290-
for tenantPattern, matcherType := range t {
291-
switch matcherType {
292-
case TenantMatcherGlob:
293-
matches, err := filepath.Match(tenantPattern, tenant)
294-
if err != nil {
295-
return Endpoint{}, fmt.Errorf("error matching tenant pattern %s (tenant %s): %w", tenantPattern, tenant, err)
296-
}
297-
found = matches
298-
case TenantMatcherTypeExact:
299-
// Already checked above, skipping.
300-
fallthrough
301-
default:
302-
continue
303-
}
304-
305-
}
315+
var err error
316+
if found, err = t.match(tenant); err != nil {
317+
return Endpoint{}, err
306318
}
307-
308319
}
309320
if found {
310321
m.mu.Lock()

pkg/receive/hashring_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,63 @@ func TestHashringGet(t *testing.T) {
140140
"node6": {},
141141
},
142142
},
143+
{
144+
name: "glob hashring match",
145+
cfg: []HashringConfig{
146+
{
147+
Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}},
148+
Tenants: []string{"prefix*"},
149+
TenantMatcherType: TenantMatcherGlob,
150+
},
151+
{
152+
Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}},
153+
},
154+
},
155+
nodes: map[string]struct{}{
156+
"node1": {},
157+
"node2": {},
158+
"node3": {},
159+
},
160+
tenant: "prefix-1",
161+
},
162+
{
163+
name: "glob hashring not match",
164+
cfg: []HashringConfig{
165+
{
166+
Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}},
167+
Tenants: []string{"prefix*"},
168+
TenantMatcherType: TenantMatcherGlob,
169+
},
170+
{
171+
Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}},
172+
},
173+
},
174+
nodes: map[string]struct{}{
175+
"node4": {},
176+
"node5": {},
177+
"node6": {},
178+
},
179+
tenant: "suffix-1",
180+
},
181+
{
182+
name: "glob hashring multiple matches",
183+
cfg: []HashringConfig{
184+
{
185+
Endpoints: []Endpoint{{Address: "node1"}, {Address: "node2"}, {Address: "node3"}},
186+
Tenants: []string{"t1-*", "t2", "t3-*"},
187+
TenantMatcherType: TenantMatcherGlob,
188+
},
189+
{
190+
Endpoints: []Endpoint{{Address: "node4"}, {Address: "node5"}, {Address: "node6"}},
191+
},
192+
},
193+
nodes: map[string]struct{}{
194+
"node1": {},
195+
"node2": {},
196+
"node3": {},
197+
},
198+
tenant: "t2",
199+
},
143200
} {
144201
hs, err := NewMultiHashring(AlgorithmHashmod, 3, tc.cfg)
145202
require.NoError(t, err)

pkg/runutil/runutil.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
// The rununtil.Exhaust* family of functions provide the same functionality but
4646
// they take an io.ReadCloser and they exhaust the whole reader before closing
4747
// them. They are useful when trying to use http keep-alive connections because
48-
// for the same connection to be re-used the whole response body needs to be
48+
// for the same connection to be reused the whole response body needs to be
4949
// exhausted.
5050
package runutil
5151

test/e2e/query_test.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,46 +1248,49 @@ func TestSidecarQueryEvaluation(t *testing.T) {
12481248
}
12491249
}
12501250

1251-
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
1252-
// struct{}, since vars of this type must have distinct addresses.
1253-
type emptyCtx int
1251+
var chromedpAllocator context.Context
12541252

1255-
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
1256-
return
1257-
}
1258-
1259-
func (*emptyCtx) Done() <-chan struct{} {
1260-
return nil
1261-
}
1262-
1263-
func (*emptyCtx) Err() error {
1264-
return nil
1265-
}
1266-
1267-
func (*emptyCtx) Value(key any) any {
1268-
return nil
1269-
}
1270-
1271-
func (e *emptyCtx) String() string {
1272-
return "Context"
1253+
func TestMain(m *testing.M) {
1254+
execAlloc, execCancel := chromedp.NewExecAllocator(
1255+
context.Background(),
1256+
)
1257+
chromedpAllocator = execAlloc
1258+
rc := m.Run()
1259+
execCancel()
1260+
os.Exit(rc)
12731261
}
12741262

12751263
func checkNetworkRequests(t *testing.T, addr string) {
1276-
ctx, cancel := chromedp.NewContext(new(emptyCtx))
1264+
opts := append(chromedp.DefaultExecAllocatorOptions[:],
1265+
chromedp.Flag("headless", true),
1266+
chromedp.Flag("no-sandbox", true),
1267+
chromedp.Flag("disable-gpu", true),
1268+
)
1269+
allocCtx, cancel := chromedp.NewExecAllocator(chromedpAllocator, opts...)
1270+
defer cancel()
1271+
1272+
ctx, cancel := chromedp.NewContext(allocCtx)
12771273
t.Cleanup(cancel)
12781274

1275+
// make sure browser is already started
1276+
err := chromedp.Run(ctx)
1277+
testutil.Ok(t, err)
1278+
12791279
testutil.Ok(t, runutil.Retry(1*time.Minute, ctx.Done(), func() error {
12801280
var networkErrors []string
12811281

1282+
var newCtx context.Context
1283+
newCtx, newCancel := chromedp.NewContext(ctx)
1284+
t.Cleanup(newCancel)
12821285
// Listen for failed network requests and push them to an array.
1283-
chromedp.ListenTarget(ctx, func(ev interface{}) {
1286+
chromedp.ListenTarget(newCtx, func(ev interface{}) {
12841287
switch ev := ev.(type) {
12851288
case *network.EventLoadingFailed:
12861289
networkErrors = append(networkErrors, ev.ErrorText)
12871290
}
12881291
})
12891292

1290-
err := chromedp.Run(ctx,
1293+
err := chromedp.Run(newCtx,
12911294
network.Enable(),
12921295
chromedp.Navigate(addr),
12931296
chromedp.WaitVisible(`body`),

0 commit comments

Comments
 (0)