Skip to content

Commit e42b472

Browse files
authored
[ES-1393569] azure compactor crash loop due to symbol table size limits (#146)
2 parents 657c860 + 055c964 commit e42b472

File tree

5 files changed

+97
-6
lines changed

5 files changed

+97
-6
lines changed

cmd/thanos/compact.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ func runCompact(
404404
planner,
405405
comp,
406406
compact.DefaultBlockDeletableChecker{},
407-
compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval),
407+
compact.NewOverlappingCompactionLifecycleCallback(reg, logger, conf.enableOverlappingRemoval),
408408
compactDir,
409409
insBkt,
410410
conf.compactionConcurrency,

pkg/block/fetcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c
345345
case metaChan <- id:
346346
}
347347
return nil
348-
}); err != nil {
348+
}, objstore.WithUpdatedAt()); err != nil {
349349
return nil, err
350350
}
351351
close(metaChan)

pkg/compact/compact.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ type CompactionLifecycleCallback interface {
842842
PreCompactionCallback(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta) error
843843
PostCompactionCallback(ctx context.Context, logger log.Logger, group *Group, blockID ulid.ULID) error
844844
GetBlockPopulator(ctx context.Context, logger log.Logger, group *Group) (tsdb.BlockPopulator, error)
845+
HandleError(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta, err error) int // how many error cases it handles
845846
}
846847

