Skip to content

Commit c91bb76

Browse files
MB-57888: Support for downtime mitigation upon updates to index mapping (#2106)
- Added new apis for index update - Added logic to determine whether index mappings can be updated and what specifically changed between mappings - Added logic to store and retrieve said information within bolt - Added checks to prevent deleted data from being referenced within queries till the actual data on the segment is removed during the merge process Related: - blevesearch/bleve_index_api#64 - blevesearch/scorch_segment_api#61 - blevesearch/zapx#318 --------- Co-authored-by: Abhinav Dangeti <abhinav@couchbase.com>
1 parent 179da11 commit c91bb76

15 files changed

+4329
-83
lines changed

builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int
6868
return nil, err
6969
}
7070
config["internal"] = map[string][]byte{
71-
string(mappingInternalKey): mappingBytes,
71+
string(util.MappingInternalKey): mappingBytes,
7272
}
7373

7474
// do not use real config, as these are options for the builder,

docs/index_update.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Ability to reduce downtime during index mapping updates
2+
3+
* *v2.5.4* (and after) will come with support to delete or modify any field mapping in the index mapping without requiring a full rebuild of the index
4+
* We do this by storing which portions of a field have to be deleted within zap and then lazily executing the deletion during subsequent merging of the segments
5+
6+
## Usage
7+
8+
While opening an index, if an updated mapping is provided as a string under the key `updated_mapping` within the `runtimeConfig` parameter of `OpenUsing`, then we open the index and try to update it to use the new mapping provided.
9+
10+
If the update fails, the index is unchanged and an error is returned explaining why the update was unsuccessful.
11+
12+
## What can be deleted and what can't be deleted?
13+
Fields can be partially deleted by changing their Index, Store, and DocValues parameters from true to false, or completely removed by deleting the field itself.
14+
15+
Additionally, document mappings can be deleted either by fully removing them from the index mapping or by setting the Enabled value to false, which deletes all fields defined within that mapping.
16+
17+
However, if any of the following conditions are met, the index is considered non-updatable.
18+
* Any additional fields or enabled document mappings in the new index mapping
19+
* Any changes to IncludeInAll, type, IncludeTermVectors and SkipFreqNorm
20+
* Any document mapping having its Enabled value changing from false to true
21+
* Text fields with a different analyzer or date time fields with a different date time format
22+
* Vector and VectorBase64 fields changing dims, similarity or vectorIndexOptimizedFor
23+
* Any changes when the field is part of `_all`
24+
* Full field deletions when the field is covered by any dynamic setting (Index, Store or DocValues Dynamic)
25+
* Any changes to dynamic settings at the top level or any enabled document mapping
26+
* If multiple fields share the same field name, either from different type mappings or from aliases, then any incompatible change across these fields
27+
28+
## How to enforce immediate deletion?
29+
Since the deletion is only done during merging, a [force merge](https://github.com/blevesearch/bleve/blob/b82baf10b205511cf12da5cb24330abd9f5b1b74/index/scorch/merge.go#L164) may be used to completely remove the stale data.
30+
31+
## Sample code to update an existing index
32+
```
33+
newMapping := `<Updated Index Mapping>`
34+
config := map[string]interface{}{
35+
"updated_mapping": newMapping,
36+
}
37+
index, err := OpenUsing("<Path to Index>", config)
38+
if err != nil {
39+
return err
40+
}
41+
```

index.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ func Open(path string) (Index, error) {
325325
// The mapping used when it was created will be used for all Index/Search operations.
326326
// The provided runtimeConfig can override settings
327327
// persisted when the kvstore was created.
328+
// If runtimeConfig has updated mapping, then an index update is attempted
329+
// Returns an error without any changes to the index if a non-updatable mapping is provided
328330
func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error) {
329331
return openIndexUsing(path, runtimeConfig)
330332
}

index/scorch/optimize_knn.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ func (o *OptimizeVR) Finish() error {
7979
wg.Done()
8080
}()
8181
for field, vrs := range o.vrs {
82+
// Early exit if the field is supposed to be completely deleted or
83+
// if its index data has been deleted
84+
if info, ok := o.snapshot.updatedFields[field]; ok && (info.Deleted || info.Index) {
85+
continue
86+
}
87+
8288
vecIndex, err := segment.InterpretVectorIndex(field,
8389
o.requiresFiltering, origSeg.deleted)
8490
if err != nil {

index/scorch/persister.go

Lines changed: 63 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory,
608608
func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
609609
segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory) (
610610
[]string, map[uint64]string, error) {
611-
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
611+
snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket)
612612
if err != nil {
613613
return nil, nil, err
614614
}
@@ -619,17 +619,17 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
619619
}
620620

621621
// persist meta values
622-
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(boltMetaDataKey)
622+
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(util.BoltMetaDataKey)
623623
if err != nil {
624624
return nil, nil, err
625625
}
626-
err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(segPlugin.Type()))
626+
err = metaBucket.Put(util.BoltMetaDataSegmentTypeKey, []byte(segPlugin.Type()))
627627
if err != nil {
628628
return nil, nil, err
629629
}
630630
buf := make([]byte, binary.MaxVarintLen32)
631631
binary.BigEndian.PutUint32(buf, segPlugin.Version())
632-
err = metaBucket.Put(boltMetaDataSegmentVersionKey, buf)
632+
err = metaBucket.Put(util.BoltMetaDataSegmentVersionKey, buf)
633633
if err != nil {
634634
return nil, nil, err
635635
}
@@ -643,13 +643,13 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
643643
if err != nil {
644644
return nil, nil, err
645645
}
646-
err = metaBucket.Put(boltMetaDataTimeStamp, timeStampBinary)
646+
err = metaBucket.Put(util.BoltMetaDataTimeStamp, timeStampBinary)
647647
if err != nil {
648648
return nil, nil, err
649649
}
650650

