From ec8a06e98994972e4c7390e3deef06bacb443b9c Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 21 Aug 2025 15:05:45 -0500 Subject: [PATCH 1/4] fix: Future truncated shards return duplicate results This PR resolves an issue where we are seeing the following - Create a database with a shard group duration of 72 hours. - Insert a data point in the future. - Run truncate-shards. - Insert the same datapoint again into the future. - Query for that time range. Two data points will return. - Inspect the shards, each datapoint is in a different shard. --- services/meta/data.go | 14 +- services/meta/data_test.go | 377 +++++++++++++++++++++++++++++++++++++ 2 files changed, 388 insertions(+), 3 deletions(-) diff --git a/services/meta/data.go b/services/meta/data.go index c18e0141e69..846f9b3a46a 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -1438,14 +1438,22 @@ func (a ShardGroupInfos) Less(i, j int) bool { return iEnd.Before(jEnd) } -// Contains returns true iif StartTime ≤ t < EndTime. +// Contains returns true iif StartTime ≤ t < EndTime (or TruncatedAt if truncated). func (sgi *ShardGroupInfo) Contains(t time.Time) bool { - return !t.Before(sgi.StartTime) && t.Before(sgi.EndTime) + effectiveEnd := sgi.EndTime + if sgi.Truncated() { + effectiveEnd = sgi.TruncatedAt + } + return !t.Before(sgi.StartTime) && t.Before(effectiveEnd) } // Overlaps returns whether the shard group contains data for the time range between min and max func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool { - return !sgi.StartTime.After(max) && sgi.EndTime.After(min) + effectiveEnd := sgi.EndTime + if sgi.Truncated() { + effectiveEnd = sgi.TruncatedAt + } + return !sgi.StartTime.After(max) && effectiveEnd.After(min) } // Deleted returns whether this ShardGroup has been deleted. 
diff --git a/services/meta/data_test.go b/services/meta/data_test.go index 1d84ca4a307..4c6b860c551 100644 --- a/services/meta/data_test.go +++ b/services/meta/data_test.go @@ -397,6 +397,383 @@ func TestData_TruncateShardGroups(t *testing.T) { assert.Equal(t, expectTimes[i].end, groups[i].EndTime.String(), "end time %d", i) assert.Equal(t, expectTimes[i].truncated, groups[i].TruncatedAt.String(), "truncate time %d", i) } + +} + +func TestData_TruncateShardGroups_FutureOverlappingShards(t *testing.T) { + data := &meta.Data{} + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + must(data.CreateDatabase("db")) + rp := meta.NewRetentionPolicyInfo("rp") + rp.ShardGroupDuration = 72 * time.Hour // 72 hours as per the issue description + must(data.CreateRetentionPolicy("db", rp, true)) + + // Reproduce the exact scenario from the issue description: + // Step 1: Insert a data point in the future (this creates a shard) + futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) // 1 year in future + must(data.CreateShardGroup("db", "rp", futureTime)) + + initialShardCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) + t.Logf("After initial shard creation: %d shards", initialShardCount) + + // Step 2: Run truncate-shards + truncateTime := time.Now().UTC() + data.TruncateShardGroups(truncateTime) + + afterTruncateCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) + t.Logf("After truncate-shards: %d shards", afterTruncateCount) + + // Step 3: Insert the same datapoint again into the future + // This should NOT create a new overlapping shard + must(data.CreateShardGroup("db", "rp", futureTime)) + + finalShardCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) + t.Logf("After re-inserting same future timestamp: %d shards", finalShardCount) + + // The bug would manifest as creating a second shard for the same time range + if finalShardCount > afterTruncateCount { + t.Errorf("BUG REPRODUCED: Additional shard was created 
for same future timestamp after truncation") + } + + // Step 4: Verify no overlapping shards exist + groups := data.Databases[0].RetentionPolicies[0].ShardGroups + + // Debug: Print all shard groups + t.Logf("Total shard groups after operations: %d", len(groups)) + for i, group := range groups { + effectiveEnd := group.EndTime + if group.Truncated() { + effectiveEnd = group.TruncatedAt + } + t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v deleted=%v", + i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, + group.Truncated(), group.Deleted()) + } + + // Check that we don't have overlapping time coverage for the same time range + for i := 0; i < len(groups); i++ { + for j := i + 1; j < len(groups); j++ { + if groups[i].Deleted() || groups[j].Deleted() { + continue + } + + // Calculate effective ranges considering truncation + startI, endI := groups[i].StartTime, groups[i].EndTime + if groups[i].Truncated() { + endI = groups[i].TruncatedAt + } + + startJ, endJ := groups[j].StartTime, groups[j].EndTime + if groups[j].Truncated() { + endJ = groups[j].TruncatedAt + } + + // Check for overlapping coverage - ranges [a,b) and [c,d) overlap if a < d && c < b + if startI.Before(endJ) && startJ.Before(endI) { + t.Fatalf("Found overlapping shard groups after truncate-shards:\n"+ + " Shard %d: [%v - %v) effective [%v - %v)\n"+ + " Shard %d: [%v - %v) effective [%v - %v)", + i, groups[i].StartTime, groups[i].EndTime, startI, endI, + j, groups[j].StartTime, groups[j].EndTime, startJ, endJ) + } + } + } + + // Additional check: ensure the future timestamp has exactly one covering shard + coveringShards := 0 + for _, group := range groups { + if group.Deleted() { + continue + } + + effectiveEnd := group.EndTime + if group.Truncated() { + effectiveEnd = group.TruncatedAt + } + + // Check if this shard covers our future time + if !futureTime.Before(group.StartTime) && futureTime.Before(effectiveEnd) { + coveringShards++ + } + } + + if coveringShards != 1 { + 
t.Fatalf("Expected exactly 1 shard to cover future time %v, but found %d covering shards", + futureTime, coveringShards) + } +} + +// TestData_TruncateShardGroups_ActualOverlapBug reproduces the exact issue described: +// 1. Create database with 72h shard duration +// 2. Insert data point in future +// 3. Run truncate-shards with time that's after the start but before end of future shard +// 4. Insert same datapoint again - this should create overlapping shards +func TestData_TruncateShardGroups_ActualOverlapBug(t *testing.T) { + data := &meta.Data{} + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + // Setup database with 72 hour shard group duration + must(data.CreateDatabase("db")) + rp := meta.NewRetentionPolicyInfo("rp") + rp.ShardGroupDuration = 72 * time.Hour + must(data.CreateRetentionPolicy("db", rp, true)) + + // Step 1: Insert a data point in the future (creates a shard) + futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) // 1 year in future + must(data.CreateShardGroup("db", "rp", futureTime)) + + // Get the created shard group to understand its time range + sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) + if err != nil || sg == nil { + t.Fatal("Failed to create or find future shard group") + } + + t.Logf("Created future shard: [%v - %v)", sg.StartTime, sg.EndTime) + + // Step 2: Run truncate-shards with a time that's WITHIN the future shard's range + // This should truncate the future shard + truncateTime := sg.StartTime.Add(24 * time.Hour) // 24 hours into the 72-hour shard + t.Logf("Truncating at: %v", truncateTime) + data.TruncateShardGroups(truncateTime) + + // Refresh the shard group reference after truncation + groups := data.Databases[0].RetentionPolicies[0].ShardGroups + for i := range groups { + if groups[i].ID == sg.ID { + *sg = groups[i] + break + } + } + + t.Logf("After truncation - Shard: [%v - %v), truncated=%v, truncatedAt=%v", + sg.StartTime, sg.EndTime, sg.Truncated(), sg.TruncatedAt) + + // 
Step 3: Try to insert data at a time that falls in the truncated portion + // This is the critical test - timestamp after truncation but within original shard range + timeInTruncatedRange := sg.StartTime.Add(48 * time.Hour) // Beyond truncation but within original range + beforeCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) + must(data.CreateShardGroup("db", "rp", timeInTruncatedRange)) + afterCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) + + t.Logf("Shard count before: %d, after creating shard for truncated range: %d", beforeCount, afterCount) + + // Print all shards for debugging + groups = data.Databases[0].RetentionPolicies[0].ShardGroups + for i, group := range groups { + effectiveEnd := group.EndTime + if group.Truncated() { + effectiveEnd = group.TruncatedAt + } + t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v", + i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, group.Truncated()) + } + + // Check for the bug: ensure timestamp coverage is not duplicated + testTimestamp := timeInTruncatedRange + coveringShards := 0 + var coveringShardDetails []string + + for i, group := range groups { + if group.Deleted() { + continue + } + + effectiveEnd := group.EndTime + if group.Truncated() { + effectiveEnd = group.TruncatedAt + } + + // Check if this shard covers our test timestamp + if !testTimestamp.Before(group.StartTime) && testTimestamp.Before(effectiveEnd) { + coveringShards++ + coveringShardDetails = append(coveringShardDetails, + fmt.Sprintf("Shard %d [%v - %v)", i, group.StartTime, effectiveEnd)) + } + } + + t.Logf("Timestamp %v coverage:", testTimestamp) + for _, detail := range coveringShardDetails { + t.Logf(" %s", detail) + } + + if coveringShards > 1 { + t.Errorf("BUG REPRODUCED: Timestamp %v is covered by %d shards (should be 1)", + testTimestamp, coveringShards) + } else if coveringShards == 0 { + t.Errorf("COVERAGE GAP: Timestamp %v is not covered by any shard", testTimestamp) + } else { 
+ t.Logf("PASS: Timestamp %v is covered by exactly 1 shard", testTimestamp) + } +} + +// TestData_ShardGroupsByTimeRange_TruncatedShardsBug demonstrates the bug where +// truncated shards are incorrectly included in time range queries +func TestData_ShardGroupsByTimeRange_TruncatedShardsBug(t *testing.T) { + data := &meta.Data{} + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + must(data.CreateDatabase("db")) + rp := meta.NewRetentionPolicyInfo("rp") + rp.ShardGroupDuration = 72 * time.Hour + must(data.CreateRetentionPolicy("db", rp, true)) + + // Create future shard + futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) + must(data.CreateShardGroup("db", "rp", futureTime)) + + sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) + if err != nil || sg == nil { + t.Fatal("Failed to create future shard group") + } + + t.Logf("Created shard: [%v - %v)", sg.StartTime, sg.EndTime) + + // Truncate the shard partway through + truncateTime := sg.StartTime.Add(24 * time.Hour) + data.TruncateShardGroups(truncateTime) + + // Create a new shard for the truncated range + timeInTruncatedRange := sg.StartTime.Add(48 * time.Hour) + must(data.CreateShardGroup("db", "rp", timeInTruncatedRange)) + + // BUG: Query for time range that should only hit the new shard + // but will incorrectly include the truncated shard due to Overlaps() bug + queryStart := truncateTime.Add(12 * time.Hour) // After truncation + queryEnd := queryStart.Add(6 * time.Hour) // Well after truncation + + shards, err := data.ShardGroupsByTimeRange("db", "rp", queryStart, queryEnd) + if err != nil { + t.Fatal("Failed to get shards by time range:", err) + } + + t.Logf("Query range: [%v - %v)", queryStart, queryEnd) + t.Logf("ShardGroupsByTimeRange returned %d shards:", len(shards)) + + for i, shard := range shards { + effectiveEnd := shard.EndTime + if shard.Truncated() { + effectiveEnd = shard.TruncatedAt + } + t.Logf(" Shard %d: [%v - %v) effective [%v - %v) truncated=%v", + 
i, shard.StartTime, shard.EndTime, shard.StartTime, effectiveEnd, shard.Truncated()) + + // Check if this shard should actually be included + shouldBeIncluded := !queryStart.Before(shard.StartTime) && !queryEnd.Before(shard.StartTime) || + !shard.StartTime.After(queryEnd) && effectiveEnd.After(queryStart) + + actuallyIncluded := !shard.StartTime.After(queryEnd) && shard.EndTime.After(queryStart) + + if actuallyIncluded && !shouldBeIncluded { + t.Errorf("BUG DETECTED: Truncated shard incorrectly included in query results") + t.Errorf(" Shard effective range: [%v - %v)", shard.StartTime, effectiveEnd) + t.Errorf(" Query range: [%v - %v)", queryStart, queryEnd) + t.Errorf(" Shard should NOT be included because effective end %v <= query start %v", + effectiveEnd, queryStart) + } + } + + // Verify the fix: only the correct shard should be returned + if len(shards) != 1 { + t.Errorf("Expected exactly 1 shard for query range, got %d", len(shards)) + if len(shards) > 1 { + t.Errorf("Multiple shards would cause duplicate data points in query results") + } + } else { + // Verify it's the correct shard (the non-truncated one) + shard := shards[0] + if shard.Truncated() { + t.Errorf("Query returned truncated shard, should return the non-truncated shard") + } else { + t.Logf("SUCCESS: Query correctly returned only the non-truncated shard") + } + } +} + +// TestShardGroupInfo_Contains_TruncatedShards verifies Contains method respects truncation +func TestShardGroupInfo_Contains_TruncatedShards(t *testing.T) { + data := &meta.Data{} + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + must(data.CreateDatabase("db")) + rp := meta.NewRetentionPolicyInfo("rp") + rp.ShardGroupDuration = 72 * time.Hour + must(data.CreateRetentionPolicy("db", rp, true)) + + // Create and truncate a shard group + futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) + must(data.CreateShardGroup("db", "rp", futureTime)) + + sg, err := data.ShardGroupByTimestamp("db", "rp", 
futureTime) + if err != nil || sg == nil { + t.Fatal("Failed to create future shard group") + } + + // Truncate the shard partway through + truncateTime := sg.StartTime.Add(24 * time.Hour) + data.TruncateShardGroups(truncateTime) + + // Refresh the shard group reference + groups := data.Databases[0].RetentionPolicies[0].ShardGroups + for i := range groups { + if groups[i].ID == sg.ID { + *sg = groups[i] + break + } + } + + // Test timestamps + beforeStart := sg.StartTime.Add(-1 * time.Hour) + atStart := sg.StartTime + beforeTruncation := sg.StartTime.Add(12 * time.Hour) + atTruncation := sg.TruncatedAt + afterTruncation := sg.StartTime.Add(36 * time.Hour) + beforeOriginalEnd := sg.EndTime.Add(-1 * time.Hour) + atOriginalEnd := sg.EndTime + + testCases := []struct { + name string + timestamp time.Time + expected bool + }{ + {"before start", beforeStart, false}, + {"at start", atStart, true}, + {"before truncation", beforeTruncation, true}, + {"at truncation", atTruncation, false}, // Truncation point is exclusive + {"after truncation", afterTruncation, false}, + {"before original end", beforeOriginalEnd, false}, + {"at original end", atOriginalEnd, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := sg.Contains(tc.timestamp) + if result != tc.expected { + t.Errorf("Contains(%v) = %v, expected %v", tc.timestamp, result, tc.expected) + t.Errorf("Shard: [%v - %v), truncated at %v", sg.StartTime, sg.EndTime, sg.TruncatedAt) + } + }) + } } func TestUserInfo_AuthorizeDatabase(t *testing.T) { From f94f12a998b79242165973cbaa9c7a89734f1440 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 21 Aug 2025 17:26:23 -0500 Subject: [PATCH 2/4] feat: adjust testing --- services/meta/data_test.go | 129 +++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 69 deletions(-) diff --git a/services/meta/data_test.go b/services/meta/data_test.go index 4c6b860c551..f0794f8bdc7 100644 --- a/services/meta/data_test.go +++ 
b/services/meta/data_test.go @@ -418,32 +418,32 @@ func TestData_TruncateShardGroups_FutureOverlappingShards(t *testing.T) { // Step 1: Insert a data point in the future (this creates a shard) futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) // 1 year in future must(data.CreateShardGroup("db", "rp", futureTime)) - + initialShardCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) t.Logf("After initial shard creation: %d shards", initialShardCount) - + // Step 2: Run truncate-shards truncateTime := time.Now().UTC() data.TruncateShardGroups(truncateTime) - + afterTruncateCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) t.Logf("After truncate-shards: %d shards", afterTruncateCount) - + // Step 3: Insert the same datapoint again into the future // This should NOT create a new overlapping shard must(data.CreateShardGroup("db", "rp", futureTime)) - + finalShardCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) t.Logf("After re-inserting same future timestamp: %d shards", finalShardCount) - + // The bug would manifest as creating a second shard for the same time range if finalShardCount > afterTruncateCount { t.Errorf("BUG REPRODUCED: Additional shard was created for same future timestamp after truncation") } - + // Step 4: Verify no overlapping shards exist groups := data.Databases[0].RetentionPolicies[0].ShardGroups - + // Debug: Print all shard groups t.Logf("Total shard groups after operations: %d", len(groups)) for i, group := range groups { @@ -451,29 +451,29 @@ func TestData_TruncateShardGroups_FutureOverlappingShards(t *testing.T) { if group.Truncated() { effectiveEnd = group.TruncatedAt } - t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v deleted=%v", - i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, + t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v deleted=%v", + i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, group.Truncated(), group.Deleted()) } - + // 
Check that we don't have overlapping time coverage for the same time range for i := 0; i < len(groups); i++ { for j := i + 1; j < len(groups); j++ { if groups[i].Deleted() || groups[j].Deleted() { continue } - + // Calculate effective ranges considering truncation startI, endI := groups[i].StartTime, groups[i].EndTime if groups[i].Truncated() { endI = groups[i].TruncatedAt } - + startJ, endJ := groups[j].StartTime, groups[j].EndTime if groups[j].Truncated() { endJ = groups[j].TruncatedAt } - + // Check for overlapping coverage - ranges [a,b) and [c,d) overlap if a < d && c < b if startI.Before(endJ) && startJ.Before(endI) { t.Fatalf("Found overlapping shard groups after truncate-shards:\n"+ @@ -484,27 +484,27 @@ func TestData_TruncateShardGroups_FutureOverlappingShards(t *testing.T) { } } } - + // Additional check: ensure the future timestamp has exactly one covering shard coveringShards := 0 for _, group := range groups { if group.Deleted() { continue } - + effectiveEnd := group.EndTime if group.Truncated() { effectiveEnd = group.TruncatedAt } - + // Check if this shard covers our future time if !futureTime.Before(group.StartTime) && futureTime.Before(effectiveEnd) { coveringShards++ } } - + if coveringShards != 1 { - t.Fatalf("Expected exactly 1 shard to cover future time %v, but found %d covering shards", + t.Fatalf("Expected exactly 1 shard to cover future time %v, but found %d covering shards", futureTime, coveringShards) } } @@ -532,21 +532,21 @@ func TestData_TruncateShardGroups_ActualOverlapBug(t *testing.T) { // Step 1: Insert a data point in the future (creates a shard) futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) // 1 year in future must(data.CreateShardGroup("db", "rp", futureTime)) - + // Get the created shard group to understand its time range sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) if err != nil || sg == nil { t.Fatal("Failed to create or find future shard group") } - + t.Logf("Created future shard: [%v - %v)", 
sg.StartTime, sg.EndTime) - + // Step 2: Run truncate-shards with a time that's WITHIN the future shard's range // This should truncate the future shard truncateTime := sg.StartTime.Add(24 * time.Hour) // 24 hours into the 72-hour shard t.Logf("Truncating at: %v", truncateTime) data.TruncateShardGroups(truncateTime) - + // Refresh the shard group reference after truncation groups := data.Databases[0].RetentionPolicies[0].ShardGroups for i := range groups { @@ -555,19 +555,19 @@ func TestData_TruncateShardGroups_ActualOverlapBug(t *testing.T) { break } } - - t.Logf("After truncation - Shard: [%v - %v), truncated=%v, truncatedAt=%v", + + t.Logf("After truncation - Shard: [%v - %v), truncated=%v, truncatedAt=%v", sg.StartTime, sg.EndTime, sg.Truncated(), sg.TruncatedAt) - + // Step 3: Try to insert data at a time that falls in the truncated portion // This is the critical test - timestamp after truncation but within original shard range timeInTruncatedRange := sg.StartTime.Add(48 * time.Hour) // Beyond truncation but within original range beforeCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) must(data.CreateShardGroup("db", "rp", timeInTruncatedRange)) afterCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) - + t.Logf("Shard count before: %d, after creating shard for truncated range: %d", beforeCount, afterCount) - + // Print all shards for debugging groups = data.Databases[0].RetentionPolicies[0].ShardGroups for i, group := range groups { @@ -575,40 +575,40 @@ func TestData_TruncateShardGroups_ActualOverlapBug(t *testing.T) { if group.Truncated() { effectiveEnd = group.TruncatedAt } - t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v", + t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v", i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, group.Truncated()) } - + // Check for the bug: ensure timestamp coverage is not duplicated testTimestamp := timeInTruncatedRange coveringShards := 0 var 
coveringShardDetails []string - + for i, group := range groups { if group.Deleted() { continue } - + effectiveEnd := group.EndTime if group.Truncated() { effectiveEnd = group.TruncatedAt } - + // Check if this shard covers our test timestamp if !testTimestamp.Before(group.StartTime) && testTimestamp.Before(effectiveEnd) { coveringShards++ - coveringShardDetails = append(coveringShardDetails, + coveringShardDetails = append(coveringShardDetails, fmt.Sprintf("Shard %d [%v - %v)", i, group.StartTime, effectiveEnd)) } } - + t.Logf("Timestamp %v coverage:", testTimestamp) for _, detail := range coveringShardDetails { t.Logf(" %s", detail) } - + if coveringShards > 1 { - t.Errorf("BUG REPRODUCED: Timestamp %v is covered by %d shards (should be 1)", + t.Errorf("BUG REPRODUCED: Timestamp %v is covered by %d shards (should be 1)", testTimestamp, coveringShards) } else if coveringShards == 0 { t.Errorf("COVERAGE GAP: Timestamp %v is not covered by any shard", testTimestamp) @@ -617,8 +617,6 @@ func TestData_TruncateShardGroups_ActualOverlapBug(t *testing.T) { } } -// TestData_ShardGroupsByTimeRange_TruncatedShardsBug demonstrates the bug where -// truncated shards are incorrectly included in time range queries func TestData_ShardGroupsByTimeRange_TruncatedShardsBug(t *testing.T) { data := &meta.Data{} @@ -633,62 +631,55 @@ func TestData_ShardGroupsByTimeRange_TruncatedShardsBug(t *testing.T) { rp.ShardGroupDuration = 72 * time.Hour must(data.CreateRetentionPolicy("db", rp, true)) - // Create future shard futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) must(data.CreateShardGroup("db", "rp", futureTime)) - + sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) if err != nil || sg == nil { t.Fatal("Failed to create future shard group") } - + t.Logf("Created shard: [%v - %v)", sg.StartTime, sg.EndTime) - - // Truncate the shard partway through + truncateTime := sg.StartTime.Add(24 * time.Hour) data.TruncateShardGroups(truncateTime) - - // Create a new shard for 
the truncated range + timeInTruncatedRange := sg.StartTime.Add(48 * time.Hour) must(data.CreateShardGroup("db", "rp", timeInTruncatedRange)) - - // BUG: Query for time range that should only hit the new shard - // but will incorrectly include the truncated shard due to Overlaps() bug - queryStart := truncateTime.Add(12 * time.Hour) // After truncation - queryEnd := queryStart.Add(6 * time.Hour) // Well after truncation - + + queryStart := truncateTime.Add(12 * time.Hour) + queryEnd := queryStart.Add(6 * time.Hour) + shards, err := data.ShardGroupsByTimeRange("db", "rp", queryStart, queryEnd) if err != nil { t.Fatal("Failed to get shards by time range:", err) } - + t.Logf("Query range: [%v - %v)", queryStart, queryEnd) t.Logf("ShardGroupsByTimeRange returned %d shards:", len(shards)) - + for i, shard := range shards { effectiveEnd := shard.EndTime if shard.Truncated() { effectiveEnd = shard.TruncatedAt } - t.Logf(" Shard %d: [%v - %v) effective [%v - %v) truncated=%v", + t.Logf(" Shard %d: [%v - %v) effective [%v - %v) truncated=%v", i, shard.StartTime, shard.EndTime, shard.StartTime, effectiveEnd, shard.Truncated()) - - // Check if this shard should actually be included + shouldBeIncluded := !queryStart.Before(shard.StartTime) && !queryEnd.Before(shard.StartTime) || - !shard.StartTime.After(queryEnd) && effectiveEnd.After(queryStart) - + !shard.StartTime.After(queryEnd) && effectiveEnd.After(queryStart) + actuallyIncluded := !shard.StartTime.After(queryEnd) && shard.EndTime.After(queryStart) - + if actuallyIncluded && !shouldBeIncluded { t.Errorf("BUG DETECTED: Truncated shard incorrectly included in query results") t.Errorf(" Shard effective range: [%v - %v)", shard.StartTime, effectiveEnd) t.Errorf(" Query range: [%v - %v)", queryStart, queryEnd) - t.Errorf(" Shard should NOT be included because effective end %v <= query start %v", + t.Errorf(" Shard should NOT be included because effective end %v <= query start %v", effectiveEnd, queryStart) } } - - // Verify 
the fix: only the correct shard should be returned + if len(shards) != 1 { t.Errorf("Expected exactly 1 shard for query range, got %d", len(shards)) if len(shards) > 1 { @@ -723,16 +714,16 @@ func TestShardGroupInfo_Contains_TruncatedShards(t *testing.T) { // Create and truncate a shard group futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) must(data.CreateShardGroup("db", "rp", futureTime)) - + sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) if err != nil || sg == nil { t.Fatal("Failed to create future shard group") } - + // Truncate the shard partway through truncateTime := sg.StartTime.Add(24 * time.Hour) data.TruncateShardGroups(truncateTime) - + // Refresh the shard group reference groups := data.Databases[0].RetentionPolicies[0].ShardGroups for i := range groups { @@ -741,7 +732,7 @@ func TestShardGroupInfo_Contains_TruncatedShards(t *testing.T) { break } } - + // Test timestamps beforeStart := sg.StartTime.Add(-1 * time.Hour) atStart := sg.StartTime @@ -750,7 +741,7 @@ func TestShardGroupInfo_Contains_TruncatedShards(t *testing.T) { afterTruncation := sg.StartTime.Add(36 * time.Hour) beforeOriginalEnd := sg.EndTime.Add(-1 * time.Hour) atOriginalEnd := sg.EndTime - + testCases := []struct { name string timestamp time.Time @@ -764,7 +755,7 @@ func TestShardGroupInfo_Contains_TruncatedShards(t *testing.T) { {"before original end", beforeOriginalEnd, false}, {"at original end", atOriginalEnd, false}, } - + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { result := sg.Contains(tc.timestamp) From 8261e31a3b7d5ff4733fba9ba2bc6a9da611826c Mon Sep 17 00:00:00 2001 From: devanbenz Date: Fri, 22 Aug 2025 10:06:08 -0500 Subject: [PATCH 3/4] fix: Testing some changes locally, pushing them up to save them --- services/meta/data.go | 14 +- services/meta/data_test.go | 368 ------------------------------------- 2 files changed, 6 insertions(+), 376 deletions(-) diff --git a/services/meta/data.go b/services/meta/data.go index 
846f9b3a46a..5f3f09eaf33 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -1440,20 +1440,18 @@ func (a ShardGroupInfos) Less(i, j int) bool { // Contains returns true iif StartTime ≤ t < EndTime (or TruncatedAt if truncated). func (sgi *ShardGroupInfo) Contains(t time.Time) bool { - effectiveEnd := sgi.EndTime - if sgi.Truncated() { - effectiveEnd = sgi.TruncatedAt + if sgi.Truncated() && sgi.TruncatedAt.Equal(sgi.StartTime) { + return false } - return !t.Before(sgi.StartTime) && t.Before(effectiveEnd) + return !t.Before(sgi.StartTime) && t.Before(sgi.EndTime) } // Overlaps returns whether the shard group contains data for the time range between min and max func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool { - effectiveEnd := sgi.EndTime - if sgi.Truncated() { - effectiveEnd = sgi.TruncatedAt + if sgi.Truncated() && sgi.TruncatedAt.Equal(sgi.StartTime) { + return false } - return !sgi.StartTime.After(max) && effectiveEnd.After(min) + return !sgi.StartTime.After(max) && sgi.EndTime.After(min) } // Deleted returns whether this ShardGroup has been deleted. 
diff --git a/services/meta/data_test.go b/services/meta/data_test.go index f0794f8bdc7..1d84ca4a307 100644 --- a/services/meta/data_test.go +++ b/services/meta/data_test.go @@ -397,374 +397,6 @@ func TestData_TruncateShardGroups(t *testing.T) { assert.Equal(t, expectTimes[i].end, groups[i].EndTime.String(), "end time %d", i) assert.Equal(t, expectTimes[i].truncated, groups[i].TruncatedAt.String(), "truncate time %d", i) } - -} - -func TestData_TruncateShardGroups_FutureOverlappingShards(t *testing.T) { - data := &meta.Data{} - - must := func(err error) { - if err != nil { - t.Fatal(err) - } - } - - must(data.CreateDatabase("db")) - rp := meta.NewRetentionPolicyInfo("rp") - rp.ShardGroupDuration = 72 * time.Hour // 72 hours as per the issue description - must(data.CreateRetentionPolicy("db", rp, true)) - - // Reproduce the exact scenario from the issue description: - // Step 1: Insert a data point in the future (this creates a shard) - futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) // 1 year in future - must(data.CreateShardGroup("db", "rp", futureTime)) - - initialShardCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) - t.Logf("After initial shard creation: %d shards", initialShardCount) - - // Step 2: Run truncate-shards - truncateTime := time.Now().UTC() - data.TruncateShardGroups(truncateTime) - - afterTruncateCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) - t.Logf("After truncate-shards: %d shards", afterTruncateCount) - - // Step 3: Insert the same datapoint again into the future - // This should NOT create a new overlapping shard - must(data.CreateShardGroup("db", "rp", futureTime)) - - finalShardCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) - t.Logf("After re-inserting same future timestamp: %d shards", finalShardCount) - - // The bug would manifest as creating a second shard for the same time range - if finalShardCount > afterTruncateCount { - t.Errorf("BUG REPRODUCED: Additional shard was created 
for same future timestamp after truncation") - } - - // Step 4: Verify no overlapping shards exist - groups := data.Databases[0].RetentionPolicies[0].ShardGroups - - // Debug: Print all shard groups - t.Logf("Total shard groups after operations: %d", len(groups)) - for i, group := range groups { - effectiveEnd := group.EndTime - if group.Truncated() { - effectiveEnd = group.TruncatedAt - } - t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v deleted=%v", - i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, - group.Truncated(), group.Deleted()) - } - - // Check that we don't have overlapping time coverage for the same time range - for i := 0; i < len(groups); i++ { - for j := i + 1; j < len(groups); j++ { - if groups[i].Deleted() || groups[j].Deleted() { - continue - } - - // Calculate effective ranges considering truncation - startI, endI := groups[i].StartTime, groups[i].EndTime - if groups[i].Truncated() { - endI = groups[i].TruncatedAt - } - - startJ, endJ := groups[j].StartTime, groups[j].EndTime - if groups[j].Truncated() { - endJ = groups[j].TruncatedAt - } - - // Check for overlapping coverage - ranges [a,b) and [c,d) overlap if a < d && c < b - if startI.Before(endJ) && startJ.Before(endI) { - t.Fatalf("Found overlapping shard groups after truncate-shards:\n"+ - " Shard %d: [%v - %v) effective [%v - %v)\n"+ - " Shard %d: [%v - %v) effective [%v - %v)", - i, groups[i].StartTime, groups[i].EndTime, startI, endI, - j, groups[j].StartTime, groups[j].EndTime, startJ, endJ) - } - } - } - - // Additional check: ensure the future timestamp has exactly one covering shard - coveringShards := 0 - for _, group := range groups { - if group.Deleted() { - continue - } - - effectiveEnd := group.EndTime - if group.Truncated() { - effectiveEnd = group.TruncatedAt - } - - // Check if this shard covers our future time - if !futureTime.Before(group.StartTime) && futureTime.Before(effectiveEnd) { - coveringShards++ - } - } - - if coveringShards != 1 { - 
t.Fatalf("Expected exactly 1 shard to cover future time %v, but found %d covering shards", - futureTime, coveringShards) - } -} - -// TestData_TruncateShardGroups_ActualOverlapBug reproduces the exact issue described: -// 1. Create database with 72h shard duration -// 2. Insert data point in future -// 3. Run truncate-shards with time that's after the start but before end of future shard -// 4. Insert same datapoint again - this should create overlapping shards -func TestData_TruncateShardGroups_ActualOverlapBug(t *testing.T) { - data := &meta.Data{} - - must := func(err error) { - if err != nil { - t.Fatal(err) - } - } - - // Setup database with 72 hour shard group duration - must(data.CreateDatabase("db")) - rp := meta.NewRetentionPolicyInfo("rp") - rp.ShardGroupDuration = 72 * time.Hour - must(data.CreateRetentionPolicy("db", rp, true)) - - // Step 1: Insert a data point in the future (creates a shard) - futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) // 1 year in future - must(data.CreateShardGroup("db", "rp", futureTime)) - - // Get the created shard group to understand its time range - sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) - if err != nil || sg == nil { - t.Fatal("Failed to create or find future shard group") - } - - t.Logf("Created future shard: [%v - %v)", sg.StartTime, sg.EndTime) - - // Step 2: Run truncate-shards with a time that's WITHIN the future shard's range - // This should truncate the future shard - truncateTime := sg.StartTime.Add(24 * time.Hour) // 24 hours into the 72-hour shard - t.Logf("Truncating at: %v", truncateTime) - data.TruncateShardGroups(truncateTime) - - // Refresh the shard group reference after truncation - groups := data.Databases[0].RetentionPolicies[0].ShardGroups - for i := range groups { - if groups[i].ID == sg.ID { - *sg = groups[i] - break - } - } - - t.Logf("After truncation - Shard: [%v - %v), truncated=%v, truncatedAt=%v", - sg.StartTime, sg.EndTime, sg.Truncated(), sg.TruncatedAt) - - // 
Step 3: Try to insert data at a time that falls in the truncated portion - // This is the critical test - timestamp after truncation but within original shard range - timeInTruncatedRange := sg.StartTime.Add(48 * time.Hour) // Beyond truncation but within original range - beforeCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) - must(data.CreateShardGroup("db", "rp", timeInTruncatedRange)) - afterCount := len(data.Databases[0].RetentionPolicies[0].ShardGroups) - - t.Logf("Shard count before: %d, after creating shard for truncated range: %d", beforeCount, afterCount) - - // Print all shards for debugging - groups = data.Databases[0].RetentionPolicies[0].ShardGroups - for i, group := range groups { - effectiveEnd := group.EndTime - if group.Truncated() { - effectiveEnd = group.TruncatedAt - } - t.Logf("Shard %d: [%v - %v) effective [%v - %v) truncated=%v", - i, group.StartTime, group.EndTime, group.StartTime, effectiveEnd, group.Truncated()) - } - - // Check for the bug: ensure timestamp coverage is not duplicated - testTimestamp := timeInTruncatedRange - coveringShards := 0 - var coveringShardDetails []string - - for i, group := range groups { - if group.Deleted() { - continue - } - - effectiveEnd := group.EndTime - if group.Truncated() { - effectiveEnd = group.TruncatedAt - } - - // Check if this shard covers our test timestamp - if !testTimestamp.Before(group.StartTime) && testTimestamp.Before(effectiveEnd) { - coveringShards++ - coveringShardDetails = append(coveringShardDetails, - fmt.Sprintf("Shard %d [%v - %v)", i, group.StartTime, effectiveEnd)) - } - } - - t.Logf("Timestamp %v coverage:", testTimestamp) - for _, detail := range coveringShardDetails { - t.Logf(" %s", detail) - } - - if coveringShards > 1 { - t.Errorf("BUG REPRODUCED: Timestamp %v is covered by %d shards (should be 1)", - testTimestamp, coveringShards) - } else if coveringShards == 0 { - t.Errorf("COVERAGE GAP: Timestamp %v is not covered by any shard", testTimestamp) - } else { 
- t.Logf("PASS: Timestamp %v is covered by exactly 1 shard", testTimestamp) - } -} - -func TestData_ShardGroupsByTimeRange_TruncatedShardsBug(t *testing.T) { - data := &meta.Data{} - - must := func(err error) { - if err != nil { - t.Fatal(err) - } - } - - must(data.CreateDatabase("db")) - rp := meta.NewRetentionPolicyInfo("rp") - rp.ShardGroupDuration = 72 * time.Hour - must(data.CreateRetentionPolicy("db", rp, true)) - - futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) - must(data.CreateShardGroup("db", "rp", futureTime)) - - sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) - if err != nil || sg == nil { - t.Fatal("Failed to create future shard group") - } - - t.Logf("Created shard: [%v - %v)", sg.StartTime, sg.EndTime) - - truncateTime := sg.StartTime.Add(24 * time.Hour) - data.TruncateShardGroups(truncateTime) - - timeInTruncatedRange := sg.StartTime.Add(48 * time.Hour) - must(data.CreateShardGroup("db", "rp", timeInTruncatedRange)) - - queryStart := truncateTime.Add(12 * time.Hour) - queryEnd := queryStart.Add(6 * time.Hour) - - shards, err := data.ShardGroupsByTimeRange("db", "rp", queryStart, queryEnd) - if err != nil { - t.Fatal("Failed to get shards by time range:", err) - } - - t.Logf("Query range: [%v - %v)", queryStart, queryEnd) - t.Logf("ShardGroupsByTimeRange returned %d shards:", len(shards)) - - for i, shard := range shards { - effectiveEnd := shard.EndTime - if shard.Truncated() { - effectiveEnd = shard.TruncatedAt - } - t.Logf(" Shard %d: [%v - %v) effective [%v - %v) truncated=%v", - i, shard.StartTime, shard.EndTime, shard.StartTime, effectiveEnd, shard.Truncated()) - - shouldBeIncluded := !queryStart.Before(shard.StartTime) && !queryEnd.Before(shard.StartTime) || - !shard.StartTime.After(queryEnd) && effectiveEnd.After(queryStart) - - actuallyIncluded := !shard.StartTime.After(queryEnd) && shard.EndTime.After(queryStart) - - if actuallyIncluded && !shouldBeIncluded { - t.Errorf("BUG DETECTED: Truncated shard incorrectly 
included in query results") - t.Errorf(" Shard effective range: [%v - %v)", shard.StartTime, effectiveEnd) - t.Errorf(" Query range: [%v - %v)", queryStart, queryEnd) - t.Errorf(" Shard should NOT be included because effective end %v <= query start %v", - effectiveEnd, queryStart) - } - } - - if len(shards) != 1 { - t.Errorf("Expected exactly 1 shard for query range, got %d", len(shards)) - if len(shards) > 1 { - t.Errorf("Multiple shards would cause duplicate data points in query results") - } - } else { - // Verify it's the correct shard (the non-truncated one) - shard := shards[0] - if shard.Truncated() { - t.Errorf("Query returned truncated shard, should return the non-truncated shard") - } else { - t.Logf("SUCCESS: Query correctly returned only the non-truncated shard") - } - } -} - -// TestShardGroupInfo_Contains_TruncatedShards verifies Contains method respects truncation -func TestShardGroupInfo_Contains_TruncatedShards(t *testing.T) { - data := &meta.Data{} - - must := func(err error) { - if err != nil { - t.Fatal(err) - } - } - - must(data.CreateDatabase("db")) - rp := meta.NewRetentionPolicyInfo("rp") - rp.ShardGroupDuration = 72 * time.Hour - must(data.CreateRetentionPolicy("db", rp, true)) - - // Create and truncate a shard group - futureTime := time.Unix(0, 0).Add(365 * 24 * time.Hour) - must(data.CreateShardGroup("db", "rp", futureTime)) - - sg, err := data.ShardGroupByTimestamp("db", "rp", futureTime) - if err != nil || sg == nil { - t.Fatal("Failed to create future shard group") - } - - // Truncate the shard partway through - truncateTime := sg.StartTime.Add(24 * time.Hour) - data.TruncateShardGroups(truncateTime) - - // Refresh the shard group reference - groups := data.Databases[0].RetentionPolicies[0].ShardGroups - for i := range groups { - if groups[i].ID == sg.ID { - *sg = groups[i] - break - } - } - - // Test timestamps - beforeStart := sg.StartTime.Add(-1 * time.Hour) - atStart := sg.StartTime - beforeTruncation := sg.StartTime.Add(12 * 
time.Hour) - atTruncation := sg.TruncatedAt - afterTruncation := sg.StartTime.Add(36 * time.Hour) - beforeOriginalEnd := sg.EndTime.Add(-1 * time.Hour) - atOriginalEnd := sg.EndTime - - testCases := []struct { - name string - timestamp time.Time - expected bool - }{ - {"before start", beforeStart, false}, - {"at start", atStart, true}, - {"before truncation", beforeTruncation, true}, - {"at truncation", atTruncation, false}, // Truncation point is exclusive - {"after truncation", afterTruncation, false}, - {"before original end", beforeOriginalEnd, false}, - {"at original end", atOriginalEnd, false}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - result := sg.Contains(tc.timestamp) - if result != tc.expected { - t.Errorf("Contains(%v) = %v, expected %v", tc.timestamp, result, tc.expected) - t.Errorf("Shard: [%v - %v), truncated at %v", sg.StartTime, sg.EndTime, sg.TruncatedAt) - } - }) - } } func TestUserInfo_AuthorizeDatabase(t *testing.T) { From 803fb1ea100bea4fcfeb1f56552b7757b32e41f6 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Fri, 22 Aug 2025 10:10:44 -0500 Subject: [PATCH 4/4] chore: Adjust comment --- services/meta/data.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/meta/data.go b/services/meta/data.go index 5f3f09eaf33..322afe24ae5 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -1438,7 +1438,8 @@ func (a ShardGroupInfos) Less(i, j int) bool { return iEnd.Before(jEnd) } -// Contains returns true iif StartTime ≤ t < EndTime (or TruncatedAt if truncated). +// Contains returns true iff StartTime ≤ t < EndTime; it always returns +// false if the shard was Truncated and StartTime == TruncatedAt.
func (sgi *ShardGroupInfo) Contains(t time.Time) bool { if sgi.Truncated() && sgi.TruncatedAt.Equal(sgi.StartTime) { return false @@ -1447,6 +1448,7 @@ func (sgi *ShardGroupInfo) Contains(t time.Time) bool { } // Overlaps returns whether the shard group contains data for the time range between min and max +// (always false if the shard was Truncated and StartTime == TruncatedAt). func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool { if sgi.Truncated() && sgi.TruncatedAt.Equal(sgi.StartTime) { return false