Skip to content

Commit fb40760

Browse files
committed
MB-57888: WIP: Index Update
1 parent 902051d commit fb40760

File tree

6 files changed

+487
-12
lines changed

6 files changed

+487
-12
lines changed

index/scorch/persister.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,18 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
619619
return nil, nil, err
620620
}
621621
}
622+
623+
// store updated field info
624+
if segmentSnapshot.updatedFields != nil {
625+
b, err := json.Marshal(segmentSnapshot.updatedFields)
626+
if err != nil {
627+
return nil, nil, err
628+
}
629+
err = snapshotSegmentBucket.Put(boltUpdatedFieldsKey, b)
630+
if err != nil {
631+
return nil, nil, err
632+
}
633+
}
622634
}
623635

624636
return filenames, newSegmentPaths, nil
@@ -722,6 +734,7 @@ var boltMetaDataSegmentTypeKey = []byte("type")
722734
var boltMetaDataSegmentVersionKey = []byte("version")
723735
var boltMetaDataTimeStamp = []byte("timeStamp")
724736
var boltStatsKey = []byte("stats")
737+
var boltUpdatedFieldsKey = []byte("fields")
725738
var TotBytesWrittenKey = []byte("TotBytesWritten")
726739

727740
func (s *Scorch) loadFromBolt() error {
@@ -860,6 +873,9 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
860873
}
861874
rv.segment = append(rv.segment, segmentSnapshot)
862875
rv.offsets = append(rv.offsets, running)
876+
if segmentSnapshot.updatedFields != nil {
877+
rv.updatedFields = segmentSnapshot.updatedFields
878+
}
863879
running += segmentSnapshot.segment.Count()
864880
}
865881
}
@@ -872,13 +888,13 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
872888
return nil, fmt.Errorf("segment path missing")
873889
}
874890
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
875-
segment, err := s.segPlugin.Open(segmentPath)
891+
seg, err := s.segPlugin.Open(segmentPath)
876892
if err != nil {
877893
return nil, fmt.Errorf("error opening bolt segment: %v", err)
878894
}
879895

880896
rv := &SegmentSnapshot{
881-
segment: segment,
897+
segment: seg,
882898
cachedDocs: &cachedDocs{cache: nil},
883899
cachedMeta: &cachedMeta{meta: nil},
884900
}
@@ -888,7 +904,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
888904
r := bytes.NewReader(deletedBytes)
889905
_, err := deletedBitmap.ReadFrom(r)
890906
if err != nil {
891-
_ = segment.Close()
907+
_ = seg.Close()
892908
return nil, fmt.Errorf("error reading deleted bytes: %v", err)
893909
}
894910
if !deletedBitmap.IsEmpty() {
@@ -902,11 +918,22 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
902918
err := json.Unmarshal(statBytes, &statsMap)
903919
stats := &fieldStats{statMap: statsMap}
904920
if err != nil {
905-
_ = segment.Close()
921+
_ = seg.Close()
906922
return nil, fmt.Errorf("error reading stat bytes: %v", err)
907923
}
908924
rv.stats = stats
909925
}
926+
updatedFieldBytes := segmentBucket.Get(boltUpdatedFieldsKey)
927+
if updatedFieldBytes != nil {
928+
var updatedFields map[string]index.FieldInfo
929+
930+
err := json.Unmarshal(updatedFieldBytes, &updatedFields)
931+
if err != nil {
932+
_ = seg.Close()
933+
return nil, fmt.Errorf("error reading updated field bytes: %v", err)
934+
}
935+
rv.updatedFields = updatedFields
936+
}
910937

911938
return rv, nil
912939
}

