Skip to content

Commit fc25ae6

Browse files
committed
Fix visit marker race condition
1 parent fc6da2d commit fc25ae6

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

pkg/compactor/compactor_paritioning_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1826,3 +1826,157 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFrom
18261826

18271827
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
18281828
}
1829+
1830+
func TestPartitionCompactionRaceCondition(t *testing.T) {
1831+
t.Run("planner_detects_missing_partition_group", func(t *testing.T) {
1832+
setup := newRaceConditionTestSetup(12345)
1833+
1834+
// Create a planner that will try to process blocks but find missing partition group
1835+
planner := setup.createPlanner()
1836+
cortexMetaExtensions := setup.createCortexMetaExtensions(time.Now().Unix())
1837+
metasByMinTime := setup.createTestMetadata()
1838+
1839+
result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)
1840+
1841+
require.Error(t, err, "Planner should fail when partition group is missing")
1842+
require.Nil(t, result, "Should not return any result when partition group is missing")
1843+
require.ErrorIs(t, err, plannerCompletedPartitionError, "Error should be completed partition error when partition group is missing")
1844+
})
1845+
1846+
t.Run("planner_detects_creation_time_mismatch", func(t *testing.T) {
1847+
setup := newRaceConditionTestSetup(54321)
1848+
originalCreationTime := time.Now().Unix()
1849+
1850+
// Create initial partition group
1851+
partitionedGroupInfo := setup.createPartitionedGroupInfo(originalCreationTime)
1852+
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
1853+
require.NoError(t, err)
1854+
1855+
// Simulate cleaner deleting partition group
1856+
partitionGroupFile := GetPartitionedGroupFile(setup.partitionedGroupID)
1857+
err = setup.bucket.Delete(setup.ctx, partitionGroupFile)
1858+
require.NoError(t, err)
1859+
1860+
// Create new partition group with same ID but different creation time
1861+
newCreationTime := time.Now().Unix() + 200
1862+
newPartitionedGroupInfo := setup.createPartitionedGroupInfo(newCreationTime)
1863+
_, err = UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *newPartitionedGroupInfo)
1864+
require.NoError(t, err)
1865+
1866+
// Test planner creation time validation
1867+
planner := setup.createPlanner()
1868+
cortexMetaExtensions := setup.createCortexMetaExtensions(originalCreationTime) // OLD creation time
1869+
metasByMinTime := setup.createTestMetadata()
1870+
1871+
result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)
1872+
1873+
require.Error(t, err, "Planner should detect creation time mismatch")
1874+
require.ErrorIs(t, err, plannerCompletedPartitionError, "Should abort with completed partition error")
1875+
require.Nil(t, result, "Should not return any result when aborting")
1876+
})
1877+
1878+
t.Run("normal_operation_with_matching_creation_time", func(t *testing.T) {
1879+
setup := newRaceConditionTestSetup(99999)
1880+
creationTime := time.Now().Unix()
1881+
1882+
// Create partition group
1883+
partitionedGroupInfo := setup.createPartitionedGroupInfo(creationTime)
1884+
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
1885+
require.NoError(t, err)
1886+
1887+
// Create planner and test with matching creation time
1888+
planner := setup.createPlanner()
1889+
cortexMetaExtensions := setup.createCortexMetaExtensions(creationTime) // MATCHING creation time
1890+
metasByMinTime := setup.createTestMetadata()
1891+
1892+
result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)
1893+
1894+
require.NoError(t, err, "Should not fail when creation times match")
1895+
require.NotNil(t, result, "Should return result when creation times match")
1896+
})
1897+
}
1898+
1899+
// raceConditionTestSetup provides common setup for race condition tests
1900+
type raceConditionTestSetup struct {
1901+
ctx context.Context
1902+
logger log.Logger
1903+
bucket objstore.InstrumentedBucket
1904+
userID string
1905+
partitionedGroupID uint32
1906+
partitionID int
1907+
partitionCount int
1908+
ranges []int64
1909+
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
1910+
}
1911+
1912+
func newRaceConditionTestSetup(partitionedGroupID uint32) *raceConditionTestSetup {
1913+
return &raceConditionTestSetup{
1914+
ctx: context.Background(),
1915+
logger: log.NewNopLogger(),
1916+
bucket: objstore.WithNoopInstr(objstore.NewInMemBucket()),
1917+
userID: "test-user",
1918+
partitionedGroupID: partitionedGroupID,
1919+
partitionID: 0,
1920+
partitionCount: 2,
1921+
ranges: []int64{2 * 60 * 60 * 1000}, // 2 hours in milliseconds
1922+
noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark { return nil },
1923+
}
1924+
}
1925+
1926+
func (s *raceConditionTestSetup) createPartitionedGroupInfo(creationTime int64) *PartitionedGroupInfo {
1927+
return &PartitionedGroupInfo{
1928+
PartitionedGroupID: s.partitionedGroupID,
1929+
PartitionCount: s.partitionCount,
1930+
Partitions: []Partition{
1931+
{PartitionID: 0, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
1932+
{PartitionID: 1, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
1933+
},
1934+
RangeStart: 0,
1935+
RangeEnd: 2 * 60 * 60 * 1000,
1936+
CreationTime: creationTime,
1937+
Version: PartitionedGroupInfoVersion1,
1938+
}
1939+
}
1940+
1941+
func (s *raceConditionTestSetup) createPlanner() *PartitionCompactionPlanner {
1942+
// Use the same metrics pattern as other tests
1943+
registerer := prometheus.NewPedanticRegistry()
1944+
metrics := newCompactorMetrics(registerer)
1945+
1946+
return NewPartitionCompactionPlanner(
1947+
s.ctx,
1948+
s.bucket,
1949+
s.logger,
1950+
s.ranges,
1951+
s.noCompBlocksFunc,
1952+
"test-compactor",
1953+
s.userID,
1954+
time.Second,
1955+
10*time.Minute,
1956+
time.Minute,
1957+
metrics,
1958+
)
1959+
}
1960+
1961+
func (s *raceConditionTestSetup) createCortexMetaExtensions(creationTime int64) *cortex_tsdb.CortexMetaExtensions {
1962+
return &cortex_tsdb.CortexMetaExtensions{
1963+
PartitionInfo: &cortex_tsdb.PartitionInfo{
1964+
PartitionedGroupID: s.partitionedGroupID,
1965+
PartitionCount: s.partitionCount,
1966+
PartitionID: s.partitionID,
1967+
PartitionedGroupCreationTime: creationTime,
1968+
},
1969+
}
1970+
}
1971+
1972+
func (s *raceConditionTestSetup) createTestMetadata() []*metadata.Meta {
1973+
return []*metadata.Meta{
1974+
{
1975+
BlockMeta: tsdb.BlockMeta{
1976+
ULID: ulid.MustNew(ulid.Now(), nil),
1977+
MinTime: 0,
1978+
MaxTime: 2 * 60 * 60 * 1000,
1979+
},
1980+
},
1981+
}
1982+
}

pkg/compactor/partition_compaction_grouper.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,19 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
639639
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition is visited")
640640
continue
641641
}
642+
643+
// Validate that the partition group still exists before creating a visit marker
644+
// This prevents the race condition where the cleaner deletes the partition group
645+
// between the visit marker check and the visit marker creation
646+
if _, err := ReadPartitionedGroupInfo(g.ctx, g.bkt, g.logger, partitionedGroupID); err != nil {
647+
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
648+
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition group was deleted by cleaner", "partitioned_group_id", partitionedGroupID)
649+
} else {
650+
level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition group info", "err", err, "partitioned_group_id", partitionedGroupID)
651+
}
652+
continue
653+
}
654+
642655
partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup)
643656

