Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ab40135
MB-57888: Index Update
Likith101 Nov 26, 2024
7740e0f
MB-57888: New apis for index update
Likith101 Jan 9, 2025
62271b2
MB-57888: Added few missing checks for index update
Likith101 Jan 13, 2025
d295df8
MB-57888: Added unit tests covering all update paths
Likith101 Jan 16, 2025
2e96445
MB-57888: Minor Fixes
Likith101 Feb 4, 2025
0410448
MB-57888: Naming changes
Likith101 Feb 7, 2025
4c5dd0b
MB-57888: Added analyser and datetime parser checks
Likith101 Feb 19, 2025
53eebc8
MB-57888: Additional checks for synonyms
Likith101 Feb 20, 2025
d5c4489
MB-57888: Api changes
Likith101 Feb 21, 2025
41ffb3b
MB-57888: Minor Code Refactoring
Likith101 Mar 11, 2025
1611ac5
MB-57888: Addressing review comments
Likith101 Mar 11, 2025
a18fe6f
MB-57888: Fixed typo
Likith101 Mar 12, 2025
829d502
MB-57888: Addressing review comments
Likith101 Mar 14, 2025
b65b4aa
MB-57888: Added checks for all dynamic mappings' analysers and dateti…
Likith101 Mar 14, 2025
2ff5417
MB-57888: Adding documentation and addressing review comments
Likith101 Mar 20, 2025
3d6cf2f
MB-57888: Addressing review comments and race conditions
Likith101 Mar 21, 2025
00d75b2
Absorb dependent zapx, scorch_segment_api, bleve_index_api commits
abhinavdangeti Sep 12, 2025
3dadd2d
Merge branch 'master' into IndexUpdate
abhinavdangeti Sep 12, 2025
96663d4
Fix missing refactor for PutUpdatedFields -> SetUpdatedFields
abhinavdangeti Sep 12, 2025
9891276
Use zapx/v16's v16.2.5
abhinavdangeti Sep 12, 2025
a4fbb0c
Fixing few test issues
Likith101 Sep 17, 2025
7fcbf77
Minor Changes
Likith101 Sep 19, 2025
4fab6ba
Addressing review comments
Likith101 Sep 30, 2025
0f10286
Merge branch 'master' into IndexUpdate
abhinavdangeti Sep 30, 2025
61c5adc
Addressing review comments
Likith101 Oct 8, 2025
c0f8c16
Addressing review comments
Likith101 Oct 8, 2025
29ff859
Adding commentary
Likith101 Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newBuilder(path string, mapping mapping.IndexMapping, config map[string]int
return nil, err
}
config["internal"] = map[string][]byte{
string(mappingInternalKey): mappingBytes,
string(util.MappingInternalKey): mappingBytes,
}

// do not use real config, as these are options for the builder,
Expand Down
41 changes: 41 additions & 0 deletions docs/index_update.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Ability to reduce downtime during index mapping updates

* *v2.5.4* (and after) will come with support to delete or modify any field mapping in the index mapping without requiring a full rebuild of the index
* We do this by storing which portions of the field have to be deleted within zap, and then lazily executing the deletion during subsequent merging of the segments.

## Usage

While opening an index, if an updated mapping is provided as a string under the key `updated_mapping` within the `runtimeConfig` parameter of `OpenUsing`, then we open the index and try to update it to use the new mapping provided.

If the update fails, the index is unchanged and an error is returned explaining why the update was unsuccessful.

## What can be deleted and what can't be deleted?
Fields can be partially deleted by changing their Index, Store, and DocValues parameters from true to false, or completely removed by deleting the field itself.

Additionally, document mappings can be deleted either by fully removing them from the index mapping or by setting the Enabled value to false, which deletes all fields defined within that mapping.

