Skip to content

Commit 65cdde2

Browse files
authored
[ES-1393569] fix concurrency apply retention and deletion (#148)
2 parents e42b472 + 5a03afd commit 65cdde2

File tree

7 files changed

+10
-9
lines changed

7 files changed

+10
-9
lines changed

pkg/compact/blocks_cleaner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package compact
5 5

6 6
import (
7 7
"context"
8+
"runtime"
8 9
"sync"
9 10
"time"
10 11

@@ -46,7 +47,7 @@ func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error {
46 47

47 48
deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
48 49
wg := &sync.WaitGroup{}
49-
sem := make(chan struct{}, ParallelLimit)
50+
sem := make(chan struct{}, runtime.NumCPU())
50 51
for _, deletionMark := range deletionMarkMap {
51 52
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() {
52 53
sem <- struct{}{} // acquire BEFORE spawning goroutine

pkg/compact/clean.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package compact
5 5

6 6
import (
7 7
"context"
8+
"runtime"
8 9
"sync"
9 10
"time"
10 11

@@ -41,7 +42,7 @@ func BestEffortCleanAbortedPartialUploads(
41 42
// can be assumed in this case. Keep partialUploadThresholdAge long for now.
42 43
// Mitigate this by adding ModifiedTime to bkt and check that instead of ULID (block creation time).
43 44
wg := &sync.WaitGroup{}
44-
sem := make(chan struct{}, ParallelLimit)
45+
sem := make(chan struct{}, runtime.NumCPU())
45 46
for id := range partial {
46 47
if ulid.Now()-id.Time() <= uint64(PartialUploadThresholdAge/time.Millisecond) {
47 48
// Minimum delay has not expired, ignore for now.

pkg/compact/overlapping.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const (
23 23
overlappingReason = "blocks-overlapping"
24 24

25 25
symbolTableSizeExceedsError = "symbol table size exceeds"
26-
symbolTableSizeLimit = 128 * 1024
26+
symbolTableSizeLimit = 1024 * 1024
27 27
)
28 28

29 29
type OverlappingCompactionLifecycleCallback struct {

pkg/compact/overlapping_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,9 @@ func TestHandleError(t *testing.T) {
175 175
input: []*metadata.Meta{
176 176
createCustomBlockMeta(1, 1, 2, metadata.ReceiveSource, 1),
177 177
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
178-
createCustomBlockMeta(3, 1, 6, metadata.ReceiveSource, 1024*1024),
178+
createCustomBlockMeta(3, 1, 6, metadata.ReceiveSource, 2*1024*1024),
179 179
},
180-
err: errors.New(symbolTableSizeExceedsError + " 1024*1024"),
180+
err: errors.New(symbolTableSizeExceedsError + " 2*1024*1024"),
181 181
handledErrs: 1,
182 182
errBlockIdx: 2,
183 183
},

pkg/compact/progress.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
10 10
"github.com/prometheus/client_golang/prometheus/promauto"
11 11
)
12 12

13-
const ParallelLimit = 1024
14-
15 13
type CompactorState int64
16 14

17 15
// Use a gauge to track the state of a compactor pod.

pkg/compact/retention.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
7 7
"context"
8 8
"fmt"
9 9
"regexp"
10+
"runtime"
10 11
"sync"
11 12
"time"
12 13

@@ -40,7 +41,7 @@ func ApplyRetentionPolicyByResolution(
40 41
) error {
41 42
level.Info(logger).Log("msg", "start optional retention")
42 43
wg := &sync.WaitGroup{}
43-
sem := make(chan struct{}, ParallelLimit)
44+
sem := make(chan struct{}, runtime.NumCPU())
44 45
for id, m := range metas {
45 46
retentionDuration := retentionByResolution[ResolutionLevel(m.Thanos.Downsample.Resolution)]
46 47
if retentionDuration.Seconds() == 0 {

pkg/receive/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,7 @@ func (h *Handler) sendRemoteWrite(
1031 1031
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
1032 1032
Replica: realReplicationIndex,
1033 1033
}, endpointReplica, trackedSeries.seriesIDs, responses, func(err error) {
1034+
defer wg.Done()
1034 1035
if err == nil {
1035 1036
h.forwardRequests.WithLabelValues(labelSuccess).Inc()
1036 1037
if !alreadyReplicated {
@@ -1045,7 +1046,6 @@ func (h *Handler) sendRemoteWrite(
1045 1046
}
1046 1047
}
1047 1048
}
1048-
wg.Done()
1049 1049
})
1050 1050
}
1051 1051

0 commit comments

Comments
 (0)