644657
level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String())

pkg/compactor/partition_compaction_planner.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,34 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
109109
}
110110
}
111111

112+
// Double-check that the partition group still exists and is the same one we started with
113+
// to prevent race condition with cleaner. If the cleaner deleted the partition group
114+
// after we created the visit marker in the grouper, we should abort the compaction
115+
// to avoid orphaned visit markers.
116+
currentPartitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID)
117+
if err != nil {
118+
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
119+
level.Warn(p.logger).Log("msg", "partition group was deleted by cleaner, aborting compaction", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
120+
return nil, plannerCompletedPartitionError
121+
} else {
122+
level.Warn(p.logger).Log("msg", "unable to read partition group info during planning", "err", err, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
123+
return nil, fmt.Errorf("unable to read partition group info for partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
124+
}
125+
}
126+
127+
// Verify that this is the same partition group that the grouper created the visit marker for
128+
// by comparing creation times. If they don't match, it means the cleaner deleted the old
129+
// partition group and a new one was created with the same ID.
130+
expectedCreationTime := partitionInfo.PartitionedGroupCreationTime
131+
if currentPartitionedGroupInfo.CreationTime != expectedCreationTime {
132+
level.Warn(p.logger).Log("msg", "partition group creation time mismatch, cleaner deleted old group and new one was created, aborting compaction",
133+
"partitioned_group_id", partitionedGroupID,
134+
"partition_id", partitionID,
135+
"expected_creation_time", expectedCreationTime,
136+
"current_creation_time", currentPartitionedGroupInfo.CreationTime)
137+
return nil, plannerCompletedPartitionError
138+
}
139+
112140
// Ensure all blocks fit within the largest range. This is a double check
113141
// to ensure there's no bug in the previous blocks grouping, given this Plan()
114142
// is just a pass-through.

0 commit comments

Comments
 (0)