However, if any of the following conditions are met, the index is considered non-updatable.
* Any additional fields or enabled document mappings in the new index mapping
* Any changes to IncludeInAll, type, IncludeTermVectors and SkipFreqNorm
* Any document mapping whose enabled value changes from false to true
* Text fields with a different analyser or date time fields with a different date time format
* Vector and VectorBase64 fields changing dims, similarity or vectorIndexOptimizedFor
* Any changes when field is part of `_all`
* Full field deletions when it is covered by any dynamic setting (Index, Store or DocValues Dynamic)
* Any changes to dynamic settings at the top level or any enabled document mapping
* If multiple fields sharing the same field name are present — either from different type mappings or via aliases — then any incompatible changes across all of these fields

## How to enforce immediate deletion?
Since the deletion is only done during merging, a [force merge](https://github.com/blevesearch/bleve/blob/b82baf10b205511cf12da5cb24330abd9f5b1b74/index/scorch/merge.go#L164) may be used to completely remove the stale data.

## Sample code to update an existing index
```go
newMapping := `<Updated Index Mapping>`
config := map[string]interface{}{
	"updated_mapping": newMapping,
}
index, err := OpenUsing("<Path to Index>", config)
if err != nil {
return err
}
```
2 changes: 2 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func Open(path string) (Index, error) {
// The mapping used when it was created will be used for all Index/Search operations.
// The provided runtimeConfig can override settings
// persisted when the kvstore was created.
// If runtimeConfig contains an updated mapping (under the key "updated_mapping"),
// then an index update is attempted. Returns an error, leaving the index
// unchanged, if the provided mapping is not updatable.
func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error) {
return openIndexUsing(path, runtimeConfig)
}
Expand Down
6 changes: 6 additions & 0 deletions index/scorch/optimize_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (o *OptimizeVR) Finish() error {
wg.Done()
}()
for field, vrs := range o.vrs {
// Early exit if the field is supposed to be completely deleted or
// if its index data has been deleted
if info, ok := o.snapshot.updatedFields[field]; ok && (info.Deleted || info.Index) {
continue
}

vecIndex, err := segment.InterpretVectorIndex(field,
o.requiresFiltering, origSeg.deleted)
if err != nil {
Expand Down
110 changes: 63 additions & 47 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory,
func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory) (
[]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket)
if err != nil {
return nil, nil, err
}
Expand All @@ -619,17 +619,17 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
}

// persist meta values
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(boltMetaDataKey)
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(util.BoltMetaDataKey)
if err != nil {
return nil, nil, err
}
err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(segPlugin.Type()))
err = metaBucket.Put(util.BoltMetaDataSegmentTypeKey, []byte(segPlugin.Type()))
if err != nil {
return nil, nil, err
}
buf := make([]byte, binary.MaxVarintLen32)
binary.BigEndian.PutUint32(buf, segPlugin.Version())
err = metaBucket.Put(boltMetaDataSegmentVersionKey, buf)
err = metaBucket.Put(util.BoltMetaDataSegmentVersionKey, buf)
if err != nil {
return nil, nil, err
}
Expand All @@ -643,13 +643,13 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
if err != nil {
return nil, nil, err
}
err = metaBucket.Put(boltMetaDataTimeStamp, timeStampBinary)
err = metaBucket.Put(util.BoltMetaDataTimeStamp, timeStampBinary)
if err != nil {
return nil, nil, err
}

// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(util.BoltInternalKey)
if err != nil {
return nil, nil, err
}
Expand All @@ -665,7 +665,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
val := make([]byte, 8)
bytesWritten := atomic.LoadUint64(&snapshot.parent.stats.TotBytesWrittenAtIndexTime)
binary.LittleEndian.PutUint64(val, bytesWritten)
err = internalBucket.Put(TotBytesWrittenKey, val)
err = internalBucket.Put(util.TotBytesWrittenKey, val)
if err != nil {
return nil, nil, err
}
Expand All @@ -689,7 +689,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
}
filename := filepath.Base(segPath)
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
err = snapshotSegmentBucket.Put(util.BoltPathKey, []byte(filename))
if err != nil {
return nil, nil, err
}
Expand All @@ -705,7 +705,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
return nil, nil, fmt.Errorf("segment: %s persist err: %v", path, err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
err = snapshotSegmentBucket.Put(util.BoltPathKey, []byte(filename))
if err != nil {
return nil, nil, err
}
Expand All @@ -721,7 +721,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
if err != nil {
return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err)
}
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
err = snapshotSegmentBucket.Put(util.BoltDeletedKey, roaringBuf.Bytes())
if err != nil {
return nil, nil, err
}
Expand All @@ -733,7 +733,19 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
if err != nil {
return nil, nil, err
}
err = snapshotSegmentBucket.Put(boltStatsKey, b)
err = snapshotSegmentBucket.Put(util.BoltStatsKey, b)
if err != nil {
return nil, nil, err
}
}