index/scorch/scorch.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package scorch
1717
import (
1818
"encoding/json"
1919
"fmt"
20+
"log"
2021
"os"
2122
"path/filepath"
2223
"sync"
@@ -36,6 +37,8 @@ const Version uint8 = 2
3637

3738
var ErrClosed = fmt.Errorf("scorch closed")
3839

40+
var mappingInternalKey = []byte("_mapping")
41+
3942
type Scorch struct {
4043
nextSegmentID uint64
4144
stats Stats
@@ -882,3 +885,71 @@ func (s *Scorch) CopyReader() index.CopyReader {
882885
func (s *Scorch) FireIndexEvent() {
883886
s.fireEvent(EventKindIndexStart, 0)
884887
}
888+
889+
func (s *Scorch) UpdateFields(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {
890+
err := s.updateBolt(fieldInfo, mappingBytes)
891+
if err != nil {
892+
return err
893+
}
894+
return nil
895+
}
896+
897+
func (s *Scorch) updateBolt(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {
898+
return s.rootBolt.Update(func(tx *bolt.Tx) error {
899+
snapshots := tx.Bucket(boltSnapshotsBucket)
900+
if snapshots == nil {
901+
return nil
902+
}
903+
904+
c := snapshots.Cursor()
905+
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
906+
_, _, err := decodeUvarintAscending(k)
907+
if err != nil {
908+
log.Printf("unable to parse segment epoch %x, continuing", k)
909+
continue
910+
}
911+
snapshot := snapshots.Bucket(k)
912+
cc := snapshot.Cursor()
913+
for kk, _ := cc.First(); kk != nil; kk, _ = c.Next() {
914+
if k[0] == boltInternalKey[0] {
915+
internalBucket := snapshot.Bucket(k)
916+
if internalBucket == nil {
917+
return fmt.Errorf("segment key, but bucket missing % x", k)
918+
}
919+
err = internalBucket.Put(mappingInternalKey, mappingBytes)
920+
if err != nil {
921+
return err
922+
}
923+
} else if k[0] != boltMetaDataKey[0] {
924+
segmentBucket := snapshot.Bucket(k)
925+
if segmentBucket == nil {
926+
return fmt.Errorf("segment key, but bucket missing % x", k)
927+
}
928+
var updatedFields map[string]index.FieldInfo
929+
updatedFieldBytes := segmentBucket.Get(boltUpdatedFieldsKey)
930+
if updatedFieldBytes != nil {
931+
err := json.Unmarshal(updatedFieldBytes, &updatedFields)
932+
if err != nil {
933+
return fmt.Errorf("error reading updated field bytes: %v", err)
934+
}
935+
} else {
936+
updatedFields = make(map[string]index.FieldInfo)
937+
}
938+
for field, info := range fieldInfo {
939+
updatedFields[field] = *info
940+
}
941+
b, err := json.Marshal(updatedFields)
942+
if err != nil {
943+
return err
944+
}
945+
err = segmentBucket.Put(boltUpdatedFieldsKey, b)
946+
if err != nil {
947+
return err
948+
}
949+
}
950+
}
951+
}
952+
953+
return nil
954+
})
955+
}

index/scorch/snapshot_index.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ type IndexSnapshot struct {
8989

9090
m2 sync.Mutex // Protects the fields that follow.
9191
fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's
92+
93+
updatedFields map[string]index.FieldInfo
9294
}
9395

9496
func (i *IndexSnapshot) Segments() []*SegmentSnapshot {
@@ -441,6 +443,10 @@ func (is *IndexSnapshot) Document(id string) (rv index.Document, err error) {
441443
// Keeping that TODO for now until we have a cleaner way.
442444
rvd.StoredFieldsSize += uint64(len(val))
443445

446+
if info, ok := is.updatedFields[name]; ok &&
447+
(info.All || info.Store) {
448+
return true
449+
}
444450
// copy value, array positions to preserve them beyond the scope of this callback
445451
value := append([]byte(nil), val...)
446452
arrayPos := append([]uint64(nil), pos...)
@@ -580,7 +586,15 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
580586
segBytesRead := s.segment.BytesRead()
581587
rv.incrementBytesRead(segBytesRead)
582588
}
583-
dict, err := s.segment.Dictionary(field)
589+
590+
var dict segment.TermDictionary
591+
var err error
592+
if info, ok := is.updatedFields[field]; ok &&
593+
(info.Index || info.All) {
594+
dict = nil
595+
} else {
596+
dict, err = s.segment.Dictionary(field)
597+
}
584598
if err != nil {
585599
return nil, err
586600
}
@@ -712,14 +726,24 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
712726
}
713727
}
714728

729+
var filteredFields []string
730+
for _, field := range vFields {
731+
if info, ok := is.updatedFields[field]; ok &&
732+
(info.DocValues || info.All) {
733+
continue
734+
} else {
735+
filteredFields = append(filteredFields, field)
736+
}
737+
}
738+
715739
var errCh chan error
716740

717741
// cFields represents the fields that we'll need from the
718742
// cachedDocs, and might be optionally be provided by the caller,
719743
// if the caller happens to know we're on the same segmentIndex
720744
// from a previous invocation
721745
if cFields == nil {
722-
cFields = subtractStrings(fields, vFields)
746+
cFields = subtractStrings(fields, filteredFields)
723747

724748
if !ss.cachedDocs.hasFields(cFields) {
725749
errCh = make(chan error, 1)

index/scorch/snapshot_segment.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ type SegmentSnapshot struct {
3535
// segment was mmaped recently, in which case
3636
// we consider the loading cost of the metadata
3737
// as part of IO stats.
38-
mmaped uint32
39-
id uint64
40-
segment segment.Segment
41-
deleted *roaring.Bitmap
42-
creator string
43-
stats *fieldStats
38+
mmaped uint32
39+
id uint64
40+
segment segment.Segment
41+
deleted *roaring.Bitmap
42+
creator string
43+
stats *fieldStats
44+
updatedFields map[string]index.FieldInfo
4445

4546
cachedMeta *cachedMeta
4647

index_impl.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,25 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
163163
storeConfig = map[string]interface{}{}
164164
}
165165

166+
var um *mapping.IndexMappingImpl
167+
var umBytes []byte
168+
166169
storeConfig["path"] = indexStorePath(path)
167170
storeConfig["create_if_missing"] = false
168171
storeConfig["error_if_exists"] = false
169172
for rck, rcv := range runtimeConfig {
173+
if rck == "mapping" {
174+
if val, ok := rcv.([]byte); ok {
175+
err = util.UnmarshalJSON(val, &um)
176+
if err != nil {
177+
return nil, fmt.Errorf("error parsing updated mapping JSON: %v\nmapping contents:\n%s", err, val)
178+
}
179+
umBytes = val
180+
} else {
181+
return nil, fmt.Errorf("error typecasting updated mapping JSON\nmapping contents: %v", rcv)
182+
}
183+
continue
184+
}
170185
storeConfig[rck] = rcv
171186
}
172187

@@ -225,6 +240,28 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
225240
return rv, err
226241
}
227242

243+
if um != nil {
244+
ui, ok := rv.i.(index.UpdateIndex)
245+
if !ok {
246+
return rv, fmt.Errorf("updated mapping present for unupdatable index")
247+
}
248+
249+
err = um.Validate()
250+
if err != nil {
251+
return rv, err
252+
}
253+
254+
fieldInfo, err := deletedFields(im, um)
255+
if err != nil {
256+
return rv, err
257+
}
258+
259+
err = ui.UpdateFields(fieldInfo, umBytes)
260+
if err != nil {
261+
return rv, err
262+
}
263+
}
264+
228265
rv.m = im
229266
indexStats.Register(rv)
230267
return rv, err

0 commit comments

Comments
 (0)