Skip to content

Commit 7028bf7

Browse files
committed
[ES-1393569] add compaction errors handler
Signed-off-by: Yi Jin <[email protected]>
1 parent f90f979 commit 7028bf7

File tree

4 files changed

+58
-4
lines changed

4 files changed

+58
-4
lines changed

cmd/thanos/compact.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ func runCompact(
404404
planner,
405405
comp,
406406
compact.DefaultBlockDeletableChecker{},
407-
compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval),
407+
compact.NewOverlappingCompactionLifecycleCallback(reg, logger, conf.enableOverlappingRemoval),
408408
compactDir,
409409
insBkt,
410410
conf.compactionConcurrency,

pkg/compact/compact.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ type CompactionLifecycleCallback interface {
842842
PreCompactionCallback(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta) error
843843
PostCompactionCallback(ctx context.Context, logger log.Logger, group *Group, blockID ulid.ULID) error
844844
GetBlockPopulator(ctx context.Context, logger log.Logger, group *Group) (tsdb.BlockPopulator, error)
845+
HandleError(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta, err error)
845846
}
846847

847848
type DefaultCompactionLifecycleCallback struct {
@@ -873,6 +874,9 @@ func (c DefaultCompactionLifecycleCallback) GetBlockPopulator(_ context.Context,
873874
return tsdb.DefaultBlockPopulator{}, nil
874875
}
875876

877+
func (c DefaultCompactionLifecycleCallback) HandleError(_ context.Context, _ log.Logger, _ *Group, _ []*metadata.Meta, err error) {
878+
}
879+
876880
// Compactor provides compaction against an underlying storage of time series data.
877881
// It is similar to tsdb.Compactor but only relevant methods are kept. Plan and Write are removed.
878882
// TODO(bwplotka): Split the Planner from Compactor on upstream as well, so we can import it.
@@ -1253,6 +1257,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
12531257
compIDs, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc)
12541258
return e
12551259
}); err != nil {
1260+
compactionLifecycleCallback.HandleError(ctx, cg.logger, cg, toCompact, err)
12561261
return false, nil, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
12571262
}
12581263
if len(compIDs) == 0 {

pkg/compact/overlapping.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package compact
66
import (
77
"context"
88
"fmt"
9+
"strings"
910

1011
"github.com/go-kit/log"
1112
"github.com/go-kit/log/level"
@@ -18,16 +19,22 @@ import (
1819
"github.com/thanos-io/thanos/pkg/block/metadata"
1920
)
2021

21-
const overlappingReason = "blocks-overlapping"
22+
const (
23+
overlappingReason = "blocks-overlapping"
24+
25+
symbolTableSizeExceedsError = "symbol table size exceeds"
26+
symbolTableSizeLimit = 1024 * 1024
27+
)
2228

2329
type OverlappingCompactionLifecycleCallback struct {
2430
overlappingBlocks prometheus.Counter
2531
noCompaction prometheus.Counter
2632
noDownsampling prometheus.Counter
2733
}
2834

29-
func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback {
35+
func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, logger log.Logger, enabled bool) CompactionLifecycleCallback {
3036
if enabled {
37+
level.Info(logger).Log("msg", "enabled overlapping blocks compaction lifecycle callback")
3138
return OverlappingCompactionLifecycleCallback{
3239
overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
3340
Name: "thanos_compact_group_overlapping_blocks_total",
@@ -122,3 +129,23 @@ func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context
122129
func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
123130
return tsdb.DefaultBlockPopulator{}, nil
124131
}
132+
133+
func (o OverlappingCompactionLifecycleCallback) HandleError(ctx context.Context, logger log.Logger, g *Group, toCompact []*metadata.Meta, compactErr error) {
134+
level.Error(logger).Log("msg", "failed to compact blocks", "err", compactErr)
135+
if strings.Contains(compactErr.Error(), symbolTableSizeExceedsError) {
136+
for _, m := range toCompact {
137+
if m.Thanos.Source != metadata.ReceiveSource {
138+
level.Debug(logger).Log("msg", "bypass blocks that are already compacted", "block", m.String())
139+
continue
140+
}
141+
if m.Stats.NumSeries < symbolTableSizeLimit {
142+
level.Warn(logger).Log("msg", "bypass small blocks", "block", m.String(), "series", m.Stats.NumSeries)
143+
continue
144+
}
145+
if err := block.MarkForNoCompact(ctx, logger, g.bkt, m.ULID, symbolTableSizeExceedsError,
146+
fmt.Sprintf("failed to compact blocks: %s", m.ULID.String()), o.noCompaction); err != nil {
147+
level.Error(logger).Log("msg", "failed to mark block for no compact", "block", m.String(), "err", err)
148+
}
149+
}
150+
}
151+
}

pkg/compact/overlapping_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
func TestPreCompactionCallback(t *testing.T) {
2222
reg := prometheus.NewRegistry()
2323
logger := log.NewNopLogger()
24-
callback := NewOverlappingCompactionLifecycleCallback(reg, true)
24+
callback := NewOverlappingCompactionLifecycleCallback(reg, logger, true)
2525
for _, tcase := range []struct {
2626
testName string
2727
input []*metadata.Meta
@@ -145,6 +145,28 @@ func TestPreCompactionCallback(t *testing.T) {
145145
}
146146
}
147147

148+
func TestHandleError(t *testing.T) {
149+
reg := prometheus.NewRegistry()
150+
logger := log.NewNopLogger()
151+
callback := NewOverlappingCompactionLifecycleCallback(reg, logger, true)
152+
for _, tcase := range []struct {
153+
testName string
154+
input []*metadata.Meta
155+
err error
156+
}{
157+
{
158+
testName: "empty error",
159+
},
160+
} {
161+
t.Run(tcase.testName, func(t *testing.T) {
162+
ctx := context.Background()
163+
bkt := objstore.NewInMemBucket()
164+
group := &Group{logger: log.NewNopLogger(), bkt: bkt}
165+
callback.HandleError(ctx, logger, group, tcase.input, tcase.err)
166+
})
167+
}
168+
}
169+
148170
func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.SourceType, numSeries uint64) *metadata.Meta {
149171
labels := map[string]string{"a": "1"}
150172
m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{})

0 commit comments

Comments
 (0)