// store updated field info
if segmentSnapshot.updatedFields != nil {
b, err := json.Marshal(segmentSnapshot.updatedFields)
if err != nil {
return nil, nil, err
}
err = snapshotSegmentBucket.Put(util.BoltUpdatedFieldsKey, b)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -832,22 +844,9 @@ func zapFileName(epoch uint64) string {

// bolt snapshot code

var (
boltSnapshotsBucket = []byte{'s'}
boltPathKey = []byte{'p'}
boltDeletedKey = []byte{'d'}
boltInternalKey = []byte{'i'}
boltMetaDataKey = []byte{'m'}
boltMetaDataSegmentTypeKey = []byte("type")
boltMetaDataSegmentVersionKey = []byte("version")
boltMetaDataTimeStamp = []byte("timeStamp")
boltStatsKey = []byte("stats")
TotBytesWrittenKey = []byte("TotBytesWritten")
)

func (s *Scorch) loadFromBolt() error {
err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
if snapshots == nil {
return nil
}
Expand Down Expand Up @@ -912,7 +911,7 @@ func (s *Scorch) loadFromBolt() error {
// NOTE: this is currently ONLY intended to be used by the command-line tool
func (s *Scorch) LoadSnapshot(epoch uint64) (rv *IndexSnapshot, err error) {
err = s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
if snapshots == nil {
return nil
}
Expand Down Expand Up @@ -940,14 +939,14 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
// first we look for the meta-data bucket, this will tell us
// which segment type/version was used for this snapshot
// all operations for this scorch will use this type/version
metaBucket := snapshot.Bucket(boltMetaDataKey)
metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
if metaBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("meta-data bucket missing")
}
segmentType := string(metaBucket.Get(boltMetaDataSegmentTypeKey))
segmentType := string(metaBucket.Get(util.BoltMetaDataSegmentTypeKey))
segmentVersion := binary.BigEndian.Uint32(
metaBucket.Get(boltMetaDataSegmentVersionKey))
metaBucket.Get(util.BoltMetaDataSegmentVersionKey))
err := s.loadSegmentPlugin(segmentType, segmentVersion)
if err != nil {
_ = rv.DecRef()
Expand All @@ -957,7 +956,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
var running uint64
c := snapshot.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
if k[0] == boltInternalKey[0] {
if k[0] == util.BoltInternalKey[0] {
internalBucket := snapshot.Bucket(k)
if internalBucket == nil {
_ = rv.DecRef()
Expand All @@ -972,11 +971,11 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
_ = rv.DecRef()
return nil, err
}
} else if k[0] != boltMetaDataKey[0] {
} else if k[0] != util.BoltMetaDataKey[0] {
segmentBucket := snapshot.Bucket(k)
if segmentBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
return nil, fmt.Errorf("segment key, but bucket missing %x", k)
}
segmentSnapshot, err := s.loadSegment(segmentBucket)
if err != nil {
Expand All @@ -990,53 +989,70 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
}
rv.segment = append(rv.segment, segmentSnapshot)
rv.offsets = append(rv.offsets, running)
// Merge all segment level updated field info for use during queries
if segmentSnapshot.updatedFields != nil {
rv.MergeUpdateFieldsInfo(segmentSnapshot.updatedFields)
}
running += segmentSnapshot.segment.Count()
}
}
return rv, nil
}

func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
pathBytes := segmentBucket.Get(boltPathKey)
pathBytes := segmentBucket.Get(util.BoltPathKey)
if pathBytes == nil {
return nil, fmt.Errorf("segment path missing")
}
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
segment, err := s.segPlugin.Open(segmentPath)
seg, err := s.segPlugin.Open(segmentPath)
if err != nil {
return nil, fmt.Errorf("error opening bolt segment: %v", err)
}

rv := &SegmentSnapshot{
segment: segment,
segment: seg,
cachedDocs: &cachedDocs{cache: nil},
cachedMeta: &cachedMeta{meta: nil},
}
deletedBytes := segmentBucket.Get(boltDeletedKey)
deletedBytes := segmentBucket.Get(util.BoltDeletedKey)
if deletedBytes != nil {
deletedBitmap := roaring.NewBitmap()
r := bytes.NewReader(deletedBytes)
_, err := deletedBitmap.ReadFrom(r)
if err != nil {
_ = segment.Close()
_ = seg.Close()
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
}
if !deletedBitmap.IsEmpty() {
rv.deleted = deletedBitmap
}
}
statBytes := segmentBucket.Get(boltStatsKey)
statBytes := segmentBucket.Get(util.BoltStatsKey)
if statBytes != nil {
var statsMap map[string]map[string]uint64

err := json.Unmarshal(statBytes, &statsMap)
stats := &fieldStats{statMap: statsMap}
if err != nil {
_ = segment.Close()
_ = seg.Close()
return nil, fmt.Errorf("error reading stat bytes: %v", err)
}
rv.stats = stats
}
updatedFieldBytes := segmentBucket.Get(util.BoltUpdatedFieldsKey)
if updatedFieldBytes != nil {
var updatedFields map[string]*index.UpdateFieldInfo

err := json.Unmarshal(updatedFieldBytes, &updatedFields)
if err != nil {
_ = seg.Close()
return nil, fmt.Errorf("error reading updated field bytes: %v", err)
}
rv.updatedFields = updatedFields
// Set the value within the segment base for use during merge
rv.UpdateFieldsInfo(rv.updatedFields)
}

return rv, nil
}
Expand Down Expand Up @@ -1215,7 +1231,7 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) {
}
}()

