Commit 18b42de

craig[bot], rharding6373, and andy-kimball committed
144668: changefeedccl: make quantization metamorphic r=andyyang890 a=rharding6373

This PR turns changefeed.resolved_timestamp.granularity into a metamorphic test constant.

Epic: none
Fixes: #144632
Release note: None

144800: cspann: improve TestIndexConcurrency and fix issues it found r=drewkimball a=andy-kimball

Improve the TestIndexConcurrency test and fix issues it found.

#### cspann: fix buglet in searcher

Fix a small bug where we were not setting s.levels in the case where we're inserting into the root partition. This was causing crdb_test runs to fail, since it reallocates the s.levels buffer every time.

#### cspann: fix data race in memstore.GetFullVectors

The race detector found a data race in MemStore's GetFullVectors that was caused by not locking when accessing a partition's centroid. This was deliberate, because the centroid was thought to be immutable, but the TryClearPartition code sets the centroid to itself, which triggers the race detector. While we could change TryClearPartition to avoid that, it's probably best to just do the locking.

However, that causes another problem: the MemStore locking needed to get partition centroids is different from the locking needed to get primary key vectors. Update the GetFullVectors API to make it clear that we can only ask for one or the other in a single call, so that we don't need to handle the case where partition keys and primary keys are interleaved in the same request.

#### cspann: set correct parent partition in fallbackOnTargets

If an insert fails to add a vector to a partition that does not allow it, it calls fallbackOnTargets to redirect the insert to one of the target partitions. However, the existing code does not set the parent partition correctly for target partitions. A split of a non-root partition should set the target partitions' parent to the parent of the splitting partition, not to the splitting partition itself.

#### cspann: add better control over search for update retries

The retry logic in searchForUpdateHelper has a couple of problems:

- In the event of an infinite retry loop bug, it can stack overflow, since it recursively calls itself.
- Race conditions with the MemStore can cause it to not find a valid insert partition. Reads of MemStore partitions race with background fixups that update and delete the partitions.

This commit improves the situation by specifying the maximum number of insert/delete attempts we'll make before giving up. The remaining attempts are preserved when making recursive retry calls, which prevents infinite retry loops.

#### cspann: add DeletingForSplit state to split fixup flow

Under heavy stress, a target partition can be split and deleted before its source partition finishes its own split. This results in a situation where the source partition is still pointing to target partitions that are now deleted. To prevent that state, this commit adds a new DeletingForSplit state to the split fixup flow, to be used for non-root partitions. After vectors have been copied to target partitions, the splitting partition is marked as DeletingForSplit. Next, the target partitions are marked Ready, and can now safely be split or merged themselves. Finally, the splitting partition is actually removed from the tree and deleted.
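For concreteness, here is a minimal sketch of the ordering the new state enforces for a non-root split. Only the DrainingForSplit, DeletingForSplit, and Ready state names come from the commits above; the types and helper function are illustrative, not the cspann implementation.

```go
package main

import "fmt"

type state string

const (
	drainingForSplit state = "DrainingForSplit"
	deletingForSplit state = "DeletingForSplit"
	ready            state = "Ready"
	deleted          state = "Deleted"
)

type partition struct {
	name  string
	state state
}

// finishNonRootSplit sketches the tail of the split fixup: the source is
// parked in DeletingForSplit *before* the targets become Ready, so no other
// fixup can split or delete a target while the source still points at it.
func finishNonRootSplit(source, left, right *partition) {
	// Vectors have already been copied into the target partitions here.
	source.state = deletingForSplit
	left.state, right.state = ready, ready // targets may now split/merge safely
	source.state = deleted                 // finally unlink and delete the source
}

func main() {
	src := &partition{name: "p17", state: drainingForSplit}
	left := &partition{name: "p20"}
	right := &partition{name: "p21"}
	finishNonRootSplit(src, left, right)
	fmt.Println(src.state, left.state, right.state) // Deleted Ready Ready
}
```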
#### cspann: fix partition reload race condition

During split, we reload a partition's vectors, in case any have changed while we created target partitions. However, it's also possible that another worker has updated the splitting partition. In that case, we should abort and let the racing worker take over. The existing code was not doing that, and ended up trying to continue the split, but with incorrect target partition keys.

#### cspann: improve logging during concurrent splits

The existing logging makes it difficult to debug bugs caused by multiple concurrent workers racing to split a partition. This commit updates the logging to:

- log after every key step in the split process
- only log for the worker that actually wins the race for this step

These changes reduce logging noise while still improving what does get logged.

#### memstore: restart operation when reading a deleted partition

When the memstore comes across a deleted partition, there are two cases:

1. The txn was started before the deletion, in which case it should be restarted, so that any search can find a different path through the tree.
2. The txn was started after the deletion, in which case ErrPartitionNotFound should be returned.

#### memstore: fix race condition creating an empty partition

The memstore.TryCreateEmptyPartition method has a race condition, such that two callers can end up creating different instances of the same partition. This commit fixes that by checking whether the partition already exists and then creating it if needed, all within the scope of the same lock (a short sketch of this pattern follows the message). This change also uncovered an existing bug in splitPartition, in which we weren't fetching metadata for the left and right sub-partitions when restarting the split in the AddingLevel state.

#### cspann: simulate multiple index instances in TestIndexConcurrency

Create multiple index instances in the TestIndexConcurrency test, all hooked up to the same store. This simulates multiple CRDB nodes, each independently inserting, removing, searching, and splitting a shared index. Eliminate the delay for one instance assisting another, in order to maximize the possibility of race conditions.

Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
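The memstore empty-partition fix above boils down to performing the existence check and the creation under a single lock. A minimal sketch of that pattern follows; the types and method names are illustrative stand-ins, not the actual memstore API.

```go
package main

import (
	"fmt"
	"sync"
)

type partitionKey int

type partition struct {
	key partitionKey
}

type memStore struct {
	mu         sync.Mutex
	partitions map[partitionKey]*partition
}

// tryCreateEmptyPartition checks for an existing partition and creates one if
// needed, all while holding the same lock. Two concurrent callers therefore
// agree on a single instance instead of racing to create two.
func (s *memStore) tryCreateEmptyPartition(key partitionKey) (created bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.partitions[key]; ok {
		return false // already exists; real code would surface a condition-failed error
	}
	s.partitions[key] = &partition{key: key}
	return true
}

func main() {
	s := &memStore{partitions: map[partitionKey]*partition{}}
	fmt.Println(s.tryCreateEmptyPartition(10)) // true: created
	fmt.Println(s.tryCreateEmptyPartition(10)) // false: a single instance already exists
}
```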
3 parents: 4966e23 + 2070dd5 + 88b1782 · commit 18b42de

23 files changed: +955, -753 lines changed

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 2 additions & 2 deletions
@@ -345,6 +345,6 @@ var DefaultLaggingRangesPollingInterval = 1 * time.Minute
 var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
 	settings.ApplicationLevel,
 	"changefeed.resolved_timestamp.granularity",
-	"the granularity at which changefeed progress are quantized to make tracking more efficient",
-	0,
+	"the granularity at which changefeed progress is quantized to make tracking more efficient",
+	time.Duration(metamorphic.ConstantWithTestRange("changefeed.resolved_timestamp.granularity", 0, 0, 20))*time.Second,
 )
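For context on what this setting controls: a zero duration leaves resolved timestamps untouched, while a nonzero granularity is used to round changefeed progress to a multiple of that interval; the metamorphic constant above picks a default of up to 20 seconds in test builds. Below is a minimal sketch of the rounding idea, assuming quantization means truncating down to the granularity; quantize is a hypothetical helper for illustration, not the changefeed code.

```go
package main

import (
	"fmt"
	"time"
)

// quantize rounds ts down to a multiple of granularity. A zero (or negative)
// granularity disables quantization and returns ts unchanged.
func quantize(ts time.Time, granularity time.Duration) time.Time {
	if granularity <= 0 {
		return ts
	}
	return ts.Truncate(granularity)
}

func main() {
	ts := time.Date(2025, 1, 1, 12, 0, 17, 0, time.UTC)
	fmt.Println(quantize(ts, 0))              // 12:00:17 -- quantization disabled
	fmt.Println(quantize(ts, 10*time.Second)) // 12:00:10 -- rounded down to the 10s boundary
}
```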

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 7 additions & 1 deletion
@@ -778,11 +778,17 @@ func newCDCTester(ctx context.Context, t test.Test, c cluster.Cluster, opts ...o
 
 	startOpts, settings := makeCDCBenchOptions(c)
 
-	// With a target_duration of 10s, we won't see slow span logs from changefeeds untils we are > 100s
+	// With a target_duration of 10s, we won't see slow span logs from changefeeds until we are > 100s
 	// behind, which is well above the 60s targetSteadyLatency we have in some tests.
 	settings.ClusterSettings["changefeed.slow_span_log_threshold"] = "30s"
 	settings.ClusterSettings["server.child_metrics.enabled"] = "true"
 
+	// Randomly set a quantization interval since metamorphic settings
+	// don't extend to roachtests.
+	quantization := fmt.Sprintf("%ds", rand.Intn(30))
+	settings.ClusterSettings["changefeed.resolved_timestamp.granularity"] = quantization
+	t.Status(fmt.Sprintf("changefeed.resolved_timestamp.granularity: %s", quantization))
+
 	settings.Env = append(settings.Env, envVars...)
 
 	c.Start(ctx, t.L(), startOpts, settings, tester.crdbNodes)

pkg/sql/vecindex/cspann/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/num32",
         "//pkg/util/stop",
+        "//pkg/util/syncutil",
         "//pkg/util/vector",
         "@com_github_cockroachdb_crlib//crtime",
         "@com_github_cockroachdb_datadriven//:datadriven",

pkg/sql/vecindex/cspann/commontest/storetests.go

Lines changed: 59 additions & 45 deletions
@@ -129,7 +129,7 @@ func (suite *StoreTestSuite) TestGetPartitionMetadata() {
 			suite.ctx, treeKey, cspann.RootKey, false /* forUpdate */)
 		suite.NoError(err)
 		CheckPartitionMetadata(suite.T(), metadata, cspann.LeafLevel, vector.T{0, 0},
-			cspann.MakeReadyDetails())
+			cspann.PartitionStateDetails{State: cspann.ReadyState})
 
 		// Non-root partition does not yet exist, expect error.
 		_, err = txn.GetPartitionMetadata(

@@ -145,12 +145,12 @@
 		metadata, err = txn.GetPartitionMetadata(
 			suite.ctx, treeKey, partitionKey, true /* forUpdate */)
 		CheckPartitionMetadata(suite.T(), metadata, cspann.SecondLevel, vector.T{4, 3},
-			cspann.MakeReadyDetails())
+			cspann.PartitionStateDetails{State: cspann.ReadyState})
 	})
 
 	// Update the partition state to DrainingForSplit.
 	expected := metadata
-	metadata.StateDetails = cspann.MakeDrainingForSplitDetails(20, 30)
+	metadata.StateDetails.MakeDrainingForSplit(20, 30)
 	suite.NoError(store.TryUpdatePartitionMetadata(
 		suite.ctx, treeKey, partitionKey, metadata, expected))
 

@@ -159,15 +159,16 @@
 		metadata, err := txn.GetPartitionMetadata(
 			suite.ctx, treeKey, partitionKey, false /* forUpdate */)
 		suite.NoError(err)
-		CheckPartitionMetadata(suite.T(), metadata, cspann.SecondLevel, vector.T{4, 3},
-			cspann.MakeDrainingForSplitDetails(20, 30))
+		details := cspann.PartitionStateDetails{
+			State: cspann.DrainingForSplitState, Target1: 20, Target2: 30}
+		CheckPartitionMetadata(suite.T(), metadata, cspann.SecondLevel, vector.T{4, 3}, details)
 
 		// If forUpdate = true, GetPartitionMetadata should error.
 		var errConditionFailed *cspann.ConditionFailedError
 		_, err = txn.GetPartitionMetadata(suite.ctx, treeKey, partitionKey, true /* forUpdate */)
 		suite.ErrorAs(err, &errConditionFailed)
 		CheckPartitionMetadata(suite.T(), errConditionFailed.Actual, cspann.SecondLevel,
-			vector.T{4, 3}, cspann.MakeDrainingForSplitDetails(20, 30))
+			vector.T{4, 3}, details)
 	})
 	suite.NoError(err)
 }

@@ -223,7 +224,7 @@ func (suite *StoreTestSuite) TestAddToPartition() {
 	partition, err := store.TryGetPartition(suite.ctx, treeKey, cspann.RootKey)
 	suite.NoError(err)
 	CheckPartitionMetadata(suite.T(), *partition.Metadata(), cspann.LeafLevel,
-		vector.T{0, 0}, cspann.MakeReadyDetails())
+		vector.T{0, 0}, cspann.PartitionStateDetails{State: cspann.ReadyState})
 	suite.Equal([]cspann.ChildKey{primaryKey1, primaryKey2}, partition.ChildKeys())
 	suite.Equal([]cspann.ValueBytes{valueBytes1, valueBytes3}, partition.ValueBytes())
 

@@ -241,7 +242,7 @@
 	// Update the partition state to DrainingForMerge.
 	expected := *partition.Metadata()
 	metadata := expected
-	metadata.StateDetails = cspann.MakeDrainingForMergeDetails(20)
+	metadata.StateDetails.MakeDrainingForMerge(20)
 	suite.NoError(store.TryUpdatePartitionMetadata(
 		suite.ctx, treeKey, partitionKey, metadata, expected))
 

@@ -251,8 +252,9 @@
 		err = txn.AddToPartition(suite.ctx, treeKey, partitionKey, cspann.SecondLevel,
 			vec4, partitionKey4, valueBytes4)
 		suite.ErrorAs(err, &errConditionFailed)
+		details := cspann.PartitionStateDetails{State: cspann.DrainingForMergeState, Target1: 20}
 		CheckPartitionMetadata(suite.T(), errConditionFailed.Actual, cspann.SecondLevel,
-			vector.T{4, 3}, cspann.MakeDrainingForMergeDetails(20))
+			vector.T{4, 3}, details)
 	})
 }
 

@@ -289,9 +291,12 @@
 		err = txn.AddToPartition(suite.ctx, treeKey, cspann.RootKey, cspann.LeafLevel,
 			vec1, primaryKey1, valueBytes1)
 		suite.NoError(err)
+	})
+	CheckPartitionCount(suite.ctx, suite.T(), store, treeKey, cspann.RootKey, 1)
 
-		// Remove that vector.
-		err = txn.RemoveFromPartition(suite.ctx, treeKey, cspann.RootKey,
+	// Remove the vector that was added to the root.
+	RunTransaction(suite.ctx, suite.T(), store, func(txn cspann.Txn) {
+		err := txn.RemoveFromPartition(suite.ctx, treeKey, cspann.RootKey,
 			cspann.LeafLevel, primaryKey1)
 		suite.NoError(err)
 	})

@@ -311,7 +316,7 @@
 	// Update the partition state to DrainingForSplit.
 	expected := *partition.Metadata()
 	metadata := expected
-	metadata.StateDetails = cspann.MakeDrainingForSplitDetails(20, 30)
+	metadata.StateDetails.MakeDrainingForSplit(20, 30)
 	suite.NoError(store.TryUpdatePartitionMetadata(
 		suite.ctx, treeKey, partitionKey, metadata, expected))
 

@@ -321,8 +326,10 @@
 		err := txn.RemoveFromPartition(suite.ctx, treeKey, partitionKey, cspann.SecondLevel,
 			partitionKey3)
 		suite.ErrorAs(err, &errConditionFailed)
+		details := cspann.PartitionStateDetails{
+			State: cspann.DrainingForSplitState, Target1: 20, Target2: 30}
 		CheckPartitionMetadata(suite.T(), errConditionFailed.Actual, cspann.SecondLevel,
-			vector.T{4, 3}, cspann.MakeDrainingForSplitDetails(20, 30))
+			vector.T{4, 3}, details)
 	})
 }
 

@@ -351,10 +358,10 @@ func (suite *StoreTestSuite) TestSearchPartitions() {
 	// Create another partition.
 	testPartitionKey2 := cspann.PartitionKey(20)
 	metadata := cspann.PartitionMetadata{
-		Level:        cspann.SecondLevel,
-		Centroid:     vector.T{2, 4},
-		StateDetails: cspann.MakeSplittingDetails(10, 20),
+		Level:    cspann.SecondLevel,
+		Centroid: vector.T{2, 4},
 	}
+	metadata.StateDetails.MakeSplitting(10, 20)
 	suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, testPartitionKey2, metadata))
 	vectors := vector.MakeSet(2)
 	vectors.Add(vec4)

@@ -423,40 +430,51 @@ func (suite *StoreTestSuite) TestGetFullVectors() {
 	// Create partitions.
 	treeKey := store.MakeTreeKey(suite.T(), treeID)
 	metadata := cspann.PartitionMetadata{
-		Level:        cspann.SecondLevel,
-		Centroid:     vector.T{0, 0},
-		StateDetails: cspann.MakeSplittingDetails(20, 30),
+		Level:    cspann.SecondLevel,
+		Centroid: vector.T{0, 0},
 	}
+	metadata.StateDetails.MakeSplitting(20, 30)
 	suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, cspann.RootKey, metadata))
 	partitionKey, _ := suite.createTestPartition(store, treeKey)
 
 	RunTransaction(suite.ctx, suite.T(), store, func(txn cspann.Txn) {
+		// Empty request set.
+		err := txn.GetFullVectors(suite.ctx, treeKey, []cspann.VectorWithKey{})
+		suite.NoError(err)
+
 		// Insert some full vectors into the test store.
 		key1 := store.InsertVector(suite.T(), treeID, vec1)
 		key2 := store.InsertVector(suite.T(), treeID, vec2)
 		key3 := store.InsertVector(suite.T(), treeID, vec3)
 
-		// Include primary keys, partition keys, and keys that cannot be found.
+		// Start by fetching partition keys, both that exist and that do not.
 		results := []cspann.VectorWithKey{
+			{Key: cspann.ChildKey{PartitionKey: cspann.RootKey}},
+			{Key: cspann.ChildKey{PartitionKey: cspann.PartitionKey(99)}}, // No such partition.
+			{Key: cspann.ChildKey{PartitionKey: partitionKey}},
+		}
+		err = txn.GetFullVectors(suite.ctx, treeKey, results)
+		suite.NoError(err)
+		suite.Equal(vector.T{0, 0}, results[0].Vector)
+		suite.Nil(results[1].Vector)
+		suite.Equal(vector.T{4, 3}, results[2].Vector)
+
+		// Next fetch primary keys that reference vectors that exist and that
+		// do not exist.
+		results = []cspann.VectorWithKey{
 			{Key: cspann.ChildKey{KeyBytes: key1}},
 			{Key: cspann.ChildKey{KeyBytes: cspann.KeyBytes{0}}},
-			{Key: cspann.ChildKey{PartitionKey: cspann.RootKey}},
 			{Key: cspann.ChildKey{KeyBytes: key2}},
 			{Key: cspann.ChildKey{KeyBytes: cspann.KeyBytes{0}}},
-			{Key: cspann.ChildKey{PartitionKey: partitionKey}},
 			{Key: cspann.ChildKey{KeyBytes: key3}},
-			{Key: cspann.ChildKey{PartitionKey: cspann.PartitionKey(99)}}, // No such partition.
 		}
-		err := txn.GetFullVectors(suite.ctx, treeKey, results)
+		err = txn.GetFullVectors(suite.ctx, treeKey, results)
 		suite.NoError(err)
 		suite.Equal(vec1, results[0].Vector)
 		suite.Nil(results[1].Vector)
-		suite.Equal(vector.T{0, 0}, results[2].Vector)
-		suite.Equal(vec2, results[3].Vector)
-		suite.Nil(results[4].Vector)
-		suite.Equal(vector.T{4, 3}, results[5].Vector)
-		suite.Equal(vec3, results[6].Vector)
-		suite.Nil(results[7].Vector)
+		suite.Equal(vec2, results[2].Vector)
+		suite.Nil(results[3].Vector)
+		suite.Equal(vec3, results[4].Vector)
 
 		// Grab another set of vectors to ensure that saved state is properly reset.
 		results = []cspann.VectorWithKey{

@@ -530,10 +548,10 @@ func (suite *StoreTestSuite) TestTryCreateEmptyPartition() {
 
 	// Create empty partition.
 	metadata := cspann.PartitionMetadata{
-		Level:        cspann.SecondLevel,
-		Centroid:     centroid,
-		StateDetails: cspann.MakeSplittingDetails(20, 30),
+		Level:    cspann.SecondLevel,
+		Centroid: centroid,
 	}
+	metadata.StateDetails.MakeSplitting(20, 30)
 	suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, partitionKey, metadata))
 
 	// Fetch back the partition and validate it.

@@ -622,10 +640,10 @@ func (suite *StoreTestSuite) TestTryGetPartition() {
 
 	// Create partition with some vectors in it.
 	metadata := cspann.PartitionMetadata{
-		Level:        cspann.LeafLevel,
-		Centroid:     centroid,
-		StateDetails: cspann.MakeUpdatingDetails(20),
+		Level:    cspann.LeafLevel,
+		Centroid: centroid,
 	}
+	metadata.StateDetails.MakeUpdating(20)
 	suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, partitionKey, metadata))
 	vectors := vector.MakeSet(2)
 	vectors.Add(vec1)

@@ -683,7 +701,7 @@ func (suite *StoreTestSuite) TestTryGetPartitionMetadata() {
 	// Update the metadata and verify we get the updated values.
 	expected := *partition.Metadata()
 	metadata := expected
-	metadata.StateDetails = cspann.MakeUpdatingDetails(30)
+	metadata.StateDetails.MakeUpdating(30)
 	suite.NoError(store.TryUpdatePartitionMetadata(
 		suite.ctx, treeKey, partitionKey, metadata, expected))
 

@@ -761,10 +779,10 @@ func (suite *StoreTestSuite) TestTryAddToPartition() {
 
 	// Partition does not yet exist.
 	metadata := cspann.PartitionMetadata{
-		Level:        cspann.LeafLevel,
-		Centroid:     centroid,
-		StateDetails: cspann.MakeUpdatingDetails(20),
+		Level:    cspann.LeafLevel,
+		Centroid: centroid,
 	}
+	metadata.StateDetails.MakeUpdating(20)
 	addVectors := vector.MakeSet(2)
 	addVectors.Add(vec1)
 	addVectors.Add(vec2)

@@ -1006,11 +1024,7 @@ func (suite *StoreTestSuite) createTestPartition(
 	store TestStore, treeKey cspann.TreeKey,
 ) (cspann.PartitionKey, *cspann.Partition) {
 	partitionKey := cspann.PartitionKey(10)
-	metadata := cspann.PartitionMetadata{
-		Level:        cspann.SecondLevel,
-		Centroid:     vector.T{4, 3},
-		StateDetails: cspann.MakeReadyDetails(),
-	}
+	metadata := cspann.MakeReadyPartitionMetadata(cspann.SecondLevel, vector.T{4, 3})
 	suite.NoError(store.TryCreateEmptyPartition(suite.ctx, treeKey, partitionKey, metadata))
 	vectors := vector.MakeSet(2)
 	vectors.Add(vec1)