4 changes: 0 additions & 4 deletions engine/engine.go
@@ -208,10 +208,6 @@ func (e *Engine) Close() error {
return e.valueStore.Close()
}

func (e *Engine) Iter() (indexer.Iterator, error) {
return e.valueStore.Iter()
}

func (e *Engine) updateCacheStats() {
st := e.resultCache.Stats()
var prevStats *cache.Stats
16 changes: 0 additions & 16 deletions interface.go
@@ -47,27 +47,11 @@ type Interface interface {
// Close gracefully closes the store, flushing all pending data from memory.
Close() error

// Iter creates a new value store iterator.
Iter() (Iterator, error)

// Stats returns statistical information about the indexed values.
// If unsupported by the backing store, ErrStatsNotSupported is returned.
Stats() (*Stats, error)
}

// Iterator iterates multihashes and values in the value store. Any write
// operation invalidates the iterator.
type Iterator interface {
// Next returns the next multihash and the values indexed under it. Returns io.EOF
// when finished iterating.
Next() (multihash.Multihash, []Value, error)

// Close closes the iterator, releasing any resources it holds. The iterator
// is no longer usable after a call to this function and should be discarded.
Close() error
}

// Stats provides statistics about the indexed values.
type Stats struct {
// MultihashCount is the number of unique multihashes indexed.
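For readers following the removal above: the deleted Iterator contract was drain-until-io.EOF, then Close. The sketch below restates that usage pattern as the doc comments and the E2E test in store/test/tests.go describe it; the drainIterator name and the idx parameter are illustrative, not part of the package.

// drainIterator illustrates the contract of the removed Iterator: call Next
// until it returns io.EOF, and rely on a deferred Close to release resources.
// Sketch only; it needs the "io" import and any value exposing the old Iter method.
func drainIterator(idx interface{ Iter() (Iterator, error) }) error {
	it, err := idx.Iter()
	if err != nil {
		return err
	}
	defer it.Close()
	for {
		mh, values, err := it.Next()
		if err == io.EOF {
			return nil // iteration finished
		}
		if err != nil {
			return err
		}
		_ = mh     // multihash.Multihash key
		_ = values // []Value entries indexed under mh
	}
}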
4 changes: 0 additions & 4 deletions store/dhstore/dhstore.go
@@ -282,10 +282,6 @@ func (s *dhStore) Flush() error { return nil }

func (s *dhStore) Close() error { return nil }

func (s *dhStore) Iter() (indexer.Iterator, error) {
return nil, ErrNotSupported
}

func (s *dhStore) Stats() (*indexer.Stats, error) {
return nil, nil
}
32 changes: 0 additions & 32 deletions store/memory/memory.go
@@ -12,8 +12,6 @@ import (
"bytes"
"context"
"errors"
"io"
"iter"
"strings"
"sync"

@@ -31,11 +29,6 @@
mutex sync.Mutex
}

type memoryIter struct {
iterNext func() (string, []*indexer.Value, bool)
iterStop func()
}

var _ indexer.Interface = (*memoryStore)(nil)

// New creates a new indexer.Interface implemented by a radixtree-based value
@@ -188,14 +181,6 @@ func (s *memoryStore) Flush() error { return nil }

func (s *memoryStore) Close() error { return nil }

func (s *memoryStore) Iter() (indexer.Iterator, error) {
next, stop := iter.Pull2[string, []*indexer.Value](s.rtree.Iter())
return &memoryIter{
iterNext: next,
iterStop: stop,
}, nil
}

func (s *memoryStore) Stats() (*indexer.Stats, error) {
var count uint64
s.mutex.Lock()
@@ -207,23 +192,6 @@ func (s *memoryStore) Stats() (*indexer.Stats, error) {
}, nil
}

func (it *memoryIter) Next() (multihash.Multihash, []indexer.Value, error) {
key, vals, ok := it.iterNext()
if !ok {
return nil, nil, io.EOF
}
values := make([]indexer.Value, len(vals))
for i, v := range vals {
values[i] = *v
}
return multihash.Multihash(key), values, nil
}

func (it *memoryIter) Close() error {
it.iterStop()
return nil
}

func (s *memoryStore) get(k string) ([]*indexer.Value, bool) {
// Search current cache
return s.rtree.Get(k)
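The removed memoryIter above is a thin adapter: iter.Pull2 turns the radix tree's push-style iterator into next/stop functions, which back the pull-style Next/Close API. Below is a minimal sketch of that conversion, assuming Go 1.23+ and a seq argument of the same shape the tree yields; the pullValues name is illustrative.

// pullValues sketches the adapter the removed memoryIter implemented: a
// push-style iter.Seq2 is converted into explicit next/stop calls.
func pullValues(seq iter.Seq2[string, []*indexer.Value]) {
	next, stop := iter.Pull2(seq)
	defer stop() // the removed Close() simply called stop
	for {
		key, vals, ok := next()
		if !ok {
			break // exhausted; the removed Next() reported this as io.EOF
		}
		_ = multihash.Multihash(key) // keys are multihashes stored as strings
		_ = vals
	}
}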
129 changes: 14 additions & 115 deletions store/pebble/pebble.go
@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"io"
"slices"
"time"

@@ -28,34 +27,22 @@
log = logging.Logger("store/pebble")

_ indexer.Interface = (*store)(nil)
_ indexer.Iterator = (*iterator)(nil)
)

type (
store struct {
db *pebble.DB
// Only the binary format is supported, since pebble needs the capability to merge keys
// and there is little reason to store values in any format other than binary given its
// performance characteristics.
// Note that pebble uses a zero-copy variation of the marshaller to allow optimizations in
// cases where the value need not be copied. The root level binary codec copies on
// unmarshal every time.
vcodec *codec
p *pool
closed bool
comparer *pebble.Comparer
metricsCancel context.CancelFunc
}
iterator struct {
closed bool
snapshot *pebble.Snapshot
it *pebble.Iterator
vcodec *codec
p *pool
start *key
end *key
}
)
type store struct {
db *pebble.DB
// Only the binary format is supported, since pebble needs the capability to merge keys
// and there is little reason to store values in any format other than binary given its
// performance characteristics.
// Note that pebble uses a zero-copy variation of the marshaller to allow optimizations in
// cases where the value need not be copied. The root level binary codec copies on
// unmarshal every time.
vcodec *codec
p *pool
closed bool
comparer *pebble.Comparer
metricsCancel context.CancelFunc
}

// New instantiates a new instance of a store backed by Pebble.
// Note that any Merger value specified in the given options will be overridden.
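Both the struct comment above and the note that New overrides any caller-supplied Merger point at the same constraint: value-key lists are combined by pebble's merge operator, so they must live in a binary format that operator understands. The sketch below shows the general shape of a pebble merge operator; it is not this store's actual merger, and the appendMerger name and simple concatenation semantics are placeholders. It assumes the "io" and pebble imports.

// appendMerger is a placeholder pebble.ValueMerger that simply concatenates
// encoded operands; it only illustrates why the stored values must be binary.
type appendMerger struct{ buf []byte }

func (m *appendMerger) MergeNewer(value []byte) error {
	m.buf = append(m.buf, value...)
	return nil
}

func (m *appendMerger) MergeOlder(value []byte) error {
	m.buf = append(append([]byte{}, value...), m.buf...) // older operands go first
	return nil
}

func (m *appendMerger) Finish(includesBase bool) ([]byte, io.Closer, error) {
	return m.buf, nil, nil
}

var exampleMerger = &pebble.Merger{
	Name: "example.append", // must remain stable for data already on disk
	Merge: func(key, value []byte) (pebble.ValueMerger, error) {
		return &appendMerger{buf: append([]byte{}, value...)}, nil
	},
}

Passing pebble.Options{Merger: exampleMerger} to pebble.Open would install such an operator, which is exactly the knob New reserves for itself here.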
@@ -316,91 +303,3 @@ func (s *store) Stats() (*indexer.Stats, error) {
}
return &stats, nil
}

func (s *store) Iter() (indexer.Iterator, error) {
keygen := s.p.leaseBlake3Keyer()
start, end, err := keygen.multihashesKeyRange()
if err != nil {
_ = keygen.Close()
return nil, err
}
_ = keygen.Close()
snapshot := s.db.NewSnapshot()
iter, err := snapshot.NewIter(&pebble.IterOptions{
LowerBound: start.buf,
UpperBound: end.buf,
})
if err != nil {
return nil, err
}
iter.First()
return &iterator{
snapshot: snapshot,
it: iter,
vcodec: s.vcodec,
start: start,
end: end,
p: s.p,
}, nil
}

func (i *iterator) Next() (multihash.Multihash, []indexer.Value, error) {
switch {
case i.it.Error() != nil:
return nil, nil, i.it.Error()
case !i.it.Valid():
return nil, nil, io.EOF
}
keygen := i.p.leaseBlake3Keyer()
mhk := i.p.leaseKey()
mhk.append(i.it.Key()...)
mh, err := keygen.keyToMultihash(mhk)
_ = mhk.Close()
_ = keygen.Close()
if err != nil {
return nil, nil, err
}

// We don't need to copy the value since it is only used for fetching the
// indexer.Values, and not returned to the caller.
vks, err := i.vcodec.unmarshalValueKeys(i.it.Value())
if err != nil {
return nil, nil, err
}
defer vks.Close()
vs := make([]indexer.Value, len(vks.keys))
for j, vk := range vks.keys {
bv, c, err := i.snapshot.Get(vk.buf)
if err != nil {
return nil, nil, err
}

v, err := i.vcodec.unmarshalValue(bv)
_ = c.Close()
if err != nil {
return nil, nil, err
}
vs[j] = *v
}
i.it.Next()
return mh, vs, err
}

func (i *iterator) Close() error {
_ = i.start.Close()
_ = i.end.Close()
// Check if closed already and do not re-call, since pebble
// panics if snapshot is closed more than once.
if i.closed {
return nil
}
serr := i.snapshot.Close()
ierr := i.it.Close()
i.closed = true
// Prioritise returning the iterator closure error, because that error
// is more likely to be meaningful to the caller.
if ierr != nil {
return ierr
}
return serr
}
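The removed Iter, Next, and Close above follow pebble's snapshot-plus-bounded-iterator pattern: take a consistent snapshot, open an iterator limited to the multihash key range, walk it with First/Valid/Next, and close the iterator and snapshot exactly once. Here is a stripped-down sketch of that pattern, leaving out the store's pooled keyer and value codec; lower and upper stand in for the bounds multihashesKeyRange produced.

// iterateRange is a minimal sketch of the snapshot iteration the removed code
// performed; key decoding and value-key resolution are omitted.
func iterateRange(db *pebble.DB, lower, upper []byte) error {
	snap := db.NewSnapshot()
	defer snap.Close() // the removed Close() guarded against double-close; pebble panics on it
	it, err := snap.NewIter(&pebble.IterOptions{
		LowerBound: lower,
		UpperBound: upper,
	})
	if err != nil {
		return err
	}
	defer it.Close()
	for valid := it.First(); valid; valid = it.Next() {
		key := it.Key()     // encoded multihash key, decoded via the keyer in the removed code
		value := it.Value() // encoded value-keys, resolved through snap.Get in the removed code
		_, _ = key, value
	}
	return it.Error()
}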
47 changes: 0 additions & 47 deletions store/test/tests.go
@@ -2,7 +2,6 @@ package test

import (
"context"
"io"
"sync"
"testing"

@@ -131,52 +130,6 @@ func E2ETest(t *testing.T, s indexer.Interface) {
t.Fatal("Got wrong value for single multihash")
}

// Iterate values
t.Log("Iterating values")
var indexCount int
seen := make(map[string]struct{})
iter, err := s.Iter()
if err != nil {
t.Fatal(err)
}
defer iter.Close()

for {
m, _, err := iter.Next()
if err != nil {
if err == io.EOF {
break
}
t.Fatalf("Iteration error: %s", err)
}

mb58 := m.B58String()
t.Logf("Visited: %s", mb58)
_, already := seen[mb58]
if already {
t.Errorf("Error: multihash already seen: %q", mb58)
} else {
seen[mb58] = struct{}{}
}
indexCount++
}
t.Logf("Visited %d multihashes", indexCount)
if indexCount != len(batch)+1 {
t.Errorf("Wrong iteration count: expected %d, got %d", len(batch)+1, indexCount)
}
for i := range batch {
b58 := batch[i].B58String()
_, ok := seen[b58]
if !ok {
t.Fatalf("Did not iterate multihash %s", b58)
}
}

_, _, err = iter.Next()
if err != io.EOF {
t.Fatal("caling iter.Next() after iteration finished should yield same result")
}

// Get a key that is not set
t.Log("Get non-existing key")
_, found, err = s.Get(noadd)