@@ -7,6 +7,7 @@ package cspann
7
7
8
8
import (
9
9
"context"
10
+ "math"
10
11
"slices"
11
12
12
13
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann/workspace"
@@ -227,6 +228,15 @@ func (fw *fixupWorker) splitPartition(
227
228
return err
228
229
}
229
230
231
+ // While most vectors will be assigned to the new sub-partitions, some may
232
+ // need to be reassigned to siblings that are closer.
233
+ vectors , err = fw .reassignToSiblings (
234
+ ctx , parentPartitionKey , partitionKey , partition , vectors ,
235
+ leftPartitionKey , rightPartitionKey )
236
+ if err != nil {
237
+ return err
238
+ }
239
+
230
240
// If still updating the sub-partitions, then distribute vectors among them.
231
241
leftState := leftMetadata .StateDetails .State
232
242
rightState := rightMetadata .StateDetails .State
@@ -346,6 +356,149 @@ func (fw *fixupWorker) splitPartition(
346
356
return nil
347
357
}
348
358
359
+ // reassignToSiblings checks if the vectors in a splitting partition need to be
360
+ // assigned to partitions other than the left and right sub-partitions. If a
361
+ // vector is closer to a sibling partition's centroid than it is to the left or
362
+ // right partitions' centroids, then it will be added to the sibling partition.
363
+ // It is also removed from "sourcePartition" and from "sourceVectors".
364
+ //
365
+ // reassignToSiblings returns the source vectors, minus any reassigned vectors.
366
+ func (fw * fixupWorker ) reassignToSiblings (
367
+ ctx context.Context ,
368
+ parentPartitionKey , sourcePartitionKey PartitionKey ,
369
+ sourcePartition * Partition ,
370
+ sourceVectors vector.Set ,
371
+ leftPartitionKey , rightPartitionKey PartitionKey ,
372
+ ) (vector.Set , error ) {
373
+ // No siblings if this is the root.
374
+ if parentPartitionKey == InvalidKey {
375
+ return sourceVectors , nil
376
+ }
377
+
378
+ // Fetch parent partition. If it does not exist, then it could be because
379
+ // another agent completed the split or because the parent was itself split.
380
+ // In either case, abort this split. If it's not yet done, it will be
381
+ // restarted at a later time with the new parent.
382
+ parentPartition , err := fw .getPartition (ctx , parentPartitionKey )
383
+ if err != nil {
384
+ return vector.Set {}, err
385
+ }
386
+ if parentPartition == nil {
387
+ return vector.Set {}, errors .Wrapf (errFixupAborted ,
388
+ "parent partition %d of partition %d no longer exists" ,
389
+ parentPartitionKey , sourcePartitionKey )
390
+ }
391
+
392
+ // Remove the splitting partition, since vectors cannot be assigned to it. If
393
+ // it is not a child of the parent, then abort the split. This can happen if
394
+ // another agent has completed the split or if the splitting partition has
395
+ // been re-parented when a new level was added to the tree.
396
+ if ! parentPartition .ReplaceWithLastByKey (ChildKey {PartitionKey : sourcePartitionKey }) {
397
+ return vector.Set {}, errors .Wrapf (errFixupAborted ,
398
+ "partition %d is no longer a child of parent partition %d" ,
399
+ sourcePartitionKey , parentPartitionKey )
400
+ }
401
+
402
+ // Lazily get sibling metadata only if it's actually needed.
403
+ fw .tempMetadataToGet = fw .tempMetadataToGet [:0 ]
404
+ getSiblingMetadata := func () ([]PartitionMetadataToGet , error ) {
405
+ if len (fw .tempMetadataToGet ) == 0 {
406
+ fw .tempMetadataToGet = ensureSliceLen (fw .tempMetadataToGet , parentPartition .Count ())
407
+ for i := range len (fw .tempMetadataToGet ) {
408
+ fw .tempMetadataToGet [i ].Key = parentPartition .ChildKeys ()[i ].PartitionKey
409
+ }
410
+ err = fw .index .store .TryGetPartitionMetadata (ctx , fw .treeKey , fw .tempMetadataToGet )
411
+ if err != nil {
412
+ return nil , errors .Wrapf (err ,
413
+ "getting partition metadata for %d siblings of partition %d (parent=%d)" ,
414
+ len (fw .tempMetadataToGet )- 1 , sourcePartitionKey , parentPartitionKey )
415
+ }
416
+ }
417
+ return fw .tempMetadataToGet , nil
418
+ }
419
+
420
+ tempSiblingDistances := fw .workspace .AllocFloats (parentPartition .Count ())
421
+ defer fw .workspace .FreeFloats (tempSiblingDistances )
422
+ tempSiblingErrorBounds := fw .workspace .AllocFloats (parentPartition .Count ())
423
+ defer fw .workspace .FreeFloats (tempSiblingErrorBounds )
424
+
425
+ for i := 0 ; i < sourceVectors .Count ; i ++ {
426
+ // Check whether the vector is closer to a sibling centroid than its own
427
+ // new centroid.
428
+ vec := sourceVectors .At (i )
429
+ parentPartition .Quantizer ().EstimateDistances (& fw .workspace ,
430
+ parentPartition .QuantizedSet (), vec , tempSiblingDistances , tempSiblingErrorBounds )
431
+
432
+ var leftDistance , rightDistance float32
433
+ minDistance := float32 (math .MaxFloat32 )
434
+ for offset , childKey := range parentPartition .ChildKeys () {
435
+ minDistance = min (minDistance , tempSiblingDistances [offset ])
436
+ if childKey .PartitionKey == leftPartitionKey {
437
+ leftDistance = tempSiblingDistances [offset ]
438
+ } else if childKey .PartitionKey == rightPartitionKey {
439
+ rightDistance = tempSiblingDistances [offset ]
440
+ }
441
+ }
442
+ if minDistance >= leftDistance && minDistance >= rightDistance {
443
+ // Could not find a closer sibling, so done with this vector.
444
+ continue
445
+ }
446
+
447
+ // Lazily fetch metadata for sibling partitions.
448
+ allSiblingMetadata , err := getSiblingMetadata ()
449
+ if err != nil {
450
+ return vector.Set {}, err
451
+ }
452
+
453
+ // Find nearest sibling that allows inserts and is closer than either the
454
+ // left or right sub-partitions.
455
+ siblingOffset := - 1
456
+ minDistance = min (leftDistance , rightDistance )
457
+ for offset , distance := range tempSiblingDistances {
458
+ if distance >= minDistance {
459
+ continue
460
+ } else if ! fw .tempMetadataToGet [offset ].Metadata .StateDetails .State .AllowAddOrRemove () {
461
+ continue
462
+ }
463
+ siblingOffset = offset
464
+ minDistance = distance
465
+ }
466
+ if siblingOffset == - 1 {
467
+ // No closer sibling could be found, so return.
468
+ continue
469
+ }
470
+
471
+ // Attempt to insert into the partition.
472
+ siblingPartitionKey := parentPartition .ChildKeys ()[siblingOffset ].PartitionKey
473
+ siblingMetadata := allSiblingMetadata [siblingOffset ].Metadata
474
+ childKey := sourcePartition .ChildKeys ()[i : i + 1 ]
475
+ valueBytes := sourcePartition .ValueBytes ()[i : i + 1 ]
476
+ _ , err = fw .addToPartition (ctx , siblingPartitionKey , vec .AsSet (),
477
+ childKey , valueBytes , siblingMetadata )
478
+ if err != nil {
479
+ allSiblingMetadata [siblingOffset ].Metadata , err = suppressRaceErrors (err )
480
+ if err == nil {
481
+ // Another worker raced to update the metadata, so just skip.
482
+ continue
483
+ }
484
+ return vector.Set {}, errors .Wrapf (err ,
485
+ "adding vector from splitting partition %d to partition %d" ,
486
+ sourcePartitionKey , siblingPartitionKey )
487
+ }
488
+
489
+ // Add succeeded, so remove the vector from the splitting partition.
490
+ sourceVectors .ReplaceWithLast (i )
491
+ sourcePartition .ReplaceWithLast (i )
492
+ i --
493
+
494
+ log .VEventf (ctx , 3 ,
495
+ "reassigning vector from splitting partition %d (parent=%d) to sibling partition %d" ,
496
+ sourcePartitionKey , parentPartitionKey , siblingPartitionKey )
497
+ }
498
+
499
+ return sourceVectors , nil
500
+ }
501
+
349
502
// getPartition returns the partition with the given key, or nil if it does not
350
503
// exist.
351
504
func (fw * fixupWorker ) getPartition (
@@ -364,7 +517,7 @@ func (fw *fixupWorker) getPartition(
364
517
// getPartitionMetadata returns the up-to-date metadata of the partition with
365
518
// the given key.
366
519
func (fw * fixupWorker ) getPartitionMetadata (
367
- ctx context.Context , partitionKey PartitionKey ,
520
+ ctx context.Context , partitionKey PartitionKey ,
368
521
) (PartitionMetadata , error ) {
369
522
fw .tempMetadataToGet = ensureSliceLen (fw .tempMetadataToGet , 1 )
370
523
fw .tempMetadataToGet [0 ].Key = partitionKey
@@ -569,6 +722,7 @@ func (fw *fixupWorker) createSplitSubPartition(
569
722
err = fw .index .store .TryCreateEmptyPartition (ctx , fw .treeKey , partitionKey , targetMetadata )
570
723
if err != nil {
571
724
targetMetadata , err = suppressRaceErrors (err )
725
+ centroid = targetMetadata .Centroid
572
726
if err != nil {
573
727
return PartitionMetadata {}, errors .Wrap (err , "creating empty sub-partition" )
574
728
}
@@ -630,7 +784,7 @@ func (fw *fixupWorker) addToParentPartition(
630
784
centroid = tempCentroid
631
785
}
632
786
633
- // Add the target partition key to the root paritition .
787
+ // Add the target partition key to the parent partition .
634
788
fw .tempChildKey [0 ] = ChildKey {PartitionKey : partitionKey }
635
789
fw .tempValueBytes [0 ] = nil
636
790
added , err := fw .addToPartition (ctx , parentPartitionKey ,
0 commit comments