Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## master / unreleased

* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082


## 1.20.0 in progress

* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
Expand Down
154 changes: 154 additions & 0 deletions pkg/compactor/compactor_paritioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1826,3 +1826,157 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFrom

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
}

func TestPartitionCompactionRaceCondition(t *testing.T) {
t.Run("planner_detects_missing_partition_group", func(t *testing.T) {
setup := newRaceConditionTestSetup(12345)

// Create a planner that will try to process blocks but find missing partition group
planner := setup.createPlanner()
cortexMetaExtensions := setup.createCortexMetaExtensions(time.Now().Unix())
metasByMinTime := setup.createTestMetadata()

result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)

require.Error(t, err, "Planner should fail when partition group is missing")
require.Nil(t, result, "Should not return any result when partition group is missing")
require.ErrorIs(t, err, plannerCompletedPartitionError, "Error should be completed partition error when partition group is missing")
})

t.Run("planner_detects_creation_time_mismatch", func(t *testing.T) {
setup := newRaceConditionTestSetup(54321)
originalCreationTime := time.Now().Unix()

// Create initial partition group
partitionedGroupInfo := setup.createPartitionedGroupInfo(originalCreationTime)
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
require.NoError(t, err)

// Simulate cleaner deleting partition group
partitionGroupFile := GetPartitionedGroupFile(setup.partitionedGroupID)
err = setup.bucket.Delete(setup.ctx, partitionGroupFile)
require.NoError(t, err)

// Create new partition group with same ID but different creation time
newCreationTime := time.Now().Unix() + 200
newPartitionedGroupInfo := setup.createPartitionedGroupInfo(newCreationTime)
_, err = UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *newPartitionedGroupInfo)
require.NoError(t, err)

// Test planner creation time validation
planner := setup.createPlanner()
cortexMetaExtensions := setup.createCortexMetaExtensions(originalCreationTime) // OLD creation time
metasByMinTime := setup.createTestMetadata()

result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)

require.Error(t, err, "Planner should detect creation time mismatch")
require.ErrorIs(t, err, plannerCompletedPartitionError, "Should abort with completed partition error")
require.Nil(t, result, "Should not return any result when aborting")
})

t.Run("normal_operation_with_matching_creation_time", func(t *testing.T) {
setup := newRaceConditionTestSetup(99999)
creationTime := time.Now().Unix()

// Create partition group
partitionedGroupInfo := setup.createPartitionedGroupInfo(creationTime)
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
require.NoError(t, err)

// Create planner and test with matching creation time
planner := setup.createPlanner()
cortexMetaExtensions := setup.createCortexMetaExtensions(creationTime) // MATCHING creation time
metasByMinTime := setup.createTestMetadata()

result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)

require.NoError(t, err, "Should not fail when creation times match")
require.NotNil(t, result, "Should return result when creation times match")
})
}

// raceConditionTestSetup provides common setup for race condition tests
type raceConditionTestSetup struct {
ctx context.Context
logger log.Logger
bucket objstore.InstrumentedBucket
userID string
partitionedGroupID uint32
partitionID int
partitionCount int
ranges []int64
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
}

func newRaceConditionTestSetup(partitionedGroupID uint32) *raceConditionTestSetup {
return &raceConditionTestSetup{
ctx: context.Background(),
logger: log.NewNopLogger(),
bucket: objstore.WithNoopInstr(objstore.NewInMemBucket()),
userID: "test-user",
partitionedGroupID: partitionedGroupID,
partitionID: 0,
partitionCount: 2,
ranges: []int64{2 * 60 * 60 * 1000}, // 2 hours in milliseconds
noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark { return nil },
}
}

func (s *raceConditionTestSetup) createPartitionedGroupInfo(creationTime int64) *PartitionedGroupInfo {
return &PartitionedGroupInfo{
PartitionedGroupID: s.partitionedGroupID,
PartitionCount: s.partitionCount,
Partitions: []Partition{
{PartitionID: 0, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
{PartitionID: 1, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
},
RangeStart: 0,
RangeEnd: 2 * 60 * 60 * 1000,
CreationTime: creationTime,
Version: PartitionedGroupInfoVersion1,
}
}

func (s *raceConditionTestSetup) createPlanner() *PartitionCompactionPlanner {
// Use the same metrics pattern as other tests
registerer := prometheus.NewPedanticRegistry()
metrics := newCompactorMetrics(registerer)

return NewPartitionCompactionPlanner(
s.ctx,
s.bucket,
s.logger,
s.ranges,
s.noCompBlocksFunc,
"test-compactor",
s.userID,
time.Second,
10*time.Minute,
time.Minute,
metrics,
)
}

func (s *raceConditionTestSetup) createCortexMetaExtensions(creationTime int64) *cortex_tsdb.CortexMetaExtensions {
return &cortex_tsdb.CortexMetaExtensions{
PartitionInfo: &cortex_tsdb.PartitionInfo{
PartitionedGroupID: s.partitionedGroupID,
PartitionCount: s.partitionCount,
PartitionID: s.partitionID,
PartitionedGroupCreationTime: creationTime,
},
}
}

func (s *raceConditionTestSetup) createTestMetadata() []*metadata.Meta {
return []*metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(ulid.Now(), nil),
MinTime: 0,
MaxTime: 2 * 60 * 60 * 1000,
},
},
}
}
13 changes: 13 additions & 0 deletions pkg/compactor/partition_compaction_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,19 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition is visited")
continue
}

// Validate that the partition group still exists before creating a visit marker
// This prevents the race condition where the cleaner deletes the partition group
// between the visit marker check and the visit marker creation
if _, err := ReadPartitionedGroupInfo(g.ctx, g.bkt, g.logger, partitionedGroupID); err != nil {
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition group was deleted by cleaner", "partitioned_group_id", partitionedGroupID)
} else {
level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition group info", "err", err, "partitioned_group_id", partitionedGroupID)
}
continue
}

partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup)

level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String())
Expand Down
28 changes: 28 additions & 0 deletions pkg/compactor/partition_compaction_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,34 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
}
}

// Double-check that the partition group still exists and is the same one we started with
// to prevent race condition with cleaner. If the cleaner deleted the partition group
// after we created the visit marker in the grouper, we should abort the compaction
// to avoid orphaned visit markers.
currentPartitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID)
if err != nil {
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
level.Warn(p.logger).Log("msg", "partition group was deleted by cleaner, aborting compaction", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
return nil, plannerCompletedPartitionError
} else {
level.Warn(p.logger).Log("msg", "unable to read partition group info during planning", "err", err, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
return nil, fmt.Errorf("unable to read partition group info for partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
}
}

// Verify that this is the same partition group that the grouper created the visit marker for
// by comparing creation times. If they don't match, it means the cleaner deleted the old
// partition group and a new one was created with the same ID.
expectedCreationTime := partitionInfo.PartitionedGroupCreationTime
if currentPartitionedGroupInfo.CreationTime != expectedCreationTime {
level.Warn(p.logger).Log("msg", "partition group creation time mismatch, cleaner deleted old group and new one was created, aborting compaction",
"partitioned_group_id", partitionedGroupID,
"partition_id", partitionID,
"expected_creation_time", expectedCreationTime,
"current_creation_time", currentPartitionedGroupInfo.CreationTime)
return nil, plannerCompletedPartitionError
}

// Ensure all blocks fits within the largest range. This is a double check
// to ensure there's no bug in the previous blocks grouping, given this Plan()
// is just a pass-through.
Expand Down
Loading