Skip to content

Commit 59a66df

Browse files
committed
refactor file transfer
1 parent 3bfb59e commit 59a66df

File tree

7 files changed

+7
-149
lines changed

7 files changed

+7
-149
lines changed

index.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,10 @@ func (b *Batch) Index(id string, data interface{}) error {
5151
eventIndex.FireIndexEvent()
5252
}
5353
doc := document.NewDocument(id)
54-
// fmt.Printf("data is before mapping %#v\n", data)
5554
err := b.index.Mapping().MapDocument(doc, data)
5655
if err != nil {
5756
return err
5857
}
59-
// fmt.Printf("data is after mapping %#v\n", doc)
6058
b.internal.Update(doc)
6159

6260
b.lastDocSize = uint64(doc.Size() +
@@ -355,11 +353,6 @@ type IndexCopyable interface {
355353
CopyTo(d index.Directory) error
356354
}
357355

358-
type IndexFileCopyable interface {
359-
UpdateFileInBolt(key []byte, value []byte) error
360-
CopyFile(file string, d index.IndexDirectory) error
361-
}
362-
363356
// FileSystemDirectory is the default implementation for the
364357
// index.Directory interface.
365358
type FileSystemDirectory string

index/scorch/merge.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,6 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
375375
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
376376
prevBytesReadTotal := cumulateBytesRead(segmentsToMerge)
377377

378-
fmt.Println("files", files)
379378
newDocNums, _, err := s.segPlugin.MergeEx(segmentsToMerge, docsToDrop, path,
380379
cw.cancelCh, s, s.segmentConfig)
381380
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
@@ -541,7 +540,6 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot,
541540
filename := zapFileName(newSegmentID)
542541
path := s.path + string(os.PathSeparator) + filename
543542

544-
fmt.Println("version while merging", s.segPlugin.Version())
545543
// the newly merged segment is already flushed out to disk, just needs
546544
// to be opened using mmap.
547545
newDocIDs, _, err :=

index/scorch/persister.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,6 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot, po *persiste
425425
var totSize int
426426
var numSegsToFlushOut int
427427
var totDocs uint64
428-
fmt.Println("persister doing its thing")
429428
// legacy behaviour of merge + flush of all in-memory segments in one-shot
430429
if legacyFlushBehaviour(po.MaxSizeInMemoryMergePerWorker, po.NumPersisterWorkers) {
431430
val := &flushable{
@@ -887,12 +886,7 @@ func (s *Scorch) loadFromBolt() error {
887886
s.AddEligibleForRemoval(snapshotEpoch)
888887
continue
889888
}
890-
// fmt.Println("loadFromBolt key %s", k)
891-
// if k[0] == util.BoltCentroidIndexKey[0] {
892-
// fmt.Println("loadFromBolt centroid index key", string(k))
893889

894-
// continue
895-
// }
896890
snapshot := snapshots.Bucket(k)
897891
if snapshot == nil {
898892
log.Printf("snapshot key, but bucket missing %x, continuing", k)

index/scorch/scorch.go

Lines changed: 6 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
package scorch
1616

1717
import (
18-
"bytes"
1918
"encoding/json"
2019
"fmt"
21-
"io"
2220
"os"
2321
"path/filepath"
2422
"strings"
@@ -83,7 +81,7 @@ type Scorch struct {
8381
persisterNotifier chan *epochWatcher
8482
rootBolt *bolt.DB
8583
asyncTasks sync.WaitGroup
86-
// not a real searchable segment, singleton
84+
// not a real searchable segment
8785
centroidIndex *SegmentSnapshot
8886
train chan *trainRequest
8987

@@ -184,7 +182,6 @@ func NewScorch(storeName string,
184182
}
185183
}
186184

187-
// "pretraining": true
188185
segConfig, ok := config["segmentConfig"].(map[string]interface{})
189186
if ok {
190187
rv.segmentConfig = segConfig
@@ -592,7 +589,7 @@ func (s *Scorch) trainerLoop() {
592589
// some init stuff
593590
s.segmentConfig["getCentroidIndexCallback"] = s.getCentroidIndex
594591
var totalSamplesProcessed int
595-
filename := "centroid_index"
592+
filename := index.CentroidIndexFileName
596593
path := filepath.Join(s.path, filename)
597594
for {
598595
select {
@@ -620,7 +617,7 @@ func (s *Scorch) trainerLoop() {
620617
// persist in a tmp file and then rename - is that a fair strategy?
621618
s.segmentConfig["training"] = true
622619
_, _, err := s.segPlugin.MergeEx([]segment.Segment{s.centroidIndex.segment, sampleSeg},
623-
[]*roaring.Bitmap{nil, nil}, filepath.Join(s.path, "centroid_index.tmp"), s.closeCh, nil, s.segmentConfig)
620+
[]*roaring.Bitmap{nil, nil}, filepath.Join(s.path, index.CentroidIndexFileName+".tmp"), s.closeCh, nil, s.segmentConfig)
624621
if err != nil {
625622
trainReq.ackCh <- fmt.Errorf("error merging centroid index: %v", err)
626623
close(trainReq.ackCh)
@@ -630,7 +627,7 @@ func (s *Scorch) trainerLoop() {
630627

631628
// close the existing centroid segment - it's supposed to be gc'd at this point
632629
s.centroidIndex.segment.Close()
633-
err = moveFile(filepath.Join(s.path, "centroid_index.tmp"), filepath.Join(s.path, "centroid_index"))
630+
err = moveFile(filepath.Join(s.path, index.CentroidIndexFileName+".tmp"), filepath.Join(s.path, index.CentroidIndexFileName))
634631
if err != nil {
635632
trainReq.ackCh <- fmt.Errorf("error renaming centroid index: %v", err)
636633
close(trainReq.ackCh)
@@ -691,7 +688,7 @@ func (s *Scorch) trainerLoop() {
691688
}
692689

693690
// update the centroid index pointer
694-
centroidIndex, err := s.segPlugin.OpenEx(filepath.Join(s.path, "centroid_index"), s.segmentConfig)
691+
centroidIndex, err := s.segPlugin.OpenEx(filepath.Join(s.path, index.CentroidIndexFileName), s.segmentConfig)
695692
if err != nil {
696693
trainReq.ackCh <- fmt.Errorf("error opening centroid index: %v", err)
697694
close(trainReq.ackCh)
@@ -709,8 +706,6 @@ func (s *Scorch) Train(batch *index.Batch) error {
709706
// regulate the Train function
710707
s.FireIndexEvent()
711708

712-
// batch.InternalOps
713-
714709
var trainData []index.Document
715710
for key, doc := range batch.IndexOps {
716711
if doc != nil {
@@ -732,7 +727,7 @@ func (s *Scorch) Train(batch *index.Batch) error {
732727
//
733728
// note: this might index text data too, how to handle this? s.segmentConfig?
734729
// todo: updates/deletes -> data drift detection
735-
seg, n, err := s.segPlugin.NewEx(trainData, s.segmentConfig)
730+
seg, _, err := s.segPlugin.NewEx(trainData, s.segmentConfig)
736731
if err != nil {
737732
return err
738733
}
@@ -749,7 +744,6 @@ func (s *Scorch) Train(batch *index.Batch) error {
749744
return err
750745
}
751746

752-
fmt.Println("number of bytes written to centroid index", n)
753747
return err
754748
}
755749

@@ -1212,88 +1206,6 @@ func (s *Scorch) CopyReader() index.CopyReader {
12121206
return rv
12131207
}
12141208

1215-
func (s *Scorch) UpdateFileInBolt(key []byte, value []byte) error {
1216-
tx, err := s.rootBolt.Begin(true)
1217-
if err != nil {
1218-
return err
1219-
}
1220-
defer func() {
1221-
if err != nil {
1222-
_ = tx.Rollback()
1223-
}
1224-
}()
1225-
1226-
snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket)
1227-
if err != nil {
1228-
return err
1229-
}
1230-
1231-
// currently this is specific to centroid index file update
1232-
if bytes.Equal(key, util.BoltCentroidIndexKey) {
1233-
// todo: guard against duplicate updates
1234-
centroidBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltCentroidIndexKey)
1235-
if err != nil {
1236-
return err
1237-
}
1238-
if centroidBucket == nil {
1239-
return fmt.Errorf("centroid bucket not found")
1240-
}
1241-
existingValue := centroidBucket.Get(util.BoltPathKey)
1242-
if existingValue != nil {
1243-
return fmt.Errorf("key already exists %v %v", s.path, string(existingValue))
1244-
}
1245-
1246-
err = centroidBucket.Put(util.BoltPathKey, value)
1247-
if err != nil {
1248-
return err
1249-
}
1250-
}
1251-
1252-
err = tx.Commit()
1253-
if err != nil {
1254-
return err
1255-
}
1256-
1257-
err = s.rootBolt.Sync()
1258-
if err != nil {
1259-
return err
1260-
}
1261-
1262-
return nil
1263-
}
1264-
1265-
// CopyFile copies a specific file to a destination directory which has an access to a bleve index
1266-
// doing a io.Copy() isn't enough because the file needs to be tracked in bolt file as well
1267-
func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error {
1268-
s.rootLock.Lock()
1269-
defer s.rootLock.Unlock()
1270-
1271-
// this code is currently specific to centroid index file but is future proofed for other files
1272-
// to be updated in the dest's bolt
1273-
if strings.HasSuffix(file, "centroid_index") {
1274-
// centroid index file - this is outside the snapshots domain so the bolt update is different
1275-
err := d.UpdateFileInBolt(util.BoltCentroidIndexKey, []byte(file))
1276-
if err != nil {
1277-
return fmt.Errorf("error updating dest index bolt: %w", err)
1278-
}
1279-
}
1280-
1281-
dest, err := d.GetWriter(filepath.Join("store", file))
1282-
if err != nil {
1283-
return err
1284-
}
1285-
1286-
source, err := os.Open(filepath.Join(s.path, file))
1287-
if err != nil {
1288-
return err
1289-
}
1290-
1291-
defer source.Close()
1292-
defer dest.Close()
1293-
_, err = io.Copy(dest, source)
1294-
return err
1295-
}
1296-
12971209
// external API to fire a scorch event (EventKindIndexStart) externally from bleve
12981210
func (s *Scorch) FireIndexEvent() {
12991211
s.fireEvent(EventKindIndexStart, 0)

index/scorch/segment_plugin.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ func RegisterSegmentPlugin(plugin SegmentPlugin, makeDefault bool) {
101101
}
102102
supportedSegmentPlugins[plugin.Type()][plugin.Version()] = plugin
103103
if makeDefault {
104-
fmt.Println("registering default segment plugin", plugin.Type(), plugin.Version())
105104
defaultSegmentPlugin = plugin
106105
}
107106
}

index/scorch/snapshot_index.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ func init() {
6666
}
6767

6868
type IndexSnapshot struct {
69-
parent *Scorch
70-
69+
parent *Scorch
7170
segment []*SegmentSnapshot
7271
offsets []uint64
7372
internal map[string][]byte

index_impl.go

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -326,13 +326,11 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) {
326326

327327
i.FireIndexEvent()
328328

329-
// fmt.Printf("data is %#v\n", data)
330329
doc := document.NewDocument(id)
331330
err = i.m.MapDocument(doc, data)
332331
if err != nil {
333332
return
334333
}
335-
// fmt.Printf("data is after mapping %#v\n", doc)
336334
err = i.i.Update(doc)
337335
return
338336
}
@@ -1432,39 +1430,6 @@ func (m *searchHitSorter) Less(i, j int) bool {
14321430
return c < 0
14331431
}
14341432

1435-
func (i *indexImpl) CopyFile(file string, d index.IndexDirectory) (err error) {
1436-
i.mutex.RLock()
1437-
defer i.mutex.RUnlock()
1438-
1439-
if !i.open {
1440-
return ErrorIndexClosed
1441-
}
1442-
1443-
copyIndex, ok := i.i.(index.IndexFileCopyable)
1444-
if !ok {
1445-
return fmt.Errorf("index implementation does not support copy reader")
1446-
}
1447-
1448-
return copyIndex.CopyFile(file, d)
1449-
}
1450-
1451-
func (i *indexImpl) UpdateFileInBolt(key []byte, value []byte) error {
1452-
i.mutex.RLock()
1453-
defer i.mutex.RUnlock()
1454-
1455-
if !i.open {
1456-
return ErrorIndexClosed
1457-
}
1458-
1459-
copyIndex, ok := i.i.(index.IndexFileCopyable)
1460-
if !ok {
1461-
return fmt.Errorf("index implementation does not support file copy")
1462-
}
1463-
1464-
return copyIndex.UpdateFileInBolt(key, value)
1465-
}
1466-
1467-
// CopyTo (index.Directory, filter)
14681433
func (i *indexImpl) CopyTo(d index.Directory) (err error) {
14691434
i.mutex.RLock()
14701435
defer i.mutex.RUnlock()
@@ -1478,8 +1443,6 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) {
14781443
return fmt.Errorf("index implementation does not support copy reader")
14791444
}
14801445

1481-
// copyIndex.Copy() -> copies the centroid index
1482-
14831446
copyReader := copyIndex.CopyReader()
14841447
if copyReader == nil {
14851448
return fmt.Errorf("index's copyReader is nil")

0 commit comments

Comments
 (0)