847848
type DefaultCompactionLifecycleCallback struct {
@@ -873,6 +874,10 @@ func (c DefaultCompactionLifecycleCallback) GetBlockPopulator(_ context.Context,
873874
return tsdb.DefaultBlockPopulator{}, nil
874875
}
875876

877+
func (c DefaultCompactionLifecycleCallback) HandleError(_ context.Context, _ log.Logger, _ *Group, _ []*metadata.Meta, err error) int {
878+
return 0
879+
}
880+
876881
// Compactor provides compaction against an underlying storage of time series data.
877882
// It is similar to tsdb.Compactor but only relevant methods are kept. Plan and Write are removed.
878883
// TODO(bwplotka): Split the Planner from Compactor on upstream as well, so we can import it.
@@ -1253,7 +1258,8 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
12531258
compIDs, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc)
12541259
return e
12551260
}); err != nil {
1256-
return false, nil, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
1261+
handledErrs := compactionLifecycleCallback.HandleError(ctx, cg.logger, cg, toCompact, err)
1262+
return false, nil, halt(errors.Wrapf(err, "compact blocks %v, handled %d errors", toCompactDirs, handledErrs))
12571263
}
12581264
if len(compIDs) == 0 {
12591265
// No compacted blocks means all compacted blocks are of no sample.

pkg/compact/overlapping.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package compact
66
import (
77
"context"
88
"fmt"
9+
"strings"
910

1011
"github.com/go-kit/log"
1112
"github.com/go-kit/log/level"
@@ -18,16 +19,22 @@ import (
1819
"github.com/thanos-io/thanos/pkg/block/metadata"
1920
)
2021

21-
const overlappingReason = "blocks-overlapping"
22+
const (
23+
overlappingReason = "blocks-overlapping"
24+
25+
symbolTableSizeExceedsError = "symbol table size exceeds"
26+
symbolTableSizeLimit = 128 * 1024
27+
)
2228

2329
type OverlappingCompactionLifecycleCallback struct {
2430
overlappingBlocks prometheus.Counter
2531
noCompaction prometheus.Counter
2632
noDownsampling prometheus.Counter
2733
}
2834

29-
func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback {
35+
func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, logger log.Logger, enabled bool) CompactionLifecycleCallback {
3036
if enabled {
37+
level.Info(logger).Log("msg", "enabled overlapping blocks compaction lifecycle callback")
3138
return OverlappingCompactionLifecycleCallback{
3239
overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
3340
Name: "thanos_compact_group_overlapping_blocks_total",
@@ -122,3 +129,29 @@ func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context
122129
func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
123130
return tsdb.DefaultBlockPopulator{}, nil
124131
}
132+
133+
func (o OverlappingCompactionLifecycleCallback) HandleError(ctx context.Context, logger log.Logger, g *Group, toCompact []*metadata.Meta, compactErr error) int {
134+
handledErrs := 0
135+
if compactErr == nil {
136+
return handledErrs
137+
}
138+
level.Error(logger).Log("msg", "failed to compact blocks", "err", compactErr)
139+
if strings.Contains(compactErr.Error(), symbolTableSizeExceedsError) {
140+
for _, m := range toCompact {
141+
if m.Thanos.Source != metadata.ReceiveSource {
142+
level.Debug(logger).Log("msg", "bypass blocks that are already compacted", "block", m.String())
143+
continue
144+
}
145+
if m.Stats.NumSeries < symbolTableSizeLimit {
146+
level.Warn(logger).Log("msg", "bypass small blocks", "block", m.String(), "series", m.Stats.NumSeries)
147+
continue
148+
}
149+
handledErrs++
150+
if err := block.MarkForNoCompact(ctx, logger, g.bkt, m.ULID, symbolTableSizeExceedsError,
151+
fmt.Sprintf("failed to compact blocks: %s", m.ULID.String()), o.noCompaction); err != nil {
152+
level.Error(logger).Log("msg", "failed to mark block for no compact", "block", m.String(), "err", err)
153+
}
154+
}
155+
}
156+
return handledErrs
157+
}

pkg/compact/overlapping_test.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/go-kit/log"
1414
"github.com/pkg/errors"
1515
"github.com/prometheus/client_golang/prometheus"
16+
"github.com/stretchr/testify/require"
1617
"github.com/thanos-io/objstore"
1718
"github.com/thanos-io/thanos/pkg/block/metadata"
1819
"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -21,7 +22,7 @@ import (
2122
func TestPreCompactionCallback(t *testing.T) {
2223
reg := prometheus.NewRegistry()
2324
logger := log.NewNopLogger()
24-
callback := NewOverlappingCompactionLifecycleCallback(reg, true)
25+
callback := NewOverlappingCompactionLifecycleCallback(reg, logger, true)
2526
for _, tcase := range []struct {
2627
testName string
2728
input []*metadata.Meta
@@ -145,6 +146,57 @@ func TestPreCompactionCallback(t *testing.T) {
145146
}
146147
}
147148

149+
func TestHandleError(t *testing.T) {
150+
reg := prometheus.NewRegistry()
151+
logger := log.NewNopLogger()
152+
callback := NewOverlappingCompactionLifecycleCallback(reg, logger, true)
153+
for _, tcase := range []struct {
154+
testName string
155+
input []*metadata.Meta
156+
err error
157+
handledErrs int
158+
errBlockIdx int
159+
}{
160+
{
161+
testName: "empty error",
162+
input: []*metadata.Meta{},
163+
},
164+
{
165+
testName: "non empty error but not handled",
166+
input: []*metadata.Meta{
167+
createCustomBlockMeta(1, 1, 2, metadata.ReceiveSource, 1),
168+
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
169+
},
170+
err: errors.New("some error"),
171+
handledErrs: 0,
172+
},
173+
{
174+
testName: "non empty error symbol table size exceeds",
175+
input: []*metadata.Meta{
176+
createCustomBlockMeta(1, 1, 2, metadata.ReceiveSource, 1),
177+
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
178+
createCustomBlockMeta(3, 1, 6, metadata.ReceiveSource, 1024*1024),
179+
},
180+
err: errors.New(symbolTableSizeExceedsError + " 1024*1024"),
181+
handledErrs: 1,
182+
errBlockIdx: 2,
183+
},
184+
} {
185+
t.Run(tcase.testName, func(t *testing.T) {
186+
ctx := context.Background()
187+
bkt := objstore.NewInMemBucket()
188+
group := &Group{logger: log.NewNopLogger(), bkt: bkt}
189+
require.Equal(t, tcase.handledErrs, callback.HandleError(ctx, logger, group, tcase.input, tcase.err))
190+
if tcase.handledErrs > 0 {
191+
for i := 0; i < len(tcase.input); i++ {
192+
ok, _ := bkt.Exists(ctx, path.Join(tcase.input[i].ULID.String(), metadata.NoCompactMarkFilename))
193+
require.Equal(t, i == tcase.errBlockIdx, ok)
194+
}
195+
}
196+
})
197+
}
198+
}
199+
148200
func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.SourceType, numSeries uint64) *metadata.Meta {
149201
labels := map[string]string{"a": "1"}
150202
m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{})

0 commit comments

Comments
 (0)