651651
// persist internal values
652-
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
652+
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(util.BoltInternalKey)
653653
if err != nil {
654654
return nil, nil, err
655655
}
@@ -665,7 +665,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
665665
val := make([]byte, 8)
666666
bytesWritten := atomic.LoadUint64(&snapshot.parent.stats.TotBytesWrittenAtIndexTime)
667667
binary.LittleEndian.PutUint64(val, bytesWritten)
668-
err = internalBucket.Put(TotBytesWrittenKey, val)
668+
err = internalBucket.Put(util.TotBytesWrittenKey, val)
669669
if err != nil {
670670
return nil, nil, err
671671
}
@@ -689,7 +689,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
689689
return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
690690
}
691691
filename := filepath.Base(segPath)
692-
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
692+
err = snapshotSegmentBucket.Put(util.BoltPathKey, []byte(filename))
693693
if err != nil {
694694
return nil, nil, err
695695
}
@@ -705,7 +705,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
705705
return nil, nil, fmt.Errorf("segment: %s persist err: %v", path, err)
706706
}
707707
newSegmentPaths[segmentSnapshot.id] = path
708-
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
708+
err = snapshotSegmentBucket.Put(util.BoltPathKey, []byte(filename))
709709
if err != nil {
710710
return nil, nil, err
711711
}
@@ -721,7 +721,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
721721
if err != nil {
722722
return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err)
723723
}
724-
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
724+
err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBuf.Bytes())
725725
if err != nil {
726726
return nil, nil, err
727727
}
@@ -733,7 +733,19 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
733733
if err != nil {
734734
return nil, nil, err
735735
}
736-
err = snapshotSegmentBucket.Put(boltStatsKey, b)
736+
err = snapshotSegmentBucket.Put(util.BoltStatsKey, b)
737+
if err != nil {
738+
return nil, nil, err
739+
}
740+
}
741+
742+
// store updated field info
743+
if segmentSnapshot.updatedFields != nil {
744+
b, err := json.Marshal(segmentSnapshot.updatedFields)
745+
if err != nil {
746+
return nil, nil, err
747+
}
748+
err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, b)
737749
if err != nil {
738750
return nil, nil, err
739751
}
@@ -832,22 +844,9 @@ func zapFileName(epoch uint64) string {
832844

833845
// bolt snapshot code
834846

835-
var (
836-
boltSnapshotsBucket = []byte{'s'}
837-
boltPathKey = []byte{'p'}
838-
boltDeletedKey = []byte{'d'}
839-
boltInternalKey = []byte{'i'}
840-
boltMetaDataKey = []byte{'m'}
841-
boltMetaDataSegmentTypeKey = []byte("type")
842-
boltMetaDataSegmentVersionKey = []byte("version")
843-
boltMetaDataTimeStamp = []byte("timeStamp")
844-
boltStatsKey = []byte("stats")
845-
TotBytesWrittenKey = []byte("TotBytesWritten")
846-
)
847-
848847
func (s *Scorch) loadFromBolt() error {
849848
err := s.rootBolt.View(func(tx *bolt.Tx) error {
850-
snapshots := tx.Bucket(boltSnapshotsBucket)
849+
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
851850
if snapshots == nil {
852851
return nil
853852
}
@@ -912,7 +911,7 @@ func (s *Scorch) loadFromBolt() error {
912911
// NOTE: this is currently ONLY intended to be used by the command-line tool
913912
func (s *Scorch) LoadSnapshot(epoch uint64) (rv *IndexSnapshot, err error) {
914913
err = s.rootBolt.View(func(tx *bolt.Tx) error {
915-
snapshots := tx.Bucket(boltSnapshotsBucket)
914+
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
916915
if snapshots == nil {
917916
return nil
918917
}
@@ -940,14 +939,14 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
940939
// first we look for the meta-data bucket, this will tell us
941940
// which segment type/version was used for this snapshot
942941
// all operations for this scorch will use this type/version
943-
metaBucket := snapshot.Bucket(boltMetaDataKey)
942+
metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
944943
if metaBucket == nil {
945944
_ = rv.DecRef()
946945
return nil, fmt.Errorf("meta-data bucket missing")
947946
}
948-
segmentType := string(metaBucket.Get(boltMetaDataSegmentTypeKey))
947+
segmentType := string(metaBucket.Get(util.BoltMetaDataSegmentTypeKey))
949948
segmentVersion := binary.BigEndian.Uint32(
950-
metaBucket.Get(boltMetaDataSegmentVersionKey))
949+
metaBucket.Get(util.BoltMetaDataSegmentVersionKey))
951950
err := s.loadSegmentPlugin(segmentType, segmentVersion)
952951
if err != nil {
953952
_ = rv.DecRef()
@@ -957,7 +956,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
957956
var running uint64
958957
c := snapshot.Cursor()
959958
for k, _ := c.First(); k != nil; k, _ = c.Next() {
960-
if k[0] == boltInternalKey[0] {
959+
if k[0] == util.BoltInternalKey[0] {
961960
internalBucket := snapshot.Bucket(k)
962961
if internalBucket == nil {
963962
_ = rv.DecRef()
@@ -972,11 +971,11 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
972971
_ = rv.DecRef()
973972
return nil, err
974973
}
975-
} else if k[0] != boltMetaDataKey[0] {
974+
} else if k[0] != util.BoltMetaDataKey[0] {
976975
segmentBucket := snapshot.Bucket(k)
977976
if segmentBucket == nil {
978977
_ = rv.DecRef()
979-
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
978+
return nil, fmt.Errorf("segment key, but bucket missing %x", k)
980979
}
981980
segmentSnapshot, err := s.loadSegment(segmentBucket)
982981
if err != nil {
@@ -990,53 +989,70 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
990989
}
991990
rv.segment = append(rv.segment, segmentSnapshot)
992991
rv.offsets = append(rv.offsets, running)
992+
// Merge all segment level updated field info for use during queries
993+
if segmentSnapshot.updatedFields != nil {
994+
rv.MergeUpdateFieldsInfo(segmentSnapshot.updatedFields)
995+
}
993996
running += segmentSnapshot.segment.Count()
994997
}
995998
}
996999
return rv, nil
9971000
}
9981001

9991002
func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
1000-
pathBytes := segmentBucket.Get(boltPathKey)
1003+
pathBytes := segmentBucket.Get(util.BoltPathKey)
10011004
if pathBytes == nil {
10021005
return nil, fmt.Errorf("segment path missing")
10031006
}
10041007
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
1005-
segment, err := s.segPlugin.Open(segmentPath)
1008+
seg, err := s.segPlugin.Open(segmentPath)
10061009
if err != nil {
10071010
return nil, fmt.Errorf("error opening bolt segment: %v", err)
10081011
}
10091012

10101013
rv := &SegmentSnapshot{
1011-
segment: segment,
1014+
segment: seg,
10121015
cachedDocs: &cachedDocs{cache: nil},
10131016
cachedMeta: &cachedMeta{meta: nil},
10141017
}
1015-
deletedBytes := segmentBucket.Get(boltDeletedKey)
1018+
deletedBytes := segmentBucket.Get(util.BoltDeletedKey)
10161019
if deletedBytes != nil {
10171020
deletedBitmap := roaring.NewBitmap()
10181021
r := bytes.NewReader(deletedBytes)
10191022
_, err := deletedBitmap.ReadFrom(r)
10201023
if err != nil {
1021-
_ = segment.Close()
1024+
_ = seg.Close()
10221025
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
10231026
}
10241027
if !deletedBitmap.IsEmpty() {
10251028
rv.deleted = deletedBitmap
10261029
}
10271030
}
1028-
statBytes := segmentBucket.Get(boltStatsKey)
1031+
statBytes := segmentBucket.Get(util.BoltStatsKey)
10291032
if statBytes != nil {
10301033
var statsMap map[string]map[string]uint64
10311034

10321035
err := json.Unmarshal(statBytes, &statsMap)
10331036
stats := &fieldStats{statMap: statsMap}
10341037
if err != nil {
1035-
_ = segment.Close()
1038+
_ = seg.Close()
10361039
return nil, fmt.Errorf("error reading stat bytes: %v", err)
10371040
}
10381041
rv.stats = stats
10391042
}
1043+
updatedFieldBytes := segmentBucket.Get(util.BoltUpdatedFieldsKey)
1044+
if updatedFieldBytes != nil {
1045+
var updatedFields map[string]*index.UpdateFieldInfo
1046+
1047+
err := json.Unmarshal(updatedFieldBytes, &updatedFields)
1048+
if err != nil {
1049+
_ = seg.Close()
1050+
return nil, fmt.Errorf("error reading updated field bytes: %v", err)
1051+
}
1052+
rv.updatedFields = updatedFields
1053+
// Set the value within the segment base for use during merge
1054+
rv.UpdateFieldsInfo(rv.updatedFields)
1055+
}
10401056

10411057
return rv, nil
10421058
}
@@ -1215,7 +1231,7 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
12151231
}
12161232
}()
12171233

1218-
snapshots := tx.Bucket(boltSnapshotsBucket)
1234+
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
12191235
if snapshots == nil {
12201236
return 0, nil
12211237
}
@@ -1325,7 +1341,7 @@ func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
13251341
expirationDuration := time.Duration(s.numSnapshotsToKeep-1) * s.rollbackSamplingInterval
13261342

13271343
err := s.rootBolt.View(func(tx *bolt.Tx) error {
1328-
snapshots := tx.Bucket(boltSnapshotsBucket)
1344+
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
13291345
if snapshots == nil {
13301346
return nil
13311347
}
@@ -1349,11 +1365,11 @@ func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
13491365
if snapshot == nil {
13501366
continue
13511367
}
1352-
metaBucket := snapshot.Bucket(boltMetaDataKey)
1368+
metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
13531369
if metaBucket == nil {
13541370
continue
13551371
}
1356-
timeStampBytes := metaBucket.Get(boltMetaDataTimeStamp)
1372+
timeStampBytes := metaBucket.Get(util.BoltMetaDataTimeStamp)
13571373
var timeStamp time.Time
13581374
err = timeStamp.UnmarshalText(timeStampBytes)
13591375
if err != nil {
@@ -1390,7 +1406,7 @@ func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
13901406
func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
13911407
var rv []uint64
13921408
err := s.rootBolt.View(func(tx *bolt.Tx) error {
1393-
snapshots := tx.Bucket(boltSnapshotsBucket)
1409+
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
13941410
if snapshots == nil {
13951411
return nil
13961412
}
@@ -1411,7 +1427,7 @@ func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
14111427
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
14121428
rv := map[string]struct{}{}
14131429
err := s.rootBolt.View(func(tx *bolt.Tx) error {
1414-
snapshots := tx.Bucket(boltSnapshotsBucket)
1430+
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
14151431
if snapshots == nil {
14161432
return nil
14171433
}
@@ -1423,14 +1439,14 @@ func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
14231439
}
14241440
segc := snapshot.Cursor()
14251441
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
1426-
if segk[0] == boltInternalKey[0] {
1442+
if segk[0] == util.BoltInternalKey[0] {
14271443
continue
14281444
}
14291445
segmentBucket := snapshot.Bucket(segk)
14301446
if segmentBucket == nil {
14311447
continue
14321448
}
1433-
pathBytes := segmentBucket.Get(boltPathKey)
1449+
pathBytes := segmentBucket.Get(util.BoltPathKey)
14341450
if pathBytes == nil {
14351451
continue
14361452
}

0 commit comments

Comments
 (0)