snapshots := tx.Bucket(boltSnapshotsBucket)
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
if snapshots == nil {
return 0, nil
}
Expand Down Expand Up @@ -1325,7 +1341,7 @@ func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
expirationDuration := time.Duration(s.numSnapshotsToKeep-1) * s.rollbackSamplingInterval

err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
if snapshots == nil {
return nil
}
Expand All @@ -1349,11 +1365,11 @@ func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
if snapshot == nil {
continue
}
metaBucket := snapshot.Bucket(boltMetaDataKey)
metaBucket := snapshot.Bucket(util.BoltMetaDataKey)
if metaBucket == nil {
continue
}
timeStampBytes := metaBucket.Get(boltMetaDataTimeStamp)
timeStampBytes := metaBucket.Get(util.BoltMetaDataTimeStamp)
var timeStamp time.Time
err = timeStamp.UnmarshalText(timeStampBytes)
if err != nil {
Expand Down Expand Up @@ -1390,7 +1406,7 @@ func (s *Scorch) rootBoltSnapshotMetaData() ([]*snapshotMetaData, error) {
func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
var rv []uint64
err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
if snapshots == nil {
return nil
}
Expand All @@ -1411,7 +1427,7 @@ func (s *Scorch) RootBoltSnapshotEpochs() ([]uint64, error) {
func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
rv := map[string]struct{}{}
err := s.rootBolt.View(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
snapshots := tx.Bucket(util.BoltSnapshotsBucket)
if snapshots == nil {
return nil
}
Expand All @@ -1423,14 +1439,14 @@ func (s *Scorch) loadZapFileNames() (map[string]struct{}, error) {
}
segc := snapshot.Cursor()
for segk, _ := segc.First(); segk != nil; segk, _ = segc.Next() {
if segk[0] == boltInternalKey[0] {
if segk[0] == util.BoltInternalKey[0] {
continue
}
segmentBucket := snapshot.Bucket(segk)
if segmentBucket == nil {
continue
}
pathBytes := segmentBucket.Get(boltPathKey)
pathBytes := segmentBucket.Get(util.BoltPathKey)
if pathBytes == nil {
continue
}
Expand Down
Loading