From 8e971228b3538025b6c7e88ff6dce278fa6a0b18 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Fri, 1 Mar 2024 23:26:36 +0100 Subject: [PATCH 1/7] Prevent access to deleted entries --- embedded/document/engine.go | 6 +- embedded/document/engine_test.go | 4 +- embedded/store/deleted_entries_test.go | 146 +++++++++++++++++++++++++ embedded/store/immustore.go | 53 +++++++-- embedded/store/indexer.go | 16 +-- embedded/store/indexer_test.go | 10 +- embedded/store/key_reader.go | 13 ++- embedded/store/tx.go | 1 + embedded/tbtree/history_reader.go | 2 +- embedded/tbtree/reader.go | 4 +- embedded/tbtree/snapshot.go | 16 +-- embedded/tbtree/snapshot_test.go | 10 +- embedded/tbtree/tbtree.go | 137 ++++++++++++----------- embedded/tbtree/tbtree_test.go | 54 ++++----- pkg/database/database.go | 9 +- pkg/database/database_test.go | 22 +++- 16 files changed, 362 insertions(+), 141 deletions(-) create mode 100644 embedded/store/deleted_entries_test.go diff --git a/embedded/document/engine.go b/embedded/document/engine.go index 97e066ea28..008c876f38 100644 --- a/embedded/document/engine.go +++ b/embedded/document/engine.go @@ -5,7 +5,7 @@ SPDX-License-Identifier: BUSL-1.1 you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - https://mariadb.com/bsl11/ + https://mariadb.com/bsl11/ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -1107,6 +1107,10 @@ func (e *Engine) AuditDocument(ctx context.Context, collectionName string, docID for _, valRef := range valRefs { docAtRevision, err := e.getDocument(searchKey, valRef, includePayload) + if errors.Is(err, store.ErrValueDeleted) { + continue + } + if err != nil { return nil, err } diff --git a/embedded/document/engine_test.go b/embedded/document/engine_test.go index 4be3987a28..aafee660d9 100644 --- a/embedded/document/engine_test.go +++ b/embedded/document/engine_test.go @@ -5,7 +5,7 @@ SPDX-License-Identifier: BUSL-1.1 you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://mariadb.com/bsl11/ + https://mariadb.com/bsl11/ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -628,7 +628,7 @@ func TestDocumentAudit(t *testing.T) { res, err = engine.AuditDocument(context.Background(), collectionName, docID, false, 0, 10, true) require.NoError(t, err) - require.Len(t, res, 3) + require.Len(t, res, 1) } func TestQueryDocuments(t *testing.T) { diff --git a/embedded/store/deleted_entries_test.go b/embedded/store/deleted_entries_test.go new file mode 100644 index 0000000000..1b075d2e17 --- /dev/null +++ b/embedded/store/deleted_entries_test.go @@ -0,0 +1,146 @@ +package store + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDeletedEntryIsNotAccesibleFromReadEntry(t *testing.T) { + var key = []byte("test-key") + + immuStore, err := Open(t.TempDir(), DefaultOptions()) + require.NoError(t, err) + require.NotNil(t, immuStore) + + defer immustoreClose(t, immuStore) + + setKeys(t, immuStore, key, 1) + + entry, _, err := 
immuStore.ReadTxEntry(1, key, true) + require.NoError(t, err) + + val, err := immuStore.ReadValue(entry) + require.NoError(t, err) + require.Equal(t, val, []byte("test-value-0")) + + deleteKey(t, immuStore, key) + + val, err = immuStore.ReadValue(entry) + require.ErrorIs(t, err, ErrValueDeleted) + require.Nil(t, val) +} + +func TestDeletedEntriesAreNotAccessibleFromGetBetween(t *testing.T) { + var ( + nRecordsBeforeDelete = 100 + key = []byte("test-key") + ) + + immuStore, err := Open(t.TempDir(), DefaultOptions()) + require.NoError(t, err) + require.NotNil(t, immuStore) + + defer immustoreClose(t, immuStore) + + setKeys(t, immuStore, key, nRecordsBeforeDelete) + deleteKey(t, immuStore, key) + + for i := 0; i < nRecordsBeforeDelete; i++ { + valRef, err := immuStore.GetBetween(context.Background(), key, 0, uint64(i)) + require.NoError(t, err) + + val, err := valRef.Resolve() + require.Nil(t, val) + require.ErrorIs(t, err, ErrValueDeleted) + } + + valRef, err := immuStore.GetBetween(context.Background(), key, 0, uint64(nRecordsBeforeDelete+1)) + require.NoError(t, err) + + val, err := valRef.Resolve() + require.NoError(t, err) + require.Nil(t, val) + + md := valRef.KVMetadata() + require.NotNil(t, md) + require.True(t, md.Deleted()) +} + +func TestDeletedEntriesAreNotAccessibleFromHistory(t *testing.T) { + var ( + nRecordsBeforeDelete = 100 + nRecordsAfterDelete = 10 + key = []byte("test-key") + ) + + immuStore, err := Open(t.TempDir(), DefaultOptions()) + require.NoError(t, err) + require.NotNil(t, immuStore) + + defer immustoreClose(t, immuStore) + + setKeys(t, immuStore, key, nRecordsBeforeDelete) + + valRefs, _, err := immuStore.History(key, 0, false, nRecordsBeforeDelete) + require.NoError(t, err) + + assertValuesAreVisible(t, valRefs) + + deleteKey(t, immuStore, key) + setKeys(t, immuStore, key, nRecordsAfterDelete) + + // requested deleted records + non deleted records + valRefs, _, err = immuStore.History(key, uint64(nRecordsBeforeDelete/2), false, 
nRecordsBeforeDelete+1+nRecordsAfterDelete) + require.NoError(t, err) + + assertValuesAreDeleted(t, valRefs[:(nRecordsBeforeDelete/2)]) + + value, err := valRefs[nRecordsBeforeDelete/2].Resolve() + require.NoError(t, err) + require.Nil(t, value) + + assertValuesAreVisible(t, valRefs[(nRecordsBeforeDelete/2+1):]) +} + +func assertValuesAreVisible(t *testing.T, valRefs []ValueRef) { + for i, valRef := range valRefs { + value, err := valRef.Resolve() + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("test-value-%d", i)), value) + } +} + +func assertValuesAreDeleted(t *testing.T, valRefs []ValueRef) { + for i := 0; i < len(valRefs)-1; i++ { + value, err := valRefs[i].Resolve() + require.Nil(t, value) + require.ErrorIs(t, err, ErrValueDeleted) + } +} + +func setKeys(t *testing.T, store *ImmuStore, key []byte, nRecords int) { + for i := 0; i < nRecords; i++ { + tx, err := store.NewWriteOnlyTx(context.Background()) + require.NoError(t, err) + + err = tx.Set(key, nil, []byte(fmt.Sprintf("test-value-%d", i))) + require.NoError(t, err) + + _, err = tx.Commit(context.Background()) + require.NoError(t, err) + } +} + +func deleteKey(t *testing.T, store *ImmuStore, key []byte) { + tx, err := store.NewTx(context.Background(), DefaultTxOptions()) + require.NoError(t, err) + + err = tx.Delete(context.Background(), key) + require.NoError(t, err) + + _, err = tx.Commit(context.Background()) + require.NoError(t, err) +} diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 0903a13717..23b9c690ea 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -917,12 +917,20 @@ func (s *ImmuStore) GetBetween(ctx context.Context, key []byte, initialTxID uint return nil, err } - indexedVal, tx, hc, err := indexer.GetBetween(key, initialTxID, finalTxID) + indexedVal, tx, lastDeleteAtTx, hc, err := indexer.GetBetween(key, initialTxID, finalTxID) if err != nil { return nil, err } - return s.valueRefFrom(tx, hc, indexedVal) + valRef, err = 
s.valueRefFrom(tx, hc, indexedVal) + if err != nil { + return nil, err + } + + if lastDeleteAtTx > finalTxID { + return &deletedValueRef{ValueRef: valRef}, nil + } + return valRef, nil } func (s *ImmuStore) Get(ctx context.Context, key []byte) (valRef ValueRef, err error) { @@ -938,7 +946,7 @@ func (s *ImmuStore) GetWithFilters(ctx context.Context, key []byte, filters ...F return nil, err } - indexedVal, tx, hc, err := indexer.Get(key) + indexedVal, tx, _, hc, err := indexer.Get(key) if err != nil { return nil, err } @@ -1010,11 +1018,10 @@ func (s *ImmuStore) History(key []byte, offset uint64, descOrder bool, limit int if errors.Is(err, ErrIndexNotFound) { return nil, 0, ErrKeyNotFound } - return nil, 0, err } - timedValues, hCount, err := indexer.History(key, offset, descOrder, limit) + timedValues, lastDeleteAtTx, hCount, err := indexer.History(key, offset, descOrder, limit) if err != nil { return nil, 0, err } @@ -1041,6 +1048,15 @@ func (s *ImmuStore) History(key []byte, offset uint64, descOrder bool, limit int } } + for i, valRef := range valRefs { + md := valRef.KVMetadata() + isExpired := md != nil && md.ExpiredAt(time.Now()) + isDeleted := lastDeleteAtTx > valRef.Tx() + + if isDeleted || isExpired { + valRefs[i] = &deletedValueRef{ValueRef: valRef} + } + } return valRefs, hCount, nil } @@ -3063,7 +3079,7 @@ func (s *ImmuStore) ReadTxEntry(txID uint64, key []byte, skipIntegrityCheck bool return nil, nil, err } - e := &TxEntry{k: make([]byte, s.maxKeyLen)} + e := &TxEntry{txID: txID, k: make([]byte, s.maxKeyLen)} for i := 0; i < header.NEntries; i++ { err = tdr.readEntry(e) @@ -3078,7 +3094,7 @@ func (s *ImmuStore) ReadTxEntry(txID uint64, key []byte, skipIntegrityCheck bool ret = e // Allocate new placeholder for scanning the rest of entries - e = &TxEntry{k: make([]byte, s.maxKeyLen)} + e = &TxEntry{txID: txID, k: make([]byte, s.maxKeyLen)} } } if ret == nil { @@ -3113,6 +3129,25 @@ func (s *ImmuStore) ReadValue(entry *TxEntry) ([]byte, error) { return 
nil, ErrExpiredEntry } + indexer, err := s.getIndexerFor(entry.key()) + if err != nil && !errors.Is(err, ErrIndexNotFound) { + return nil, err + } + + if !errors.Is(err, ErrIndexNotFound) { + _, _, lastDeleteAtTx, _, err := indexer.Get(entry.key()) + if err != nil { + if errors.Is(err, ErrKeyNotFound) { + return nil, nil + } + return nil, err + } + + if lastDeleteAtTx > entry.txID { + return nil, ErrValueDeleted + } + } + if entry.vLen == 0 { // while not required, nil is returned instead of an empty slice @@ -3124,11 +3159,9 @@ func (s *ImmuStore) ReadValue(entry *TxEntry) ([]byte, error) { b := make([]byte, entry.vLen) - _, err := s.readValueAt(b, entry.vOff, entry.hVal, false) - if err != nil { + if _, err := s.readValueAt(b, entry.vOff, entry.hVal, false); err != nil { return nil, err } - return b, nil } diff --git a/embedded/store/indexer.go b/embedded/store/indexer.go index 178869f373..95ed35c75f 100644 --- a/embedded/store/indexer.go +++ b/embedded/store/indexer.go @@ -196,34 +196,34 @@ func (idx *indexer) SyncSnapshot() (*tbtree.Snapshot, error) { return idx.index.SyncSnapshot() } -func (idx *indexer) Get(key []byte) (value []byte, tx uint64, hc uint64, err error) { +func (idx *indexer) Get(key []byte) (value []byte, tx uint64, lastDeleteAtTx uint64, hc uint64, err error) { idx.rwmutex.RLock() defer idx.rwmutex.RUnlock() if idx.closed { - return nil, 0, 0, ErrAlreadyClosed + return nil, 0, 0, 0, ErrAlreadyClosed } return idx.index.Get(key) } -func (idx *indexer) GetBetween(key []byte, initialTxID uint64, finalTxID uint64) (value []byte, tx uint64, hc uint64, err error) { +func (idx *indexer) GetBetween(key []byte, initialTxID uint64, finalTxID uint64) (value []byte, tx uint64, lastDeleteAtTs uint64, hc uint64, err error) { idx.rwmutex.RLock() defer idx.rwmutex.RUnlock() if idx.closed { - return nil, 0, 0, ErrAlreadyClosed + return nil, 0, 0, 0, ErrAlreadyClosed } return idx.index.GetBetween(key, initialTxID, finalTxID) } -func (idx *indexer) History(key 
[]byte, offset uint64, descOrder bool, limit int) (timedValues []tbtree.TimedValue, hCount uint64, err error) { +func (idx *indexer) History(key []byte, offset uint64, descOrder bool, limit int) (timedValues []tbtree.TimedValue, lastDeleteAtTx uint64, hCount uint64, err error) { idx.rwmutex.RLock() defer idx.rwmutex.RUnlock() if idx.closed { - return nil, 0, ErrAlreadyClosed + return nil, 0, 0, ErrAlreadyClosed } return idx.index.History(key, offset, descOrder, limit) @@ -566,6 +566,7 @@ func (idx *indexer) indexSince(txID uint64) error { idx._kvs[indexableEntries].K = targetKey idx._kvs[indexableEntries].V = b[:n] idx._kvs[indexableEntries].T = txID + uint64(i) + idx._kvs[indexableEntries].Deleted = e.md != nil && e.md.Deleted() indexableEntries++ @@ -584,7 +585,7 @@ func (idx *indexer) indexSince(txID uint64) error { } // the previous entry as of txID must be deleted from the target index - _, prevTxID, _, err := sourceIndexer.index.GetBetween(sourceKey, 1, txID-1) + _, prevTxID, _, _, err := sourceIndexer.index.GetBetween(sourceKey, 1, txID-1) if err == nil { prevEntry, prevTxHdr, err := idx.store.ReadTxEntry(prevTxID, e.key(), false) if err != nil { @@ -635,6 +636,7 @@ func (idx *indexer) indexSince(txID uint64) error { idx._kvs[indexableEntries].K = targetPrevKey idx._kvs[indexableEntries].V = b[:n] idx._kvs[indexableEntries].T = txID + uint64(i) + idx._kvs[indexableEntries].Deleted = prevEntry.md != nil && prevEntry.md.Deleted() indexableEntries++ } else if !errors.Is(err, ErrKeyNotFound) { diff --git a/embedded/store/indexer_test.go b/embedded/store/indexer_test.go index c0bb302a43..d6a9baabc2 100644 --- a/embedded/store/indexer_test.go +++ b/embedded/store/indexer_test.go @@ -47,13 +47,13 @@ func TestClosedIndexerFailures(t *testing.T) { err = indexer.Close() require.NoError(t, err) - v, tx, hc, err := indexer.Get(nil) + v, tx, _, hc, err := indexer.Get(nil) require.Zero(t, v) require.Zero(t, tx) require.Zero(t, hc) require.ErrorIs(t, err, ErrAlreadyClosed) 
- txs, hCount, err := indexer.History(nil, 0, false, 0) + txs, _, hCount, err := indexer.History(nil, 0, false, 0) require.Zero(t, txs) require.Zero(t, hCount) require.ErrorIs(t, err, ErrAlreadyClosed) @@ -190,11 +190,11 @@ func TestClosedIndexer(t *testing.T) { var err error dummy := []byte("dummy") - _, _, _, err = i.Get(dummy) + _, _, _, _, err = i.Get(dummy) assert.Error(t, err) assert.ErrorIs(t, err, ErrAlreadyClosed) - _, _, err = i.History(dummy, 0, false, 0) + _, _, _, err = i.History(dummy, 0, false, 0) assert.Error(t, err) assert.ErrorIs(t, err, ErrAlreadyClosed) @@ -206,7 +206,7 @@ func TestClosedIndexer(t *testing.T) { assert.Error(t, err) assert.ErrorIs(t, err, ErrAlreadyClosed) - _, _, _, err = i.GetBetween(dummy, 1, 2) + _, _, _, _, err = i.GetBetween(dummy, 1, 2) assert.ErrorIs(t, err, ErrAlreadyClosed) _, _, _, _, err = i.GetWithPrefix(dummy, dummy) diff --git a/embedded/store/key_reader.go b/embedded/store/key_reader.go index 2e31a44fb7..e3a25daff4 100644 --- a/embedded/store/key_reader.go +++ b/embedded/store/key_reader.go @@ -20,12 +20,15 @@ import ( "context" "crypto/sha256" "encoding/binary" + "errors" "fmt" "time" "github.com/codenotary/immudb/embedded/tbtree" ) +var ErrValueDeleted = errors.New("value has been deleted") + type Snapshot struct { st *ImmuStore prefix []byte @@ -165,7 +168,7 @@ func (s *Snapshot) GetWithPrefixAndFilters(ctx context.Context, prefix []byte, n } func (s *Snapshot) History(key []byte, offset uint64, descOrder bool, limit int) (valRefs []ValueRef, hCount uint64, err error) { - timedValues, hCount, err := s.snap.History(key, offset, descOrder, limit) + timedValues, _, hCount, err := s.snap.History(key, offset, descOrder, limit) if err != nil { return nil, 0, err } @@ -258,6 +261,14 @@ type valueRef struct { st *ImmuStore } +type deletedValueRef struct { + ValueRef +} + +func (ref *deletedValueRef) Resolve() (val []byte, err error) { + return nil, ErrValueDeleted +} + func (st *ImmuStore) valueRefFrom(tx, hc uint64, 
indexedVal []byte) (ValueRef, error) { // vLen + vOff + vHash const valrLen = lszSize + offsetSize + sha256.Size diff --git a/embedded/store/tx.go b/embedded/store/tx.go index 6323b2c775..af730bfc06 100644 --- a/embedded/store/tx.go +++ b/embedded/store/tx.go @@ -630,6 +630,7 @@ func (t *txDataReader) buildAndValidateHtree(htree *htree.HTree) error { } type TxEntry struct { + txID uint64 // NOTE: this is only set by ReadTxEntry() method to prevent that deleted records can be read k []byte kLen int md *KVMetadata diff --git a/embedded/tbtree/history_reader.go b/embedded/tbtree/history_reader.go index 7dba145215..def79fa353 100644 --- a/embedded/tbtree/history_reader.go +++ b/embedded/tbtree/history_reader.go @@ -58,7 +58,7 @@ func (r *HistoryReader) Read() ([]TimedValue, error) { return nil, ErrAlreadyClosed } - timedValues, _, err := r.snapshot.History(r.key, r.offset, r.descOrder, r.readLimit) + timedValues, _, _, err := r.snapshot.History(r.key, r.offset, r.descOrder, r.readLimit) if err != nil { return nil, err } diff --git a/embedded/tbtree/reader.go b/embedded/tbtree/reader.go index d71a9ea25c..3dfe7cb5cf 100644 --- a/embedded/tbtree/reader.go +++ b/embedded/tbtree/reader.go @@ -153,7 +153,7 @@ func (r *Reader) ReadBetween(initialTs, finalTs uint64) (key []byte, value []byt continue } - value, ts, hc, err := leafValue.lastUpdateBetween(r.snapshot.t.hLog, initialTs, finalTs) + value, ts, _, hc, err := leafValue.lastUpdateBetween(r.snapshot.t.hLog, initialTs, finalTs) if err == nil { return cp(leafValue.key), cp(value), ts, hc, nil } @@ -265,7 +265,7 @@ func (r *Reader) Read() (key []byte, value []byte, ts, hc uint64, err error) { return cp(leafValue.key), cp(leafValue.timedValue().Value), leafValue.timedValue().Ts, leafValue.historyCount(), nil } - tvs, hc, err := r.leafValue.history(r.leafValue.key, uint64(r.hoff), r.descOrder, 1, r.leafNode.t.hLog) + tvs, _, hc, err := r.leafValue.history(r.leafValue.key, uint64(r.hoff), r.descOrder, 1, r.leafNode.t.hLog) if 
errors.Is(err, ErrNoMoreEntries) { r.leafValue = nil r.hoff = 0 diff --git a/embedded/tbtree/snapshot.go b/embedded/tbtree/snapshot.go index ef758e82dc..000feffbf6 100644 --- a/embedded/tbtree/snapshot.go +++ b/embedded/tbtree/snapshot.go @@ -121,7 +121,7 @@ func (s *Snapshot) Get(key []byte) (value []byte, ts uint64, hc uint64, err erro } // Delegate the retrieval to the root node - v, ts, hc, err := s.root.get(key) + v, ts, _, hc, err := s.root.get(key) return cp(v), ts, hc, err } @@ -137,34 +137,34 @@ func (s *Snapshot) GetBetween(key []byte, initialTs, finalTs uint64) (value []by return nil, 0, 0, ErrIllegalArguments } - v, ts, hc, err := s.root.getBetween(key, initialTs, finalTs) + v, ts, _, hc, err := s.root.getBetween(key, initialTs, finalTs) return cp(v), ts, hc, err } // History retrieves the history of a key in the snapshot. // It locks the snapshot for reading, and delegates the history retrieval to the root node. -// The method returns an array of timestamps, the hash count, and an error. +// The method returns an array of timestamps, the id of the last transaction at which the key was deleted, the hash count, and an error. 
// Example usage: // -// timestamps, hashCount, err := snapshot.History([]byte("key"), 0, true, 10) -func (s *Snapshot) History(key []byte, offset uint64, descOrder bool, limit int) (timedValues []TimedValue, hCount uint64, err error) { +// timestamps, lastDeleteAtTx, hashCount, err := snapshot.History([]byte("key"), 0, true, 10) +func (s *Snapshot) History(key []byte, offset uint64, descOrder bool, limit int) (timedValues []TimedValue, lastDeleteTs uint64, hCount uint64, err error) { // Acquire a read lock on the snapshot s.mutex.RLock() defer s.mutex.RUnlock() // Check if the snapshot is closed if s.closed { - return nil, 0, ErrAlreadyClosed + return nil, 0, 0, ErrAlreadyClosed } // Check if the key argument is nil if key == nil { - return nil, 0, ErrIllegalArguments + return nil, 0, 0, ErrIllegalArguments } // Check if the limit argument is less than 1 if limit < 1 { - return nil, 0, ErrIllegalArguments + return nil, 0, 0, ErrIllegalArguments } // Delegate the history retrieval to the root node diff --git a/embedded/tbtree/snapshot_test.go b/embedded/tbtree/snapshot_test.go index 4060b9917c..8db9b81a18 100644 --- a/embedded/tbtree/snapshot_test.go +++ b/embedded/tbtree/snapshot_test.go @@ -57,10 +57,10 @@ func TestSnapshotSerialization(t *testing.T) { _, _, _, err = snapshot.GetBetween(nil, 1, 2) require.ErrorIs(t, err, ErrIllegalArguments) - _, _, err = snapshot.History(nil, 0, false, 1) + _, _, _, err = snapshot.History(nil, 0, false, 1) require.ErrorIs(t, err, ErrIllegalArguments) - _, _, err = snapshot.History([]byte{}, 0, false, 0) + _, _, _, err = snapshot.History([]byte{}, 0, false, 0) require.ErrorIs(t, err, ErrIllegalArguments) err = snapshot.Close() @@ -110,7 +110,7 @@ func TestSnapshotClosing(t *testing.T) { _, _, _, err = snapshot.GetBetween([]byte{}, 1, 1) require.ErrorIs(t, err, ErrAlreadyClosed) - _, _, err = snapshot.History([]byte{}, 0, false, 1) + _, _, _, err = snapshot.History([]byte{}, 0, false, 1) require.ErrorIs(t, err, ErrAlreadyClosed) 
_, _, _, _, err = snapshot.GetWithPrefix([]byte{}, nil) @@ -231,10 +231,10 @@ func TestSnapshotIsolation(t *testing.T) { _, _, _, err = snap2.GetBetween([]byte("key1_snap1"), 1, snap2.Ts()) require.ErrorIs(t, err, ErrKeyNotFound) - _, _, _, err = tbtree.Get([]byte("key1_snap1")) + _, _, _, _, err = tbtree.Get([]byte("key1_snap1")) require.ErrorIs(t, err, ErrKeyNotFound) - _, _, _, err = tbtree.Get([]byte("key1_snap2")) + _, _, _, _, err = tbtree.Get([]byte("key1_snap2")) require.ErrorIs(t, err, ErrKeyNotFound) err = snap1.Close() diff --git a/embedded/tbtree/tbtree.go b/embedded/tbtree/tbtree.go index c21ef661ea..9fbdff5007 100644 --- a/embedded/tbtree/tbtree.go +++ b/embedded/tbtree/tbtree.go @@ -225,9 +225,9 @@ type pathNode struct { type node interface { insert(kvts []*KVT) ([]node, int, error) - get(key []byte) (value []byte, ts uint64, hc uint64, err error) - getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, hc uint64, err error) - history(key []byte, offset uint64, descOrder bool, limit int) ([]TimedValue, uint64, error) + get(key []byte) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) + getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) + history(key []byte, offset uint64, descOrder bool, limit int) ([]TimedValue, uint64, uint64, error) findLeafNode(seekKey []byte, path path, offset int, neqKey []byte, descOrder bool) (path, *leafNode, int, error) minKey() []byte ts() uint64 @@ -277,10 +277,11 @@ type nodeRef struct { } type leafValue struct { - key []byte - timedValues []TimedValue - hOff int64 - hCount uint64 + key []byte + timedValues []TimedValue + hOff int64 + hCount uint64 + lastDeleteAtTs uint64 } type TimedValue struct { @@ -932,51 +933,51 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) { return l, nil } -func (t *TBtree) Get(key []byte) (value []byte, ts uint64, hc uint64, err error) { +func (t *TBtree) 
Get(key []byte) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) { t.rwmutex.RLock() defer t.rwmutex.RUnlock() if t.closed { - return nil, 0, 0, ErrAlreadyClosed + return nil, 0, 0, 0, ErrAlreadyClosed } if key == nil { - return nil, 0, 0, ErrIllegalArguments + return nil, 0, 0, 0, ErrIllegalArguments } - v, ts, hc, err := t.root.get(key) - return cp(v), ts, hc, err + v, ts, lastDeleteAtTs, hc, err := t.root.get(key) + return cp(v), ts, lastDeleteAtTs, hc, err } -func (t *TBtree) GetBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, hc uint64, err error) { +func (t *TBtree) GetBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, lastDeleteAtTx uint64, hc uint64, err error) { t.rwmutex.RLock() defer t.rwmutex.RUnlock() if t.closed { - return nil, 0, 0, ErrAlreadyClosed + return nil, 0, 0, 0, ErrAlreadyClosed } if key == nil { - return nil, 0, 0, ErrIllegalArguments + return nil, 0, 0, 0, ErrIllegalArguments } return t.root.getBetween(key, initialTs, finalTs) } -func (t *TBtree) History(key []byte, offset uint64, descOrder bool, limit int) (tvs []TimedValue, hCount uint64, err error) { +func (t *TBtree) History(key []byte, offset uint64, descOrder bool, limit int) (tvs []TimedValue, lastDeleteAtTs uint64, hCount uint64, err error) { t.rwmutex.RLock() defer t.rwmutex.RUnlock() if t.closed { - return nil, 0, ErrAlreadyClosed + return nil, 0, 0, ErrAlreadyClosed } if key == nil { - return nil, 0, ErrIllegalArguments + return nil, 0, 0, ErrIllegalArguments } if limit < 1 { - return nil, 0, ErrIllegalArguments + return nil, 0, 0, ErrIllegalArguments } return t.root.history(key, offset, descOrder, limit) @@ -1579,9 +1580,10 @@ func (t *TBtree) IncreaseTs(ts uint64) error { } type KVT struct { - K []byte - V []byte - T uint64 + K []byte + V []byte + T uint64 + Deleted bool } func (t *TBtree) lock() { @@ -1613,7 +1615,6 @@ func (t *TBtree) Insert(key []byte, value []byte) error { func (t *TBtree) BulkInsert(kvts 
[]*KVT) error { t.lock() defer t.unlock() - return t.bulkInsert(kvts) } @@ -1663,9 +1664,10 @@ func (t *TBtree) bulkInsert(kvts []*KVT) error { } immutableKVTs[i] = &KVT{ - K: k, - V: v, - T: t, + K: k, + V: v, + T: t, + Deleted: kvt.Deleted, } if t > newTs { @@ -1945,15 +1947,15 @@ func (n *innerNode) updateOnInsert(kvts []*KVT) (nodes []node, depth int, err er return nodes, depth + 1, nil } -func (n *innerNode) get(key []byte) (value []byte, ts uint64, hc uint64, err error) { +func (n *innerNode) get(key []byte) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) { return n.nodes[n.indexOf(key)].get(key) } -func (n *innerNode) getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, hc uint64, err error) { +func (n *innerNode) getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) { return n.nodes[n.indexOf(key)].getBetween(key, initialTs, finalTs) } -func (n *innerNode) history(key []byte, offset uint64, descOrder bool, limit int) ([]TimedValue, uint64, error) { +func (n *innerNode) history(key []byte, offset uint64, descOrder bool, limit int) ([]TimedValue, uint64, uint64, error) { return n.nodes[n.indexOf(key)].history(key, offset, descOrder, limit) } @@ -2153,26 +2155,26 @@ func (r *nodeRef) insert(kvts []*KVT) (nodes []node, depth int, err error) { return n.insert(kvts) } -func (r *nodeRef) get(key []byte) (value []byte, ts uint64, hc uint64, err error) { +func (r *nodeRef) get(key []byte) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) { n, err := r.t.nodeAt(r.off, true) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } return n.get(key) } -func (r *nodeRef) getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, hc uint64, err error) { +func (r *nodeRef) getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) { n, err := 
r.t.nodeAt(r.off, true) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } return n.getBetween(key, initialTs, finalTs) } -func (r *nodeRef) history(key []byte, offset uint64, descOrder bool, limit int) ([]TimedValue, uint64, error) { +func (r *nodeRef) history(key []byte, offset uint64, descOrder bool, limit int) ([]TimedValue, uint64, uint64, error) { n, err := r.t.nodeAt(r.off, true) if err != nil { - return nil, 0, err + return nil, 0, 0, err } return n.history(key, offset, descOrder, limit) } @@ -2268,17 +2270,24 @@ func (l *leafNode) updateOnInsert(kvts []*KVT) (nodes []node, depth int, err err if kvt.T > lv.timedValue().Ts { lv.timedValues = append([]TimedValue{{Value: kvt.V, Ts: kvt.T}}, lv.timedValues...) + if kvt.Deleted { + lv.lastDeleteAtTs = kvt.T + } } } else { values := make([]*leafValue, len(l.values)+1) copy(values, l.values[:i]) - values[i] = &leafValue{ + lf := &leafValue{ key: kvt.K, timedValues: []TimedValue{{Value: kvt.V, Ts: kvt.T}}, } + if kvt.Deleted { + lf.lastDeleteAtTs = kvt.T + } + values[i] = lf copy(values[i+1:], l.values[i:]) l.values = values @@ -2294,24 +2303,24 @@ func (l *leafNode) updateOnInsert(kvts []*KVT) (nodes []node, depth int, err err return nodes, 1, err } -func (l *leafNode) get(key []byte) (value []byte, ts uint64, hc uint64, err error) { +func (l *leafNode) get(key []byte) (value []byte, ts uint64, lastDeleteAtTx uint64, hc uint64, err error) { i, found := l.indexOf(key) if !found { - return nil, 0, 0, ErrKeyNotFound + return nil, 0, 0, 0, ErrKeyNotFound } leafValue := l.values[i] timedValue := leafValue.timedValue() - return timedValue.Value, timedValue.Ts, leafValue.historyCount(), nil + return timedValue.Value, timedValue.Ts, leafValue.lastDeleteAtTs, leafValue.historyCount(), nil } -func (l *leafNode) getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, hc uint64, err error) { +func (l *leafNode) getBetween(key []byte, initialTs, finalTs uint64) (value []byte, ts uint64, 
lastDeleteAtTs uint64, hc uint64, err error) { i, found := l.indexOf(key) if !found { - return nil, 0, 0, ErrKeyNotFound + return nil, 0, 0, 0, ErrKeyNotFound } leafValue := l.values[i] @@ -2319,11 +2328,11 @@ func (l *leafNode) getBetween(key []byte, initialTs, finalTs uint64) (value []by return leafValue.lastUpdateBetween(l.t.hLog, initialTs, finalTs) } -func (l *leafNode) history(key []byte, offset uint64, desc bool, limit int) ([]TimedValue, uint64, error) { +func (l *leafNode) history(key []byte, offset uint64, desc bool, limit int) ([]TimedValue, uint64, uint64, error) { i, found := l.indexOf(key) if !found { - return nil, 0, ErrKeyNotFound + return nil, 0, 0, ErrKeyNotFound } leafValue := l.values[i] @@ -2331,15 +2340,15 @@ func (l *leafNode) history(key []byte, offset uint64, desc bool, limit int) ([]T return leafValue.history(key, offset, desc, limit, l.t.hLog) } -func (lv *leafValue) history(key []byte, offset uint64, desc bool, limit int, hLog appendable.Appendable) ([]TimedValue, uint64, error) { +func (lv *leafValue) history(key []byte, offset uint64, desc bool, limit int, hLog appendable.Appendable) ([]TimedValue, uint64, uint64, error) { hCount := lv.historyCount() if offset == hCount { - return nil, 0, ErrNoMoreEntries + return nil, 0, 0, ErrNoMoreEntries } if offset > hCount { - return nil, 0, ErrOffsetOutOfRange + return nil, 0, 0, ErrOffsetOutOfRange } timedValuesLen := limit @@ -2377,24 +2386,24 @@ func (lv *leafValue) history(key []byte, offset uint64, desc bool, limit int, hL hc, err := r.ReadUint32() if err != nil { - return nil, 0, err + return nil, 0, 0, err } for i := 0; i < int(hc) && tssOff < timedValuesLen; i++ { valueLen, err := r.ReadUint16() if err != nil { - return nil, 0, err + return nil, 0, 0, err } value := make([]byte, valueLen) _, err = r.Read(value) if err != nil { - return nil, 0, err + return nil, 0, 0, err } ts, err := r.ReadUint64() if err != nil { - return nil, 0, err + return nil, 0, 0, err } if ti < initAt { @@ 
-2413,13 +2422,13 @@ func (lv *leafValue) history(key []byte, offset uint64, desc bool, limit int, hL prevOff, err := r.ReadUint64() if err != nil { - return nil, 0, err + return nil, 0, 0, err } hOff = int64(prevOff) } - return timedValues, hCount, nil + return timedValues, lv.lastDeleteAtTs, hCount, nil } func (l *leafNode) findLeafNode(seekKey []byte, path path, _ int, neqKey []byte, descOrder bool) (path, *leafNode, int, error) { @@ -2623,18 +2632,18 @@ func (lv *leafValue) size() int { return 16 + len(lv.key) + len(lv.timedValue().Value) } -func (lv *leafValue) lastUpdateBetween(hLog appendable.Appendable, initialTs, finalTs uint64) (value []byte, ts uint64, hc uint64, err error) { +func (lv *leafValue) lastUpdateBetween(hLog appendable.Appendable, initialTs, finalTs uint64) (value []byte, ts uint64, lastDeleteAtTs uint64, hc uint64, err error) { if initialTs > finalTs { - return nil, 0, 0, ErrIllegalArguments + return nil, 0, 0, 0, ErrIllegalArguments } for i, tv := range lv.timedValues { if tv.Ts < initialTs { - return nil, 0, 0, ErrKeyNotFound + return nil, 0, 0, 0, ErrKeyNotFound } if finalTs == 0 || tv.Ts <= finalTs { - return tv.Value, tv.Ts, lv.historyCount() - uint64(i), nil + return tv.Value, tv.Ts, lv.lastDeleteAtTs, lv.historyCount() - uint64(i), nil } } @@ -2646,32 +2655,32 @@ func (lv *leafValue) lastUpdateBetween(hLog appendable.Appendable, initialTs, fi hc, err := r.ReadUint32() if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } for j := 0; j < int(hc); j++ { valueLen, err := r.ReadUint16() if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } value := make([]byte, valueLen) _, err = r.Read(value) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } ts, err := r.ReadUint64() if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } if ts < initialTs { - return nil, 0, 0, ErrKeyNotFound + return nil, 0, 0, 0, ErrKeyNotFound } if ts <= finalTs { - return value, ts, lv.hCount - 
skippedUpdates, nil + return value, ts, lv.lastDeleteAtTs, lv.hCount - skippedUpdates, nil } skippedUpdates++ @@ -2679,11 +2688,11 @@ func (lv *leafValue) lastUpdateBetween(hLog appendable.Appendable, initialTs, fi prevOff, err := r.ReadUint64() if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } hOff = int64(prevOff) } - return nil, 0, 0, ErrKeyNotFound + return nil, 0, 0, 0, ErrKeyNotFound } diff --git a/embedded/tbtree/tbtree_test.go b/embedded/tbtree/tbtree_test.go index d277bcecc7..c8f0e0e401 100644 --- a/embedded/tbtree/tbtree_test.go +++ b/embedded/tbtree/tbtree_test.go @@ -261,7 +261,7 @@ func TestEdgeCases(t *testing.T) { err = tree.Insert(make([]byte, tree.maxKeySize), make([]byte, tree.maxValueSize+1)) require.ErrorIs(t, err, ErrorMaxValueSizeExceeded) - _, _, _, err = tree.Get(nil) + _, _, _, _, err = tree.Get(nil) require.ErrorIs(t, err, ErrIllegalArguments) for i := 0; i < 100; i++ { @@ -269,12 +269,12 @@ func TestEdgeCases(t *testing.T) { require.NoError(t, err) } - tss, hCount, err := tree.History(make([]byte, 1), 0, true, 10) + tss, _, hCount, err := tree.History(make([]byte, 1), 0, true, 10) require.NoError(t, err) require.Len(t, tss, 10) require.EqualValues(t, 100, hCount) - tss, hCount, err = tree.History(make([]byte, 1), 0, false, 10) + tss, _, hCount, err = tree.History(make([]byte, 1), 0, false, 10) require.NoError(t, err) require.Len(t, tss, 10) require.EqualValues(t, 100, hCount) @@ -285,10 +285,10 @@ func TestEdgeCases(t *testing.T) { s1, err := tree.Snapshot() require.NoError(t, err) - _, _, err = s1.History(make([]byte, 1), 0, false, 100) + _, _, _, err = s1.History(make([]byte, 1), 0, false, 100) require.NoError(t, err) - _, _, err = s1.History(make([]byte, 1), 101, false, 100) + _, _, _, err = s1.History(make([]byte, 1), 101, false, 100) require.ErrorIs(t, err, ErrOffsetOutOfRange) _, err = tree.Snapshot() @@ -307,7 +307,7 @@ func TestEdgeCases(t *testing.T) { s1, err := tree.Snapshot() require.NoError(t, err) - _, _, 
err = s1.History([]byte{2}, 0, false, 1) + _, _, _, err = s1.History([]byte{2}, 0, false, 1) require.ErrorIs(t, err, ErrKeyNotFound) err = s1.Close() @@ -356,7 +356,7 @@ func monotonicInsertions(t *testing.T, tbtree *TBtree, itCount int, kCount int, require.Equal(t, uint64(i), hc) - _, _, _, err := tbtree.GetBetween(k, 1, ts1) + _, _, _, _, err := tbtree.GetBetween(k, 1, ts1) require.NoError(t, err) } @@ -477,7 +477,7 @@ func randomInsertions(t *testing.T, tbtree *TBtree, kCount int, override bool) { require.NotZero(t, ts1) require.NotZero(t, hc1) - v0, ts0, hc0, err := tbtree.Get(k) + v0, ts0, _, hc0, err := tbtree.Get(k) require.NoError(t, err) require.Equal(t, v, v0) require.Equal(t, ts, ts0) @@ -505,7 +505,7 @@ func randomInsertions(t *testing.T, tbtree *TBtree, kCount int, override bool) { require.Equal(t, uint64(1), hc1) } - tvs, _, err := snapshot.History(k, 0, true, 1) + tvs, _, _, err := snapshot.History(k, 0, true, 1) require.NoError(t, err) require.Equal(t, ts, tvs[0].Ts) @@ -901,7 +901,7 @@ func TestTBTreeHistory(t *testing.T) { tbtree, err = Open(dir, opts) require.NoError(t, err) - tss, hCount, err := tbtree.History([]byte("k0"), 0, false, 10) + tss, _, hCount, err := tbtree.History([]byte("k0"), 0, false, 10) require.NoError(t, err) require.Equal(t, 2, len(tss)) require.EqualValues(t, 2, hCount) @@ -931,10 +931,10 @@ func TestTBTreeInsertionInAscendingOrder(t *testing.T) { _, _, err = tbtree.Flush() require.NoError(t, err) - _, _, err = tbtree.History(nil, 0, false, 10) + _, _, _, err = tbtree.History(nil, 0, false, 10) require.ErrorIs(t, err, ErrIllegalArguments) - _, _, err = tbtree.History([]byte("key"), 0, false, 0) + _, _, _, err = tbtree.History([]byte("key"), 0, false, 0) require.ErrorIs(t, err, ErrIllegalArguments) _, _, _, _, err = tbtree.GetWithPrefix([]byte("key"), []byte("longerkey")) @@ -946,16 +946,16 @@ func TestTBTreeInsertionInAscendingOrder(t *testing.T) { _, _, err = tbtree.Flush() require.ErrorIs(t, err, ErrAlreadyClosed) - _, _, 
err = tbtree.History([]byte("key"), 0, false, 10) + _, _, _, err = tbtree.History([]byte("key"), 0, false, 10) require.ErrorIs(t, err, ErrAlreadyClosed) err = tbtree.Close() require.ErrorIs(t, err, ErrAlreadyClosed) - _, _, _, err = tbtree.Get([]byte("key")) + _, _, _, _, err = tbtree.Get([]byte("key")) require.ErrorIs(t, err, ErrAlreadyClosed) - _, _, _, err = tbtree.GetBetween([]byte("key"), 1, 2) + _, _, _, _, err = tbtree.GetBetween([]byte("key"), 1, 2) require.ErrorIs(t, err, ErrAlreadyClosed) _, _, _, _, err = tbtree.GetWithPrefix([]byte("key"), nil) @@ -1151,10 +1151,10 @@ func TestTBTreeReOpen(t *testing.T) { tbtree, err := Open(dir, opts) require.NoError(t, err) - _, _, _, err = tbtree.Get([]byte("k0")) + _, _, _, _, err = tbtree.Get([]byte("k0")) require.NoError(t, err) - _, _, _, err = tbtree.Get([]byte("k1")) + _, _, _, _, err = tbtree.Get([]byte("k1")) require.NoError(t, err) root, isInnerNode := tbtree.root.(*innerNode) @@ -1178,13 +1178,13 @@ func TestTBTreeReOpen(t *testing.T) { _, _, err = childNodeRef.insert(nil) require.ErrorIs(t, err, singleapp.ErrNegativeOffset) - _, _, _, err = childNodeRef.get(nil) + _, _, _, _, err = childNodeRef.get(nil) require.ErrorIs(t, err, singleapp.ErrNegativeOffset) - _, _, _, err = childNodeRef.getBetween(nil, 1, 1) + _, _, _, _, err = childNodeRef.getBetween(nil, 1, 1) require.ErrorIs(t, err, singleapp.ErrNegativeOffset) - _, _, err = childNodeRef.history(nil, 0, true, 1) + _, _, _, err = childNodeRef.history(nil, 0, true, 1) require.ErrorIs(t, err, singleapp.ErrNegativeOffset) _, _, _, err = childNodeRef.findLeafNode(nil, nil, 0, nil, true) @@ -1220,7 +1220,7 @@ func TestTBTreeSelfHealingHistory(t *testing.T) { tbtree, err = Open(dir, DefaultOptions()) require.NoError(t, err) - _, _, err = tbtree.History([]byte("k0"), 0, true, 2) + _, _, _, err = tbtree.History([]byte("k0"), 0, true, 2) require.ErrorIs(t, err, ErrKeyNotFound) err = tbtree.Close() @@ -1249,7 +1249,7 @@ func TestTBTreeSelfHealingNodes(t *testing.T) 
{ tbtree, err = Open(dir, DefaultOptions()) require.NoError(t, err) - _, _, _, err = tbtree.Get([]byte("k0")) + _, _, _, _, err = tbtree.Get([]byte("k0")) require.ErrorIs(t, err, ErrKeyNotFound) err = tbtree.Close() @@ -1502,12 +1502,12 @@ func TestLastUpdateBetween(t *testing.T) { require.NotNil(t, leaf) require.GreaterOrEqual(t, len(leaf.values), off) - _, _, _, err = leaf.values[off].lastUpdateBetween(nil, 1, 0) + _, _, _, _, err = leaf.values[off].lastUpdateBetween(nil, 1, 0) require.ErrorIs(t, err, ErrIllegalArguments) for i := 0; i < keyUpdatesCount; i++ { for f := i; f < keyUpdatesCount; f++ { - _, tx, hc, err := leaf.values[off].lastUpdateBetween(nil, uint64(i+1), uint64(f+1)) + _, tx, _, hc, err := leaf.values[off].lastUpdateBetween(nil, uint64(i+1), uint64(f+1)) require.NoError(t, err) require.Equal(t, uint64(f+1), hc) require.Equal(t, uint64(f+1), tx) @@ -1538,7 +1538,7 @@ func TestMultiTimedBulkInsertion(t *testing.T) { require.NoError(t, err) for _, kvt := range kvts { - v, ts, hc, err := tbtree.Get(kvt.K) + v, ts, _, hc, err := tbtree.Get(kvt.K) require.NoError(t, err) require.Equal(t, kvt.V, v) require.Equal(t, uint64(1), hc) @@ -1566,7 +1566,7 @@ func TestMultiTimedBulkInsertion(t *testing.T) { err = tbtree.BulkInsert(kvts) require.NoError(t, err) - v, ts, hc, err := tbtree.Get([]byte("key1_1")) + v, ts, _, hc, err := tbtree.Get([]byte("key1_1")) require.NoError(t, err) require.Equal(t, []byte("value2_1"), v) require.Equal(t, uint64(2), hc) @@ -1596,7 +1596,7 @@ func TestMultiTimedBulkInsertion(t *testing.T) { require.ErrorIs(t, err, ErrIllegalArguments) // rollback to latest snapshot should be made if insertion fails - _, _, _, err := tbtree.Get([]byte("key1_2")) + _, _, _, _, err := tbtree.Get([]byte("key1_2")) require.ErrorIs(t, err, ErrKeyNotFound) require.Equal(t, initialTs, tbtree.Ts()) diff --git a/pkg/database/database.go b/pkg/database/database.go index e9e97760df..fc5e12c4a7 100644 --- a/pkg/database/database.go +++ 
b/pkg/database/database.go @@ -1564,10 +1564,15 @@ func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.E for i, valRef := range valRefs { val, err := valRef.Resolve() - if err != nil && err != store.ErrExpiredEntry { + if err != nil && err != store.ErrExpiredEntry && err != store.ErrValueDeleted { return nil, err } - if len(val) > 0 { + + if err == store.ErrValueDeleted { + hVal := valRef.HVal() + val = hVal[:] + err = nil + } else if len(val) > 0 { val = TrimPrefix(val) } diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go index 8e157cbf96..a96192dd11 100644 --- a/pkg/database/database_test.go +++ b/pkg/database/database_test.go @@ -391,9 +391,9 @@ func TestDelete(t *testing.T) { }, }, }) - require.NoError(t, err) - require.NotNil(t, tx) - require.Empty(t, tx.KvEntries) + + require.ErrorIs(t, err, store.ErrValueDeleted) + require.Nil(t, tx) } func TestCurrentState(t *testing.T) { @@ -1010,14 +1010,24 @@ func TestHistory(t *testing.T) { }) require.ErrorIs(t, err, ErrResultSizeLimitReached) - for i, val := range inc.Entries { + // TODO: put additional set + deleteFound := false + for i := range inc.Entries { + val := inc.Entries[len(inc.Entries)-1-i] + require.Equal(t, kvs[0].Key, val.Key) if val.GetMetadata().GetDeleted() { require.Empty(t, val.Value) + deleteFound = true } else { - require.Equal(t, kvs[0].Value, val.Value) + if deleteFound { + hVal := sha256.Sum256(WrapWithPrefix(kvs[0].Value, PlainValuePrefix)) + require.Equal(t, hVal[:], val.Value) + } else { + require.Equal(t, kvs[0].Value, val.Value) + } } - require.EqualValues(t, i+1, val.Revision) + require.EqualValues(t, uint64(len(inc.Entries))-(uint64(i)), val.Revision) } dec, err := db.History(context.Background(), &schema.HistoryRequest{ From 882a211fc2589787ddbee15d98c9e18e26386ed2 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Sat, 2 Mar 2024 10:19:29 +0100 Subject: [PATCH 2/7] Improve TestHistory --- pkg/database/database_test.go | 5 ++++- 1 
file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go index a96192dd11..1a3c2b5ef4 100644 --- a/pkg/database/database_test.go +++ b/pkg/database/database_test.go @@ -1004,13 +1004,14 @@ func TestHistory(t *testing.T) { }) require.ErrorIs(t, err, ErrResultSizeLimitExceeded) + db.maxResultSize = 3 + inc, err := db.History(context.Background(), &schema.HistoryRequest{ Key: kvs[0].Key, SinceTx: lastTx, }) require.ErrorIs(t, err, ErrResultSizeLimitReached) - // TODO: put additional set deleteFound := false for i := range inc.Entries { val := inc.Entries[len(inc.Entries)-1-i] @@ -1030,6 +1031,8 @@ func TestHistory(t *testing.T) { require.EqualValues(t, uint64(len(inc.Entries))-(uint64(i)), val.Revision) } + db.maxResultSize = 2 + dec, err := db.History(context.Background(), &schema.HistoryRequest{ Key: kvs[0].Key, SinceTx: lastTx, From 4dec0374962da640dff009bcca541188bf22482f Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Sat, 2 Mar 2024 11:28:59 +0100 Subject: [PATCH 3/7] Add TestExpiredEntryIsNotAccessibleFromHistory --- embedded/store/deleted_entries_test.go | 96 +++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 10 deletions(-) diff --git a/embedded/store/deleted_entries_test.go b/embedded/store/deleted_entries_test.go index 1b075d2e17..26d4630a16 100644 --- a/embedded/store/deleted_entries_test.go +++ b/embedded/store/deleted_entries_test.go @@ -3,7 +3,9 @@ package store import ( "context" "fmt" + "math/rand" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -86,30 +88,104 @@ func TestDeletedEntriesAreNotAccessibleFromHistory(t *testing.T) { valRefs, _, err := immuStore.History(key, 0, false, nRecordsBeforeDelete) require.NoError(t, err) - - assertValuesAreVisible(t, valRefs) + assertValuesAreVisible(t, valRefs, false) deleteKey(t, immuStore, key) setKeys(t, immuStore, key, nRecordsAfterDelete) - // requested deleted records + non deleted records - valRefs, _, err = 
immuStore.History(key, uint64(nRecordsBeforeDelete/2), false, nRecordsBeforeDelete+1+nRecordsAfterDelete) - require.NoError(t, err) + t.Run("requesting deleted records only", func(t *testing.T) { + valRefs, _, err = immuStore.History(key, uint64(nRecordsBeforeDelete/2), false, nRecordsBeforeDelete/2) + require.NoError(t, err) + + assertValuesAreDeleted(t, valRefs) + + valRefs, _, err = immuStore.History(key, uint64(nRecordsAfterDelete+1), true, nRecordsBeforeDelete) + require.NoError(t, err) - assertValuesAreDeleted(t, valRefs[:(nRecordsBeforeDelete/2)]) + assertValuesAreDeleted(t, valRefs) + }) + + t.Run("requesting deleted and non deleted records", func(t *testing.T) { + valRefs, _, err = immuStore.History(key, uint64(nRecordsBeforeDelete/2), false, nRecordsBeforeDelete+1+nRecordsAfterDelete) + require.NoError(t, err) + + assertValuesAreDeleted(t, valRefs[:(nRecordsBeforeDelete/2)]) + assertIsDeleteEntry(t, valRefs[nRecordsBeforeDelete/2]) + assertValuesAreVisible(t, valRefs[(nRecordsBeforeDelete/2+1):], false) + + valRefs, _, err = immuStore.History(key, 0, true, nRecordsBeforeDelete+1+nRecordsAfterDelete) + require.NoError(t, err) - value, err := valRefs[nRecordsBeforeDelete/2].Resolve() + assertValuesAreVisible(t, valRefs[:nRecordsAfterDelete], true) + assertIsDeleteEntry(t, valRefs[nRecordsAfterDelete]) + assertValuesAreDeleted(t, valRefs[nRecordsAfterDelete+1:]) + }) +} + +func assertIsDeleteEntry(t *testing.T, valRef ValueRef) { + require.NotNil(t, valRef.KVMetadata()) + require.True(t, valRef.KVMetadata().Deleted()) + + value, err := valRef.Resolve() require.NoError(t, err) require.Nil(t, value) +} + +func TestExpiredEntryIsNotAccessibleFromHistory(t *testing.T) { + var ( + key = []byte("test-key") + nRecords = 100 + ) - assertValuesAreVisible(t, valRefs[(nRecordsBeforeDelete/2+1):]) + immuStore, err := Open(t.TempDir(), DefaultOptions()) + require.NoError(t, err) + require.NotNil(t, immuStore) + + defer immustoreClose(t, immuStore) + + expiredRecordIndex 
:= rand.Intn(nRecords) + setKeys(t, immuStore, key, expiredRecordIndex) + + tx, err := immuStore.NewWriteOnlyTx(context.Background()) + require.NoError(t, err) + + md := NewKVMetadata() + err = md.ExpiresAt(time.Now().Add(-time.Second)) + require.NoError(t, err) + + err = tx.Set(key, md, []byte("expired-value")) + require.NoError(t, err) + + _, err = tx.Commit(context.Background()) + require.NoError(t, err) + + setKeys(t, immuStore, key, nRecords-expiredRecordIndex-1) + + valRefs, _, err := immuStore.History(key, 0, false, nRecords) + require.NoError(t, err) + require.Len(t, valRefs, nRecords) + + for i, valRef := range valRefs { + value, err := valRef.Resolve() + if i == expiredRecordIndex { + require.ErrorIs(t, err, ErrValueDeleted) + require.Nil(t, value) + } else { + require.NoError(t, err) + } + } } -func assertValuesAreVisible(t *testing.T, valRefs []ValueRef) { +func assertValuesAreVisible(t *testing.T, valRefs []ValueRef, reverse bool) { for i, valRef := range valRefs { value, err := valRef.Resolve() require.NoError(t, err) - require.Equal(t, []byte(fmt.Sprintf("test-value-%d", i)), value) + + expectedIdx := i + if reverse { + expectedIdx = len(valRefs) - 1 - i + } + require.Equal(t, []byte(fmt.Sprintf("test-value-%d", expectedIdx)), value) } } From c421802fd18e0aa2ebdaf5190d95dd49c2fac9e9 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Sat, 2 Mar 2024 12:02:49 +0100 Subject: [PATCH 4/7] Test update of lastDeleteAtTs in bulkInsertion --- embedded/tbtree/tbtree_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/embedded/tbtree/tbtree_test.go b/embedded/tbtree/tbtree_test.go index c8f0e0e401..6a0b8f4d59 100644 --- a/embedded/tbtree/tbtree_test.go +++ b/embedded/tbtree/tbtree_test.go @@ -1522,6 +1522,34 @@ func TestMultiTimedBulkInsertion(t *testing.T) { tbtree, err := Open(t.TempDir(), DefaultOptions()) require.NoError(t, err) + t.Run("bulk insertion should update lastDeleteAtTs", func(t *testing.T) { + currTs := 
tbtree.Ts() + + kvts := []*KVT{ + {K: []byte("key"), V: []byte("value1"), T: currTs + 1}, + {K: []byte("key"), V: []byte("value2"), T: currTs + 2, Deleted: true}, + {K: []byte("key"), V: []byte("value3"), T: currTs + 3}, + {K: []byte("key"), V: []byte("value4"), T: currTs + 4}, + } + + err = tbtree.BulkInsert(kvts) + require.NoError(t, err) + + _, _, lastDeleteAtTs, _, err := tbtree.Get([]byte("key")) + require.NoError(t, err) + require.Equal(t, currTs+2, lastDeleteAtTs) + + err = tbtree.BulkInsert([]*KVT{ + {K: []byte("key"), V: []byte("value1"), T: currTs + 5}, + {K: []byte("key"), V: []byte("value2"), T: currTs + 6, Deleted: true}, + }) + require.NoError(t, err) + + _, _, lastDeleteAtTs, _, err = tbtree.Get([]byte("key")) + require.NoError(t, err) + require.Equal(t, currTs+6, lastDeleteAtTs) + }) + t.Run("multi-timed bulk insertion should succeed", func(t *testing.T) { currTs := tbtree.Ts() From 4950c1b8d96f3a6cbf6943e5b67ed1fc383d24c7 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Sat, 2 Mar 2024 12:55:36 +0100 Subject: [PATCH 5/7] Rename vars --- embedded/document/engine.go | 2 +- embedded/store/deleted_entries_test.go | 8 ++++---- embedded/store/immustore.go | 7 ++++--- embedded/store/key_reader.go | 13 +++++++------ pkg/database/database.go | 5 +++-- pkg/database/database_test.go | 2 +- 6 files changed, 20 insertions(+), 17 deletions(-) diff --git a/embedded/document/engine.go b/embedded/document/engine.go index 008c876f38..fb120f924b 100644 --- a/embedded/document/engine.go +++ b/embedded/document/engine.go @@ -1107,7 +1107,7 @@ func (e *Engine) AuditDocument(ctx context.Context, collectionName string, docID for _, valRef := range valRefs { docAtRevision, err := e.getDocument(searchKey, valRef, includePayload) - if errors.Is(err, store.ErrValueDeleted) { + if errors.Is(err, store.ErrDeletedEntry) { continue } diff --git a/embedded/store/deleted_entries_test.go b/embedded/store/deleted_entries_test.go index 26d4630a16..92f3aafc69 100644 --- 
a/embedded/store/deleted_entries_test.go +++ b/embedded/store/deleted_entries_test.go @@ -31,7 +31,7 @@ func TestDeletedEntryIsNotAccesibleFromReadEntry(t *testing.T) { deleteKey(t, immuStore, key) val, err = immuStore.ReadValue(entry) - require.ErrorIs(t, err, ErrValueDeleted) + require.ErrorIs(t, err, ErrDeletedEntry) require.Nil(t, val) } @@ -56,7 +56,7 @@ func TestDeletedEntriesAreNotAccessibleFromGetBetween(t *testing.T) { val, err := valRef.Resolve() require.Nil(t, val) - require.ErrorIs(t, err, ErrValueDeleted) + require.ErrorIs(t, err, ErrDeletedEntry) } valRef, err := immuStore.GetBetween(context.Background(), key, 0, uint64(nRecordsBeforeDelete+1)) @@ -168,7 +168,7 @@ func TestExpiredEntryIsNotAccessibleFromHistory(t *testing.T) { for i, valRef := range valRefs { value, err := valRef.Resolve() if i == expiredRecordIndex { - require.ErrorIs(t, err, ErrValueDeleted) + require.ErrorIs(t, err, ErrExpiredEntry) require.Nil(t, value) } else { require.NoError(t, err) @@ -193,7 +193,7 @@ func assertValuesAreDeleted(t *testing.T, valRefs []ValueRef) { for i := 0; i < len(valRefs)-1; i++ { value, err := valRefs[i].Resolve() require.Nil(t, value) - require.ErrorIs(t, err, ErrValueDeleted) + require.ErrorIs(t, err, ErrDeletedEntry) } } diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 23b9c690ea..5161e46f81 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -78,6 +78,7 @@ var ErrTxSizeGreaterThanMaxTxSize = errors.New("tx size greater than max tx size var ErrCorruptedAHtree = errors.New("appendable hash tree is corrupted") var ErrKeyNotFound = tbtree.ErrKeyNotFound // TODO: define error in store layer var ErrExpiredEntry = fmt.Errorf("%w: expired entry", ErrKeyNotFound) +var ErrDeletedEntry = fmt.Errorf("%w: deleted entry", ErrKeyNotFound) var ErrKeyAlreadyExists = errors.New("key already exists") var ErrTxNotFound = errors.New("tx not found") var ErrNoMoreEntries = tbtree.ErrNoMoreEntries // TODO: define error in 
store layer @@ -928,7 +929,7 @@ func (s *ImmuStore) GetBetween(ctx context.Context, key []byte, initialTxID uint } if lastDeleteAtTx > finalTxID { - return &deletedValueRef{ValueRef: valRef}, nil + return &unreadableValueRef{ValueRef: valRef}, nil } return valRef, nil } @@ -1054,7 +1055,7 @@ func (s *ImmuStore) History(key []byte, offset uint64, descOrder bool, limit int isDeleted := lastDeleteAtTx > valRef.Tx() if isDeleted || isExpired { - valRefs[i] = &deletedValueRef{ValueRef: valRef} + valRefs[i] = &unreadableValueRef{ValueRef: valRef} } } return valRefs, hCount, nil @@ -3144,7 +3145,7 @@ func (s *ImmuStore) ReadValue(entry *TxEntry) ([]byte, error) { } if lastDeleteAtTx > entry.txID { - return nil, ErrValueDeleted + return nil, ErrDeletedEntry } } diff --git a/embedded/store/key_reader.go b/embedded/store/key_reader.go index e3a25daff4..20b1a85ec4 100644 --- a/embedded/store/key_reader.go +++ b/embedded/store/key_reader.go @@ -20,15 +20,12 @@ import ( "context" "crypto/sha256" "encoding/binary" - "errors" "fmt" "time" "github.com/codenotary/immudb/embedded/tbtree" ) -var ErrValueDeleted = errors.New("value has been deleted") - type Snapshot struct { st *ImmuStore prefix []byte @@ -261,12 +258,16 @@ type valueRef struct { st *ImmuStore } -type deletedValueRef struct { +type unreadableValueRef struct { ValueRef } -func (ref *deletedValueRef) Resolve() (val []byte, err error) { - return nil, ErrValueDeleted +func (ref *unreadableValueRef) Resolve() (val []byte, err error) { + md := ref.ValueRef.KVMetadata() + if md != nil && md.ExpiredAt(time.Now()) { + return nil, ErrExpiredEntry + } + return nil, ErrDeletedEntry } func (st *ImmuStore) valueRefFrom(tx, hc uint64, indexedVal []byte) (ValueRef, error) { diff --git a/pkg/database/database.go b/pkg/database/database.go index fc5e12c4a7..04bda8416e 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -1564,11 +1564,12 @@ func (d *db) History(ctx context.Context, req *schema.HistoryRequest) 
(*schema.E for i, valRef := range valRefs { val, err := valRef.Resolve() - if err != nil && err != store.ErrExpiredEntry && err != store.ErrValueDeleted { + valueAccessible := err != store.ErrExpiredEntry && err != store.ErrDeletedEntry + if err != nil && valueAccessible { return nil, err } - if err == store.ErrValueDeleted { + if !valueAccessible { hVal := valRef.HVal() val = hVal[:] err = nil diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go index 1a3c2b5ef4..625a2c73ed 100644 --- a/pkg/database/database_test.go +++ b/pkg/database/database_test.go @@ -392,7 +392,7 @@ func TestDelete(t *testing.T) { }, }) - require.ErrorIs(t, err, store.ErrValueDeleted) + require.ErrorIs(t, err, store.ErrDeletedEntry) require.Nil(t, tx) } From 8f16ac61e578a59050ff191bbc987f08d549ec16 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Sun, 3 Mar 2024 19:11:35 +0100 Subject: [PATCH 6/7] Remove unnecessary assignment --- pkg/database/database.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/database/database.go b/pkg/database/database.go index 04bda8416e..4f4200ea21 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -1572,7 +1572,6 @@ func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.E if !valueAccessible { hVal := valRef.HVal() val = hVal[:] - err = nil } else if len(val) > 0 { val = TrimPrefix(val) } From e16bd0896c271bc4b89c5d6e5e970af5a92a9d08 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Mon, 4 Mar 2024 01:41:11 +0100 Subject: [PATCH 7/7] Set lastDeleteAtTs properly in leafValue --- embedded/store/indexer.go | 10 +++++++++- embedded/tbtree/options.go | 8 +++++++- embedded/tbtree/reader.go | 10 +++++++--- embedded/tbtree/tbtree.go | 25 +++++++++++++++++-------- 4 files changed, 40 insertions(+), 13 deletions(-) diff --git a/embedded/store/indexer.go b/embedded/store/indexer.go index 95ed35c75f..3aa277d4f5 100644 --- a/embedded/store/indexer.go +++ b/embedded/store/indexer.go @@ -117,7
+117,15 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error) WithCommitLogMaxOpenedFiles(opts.IndexOpts.CommitLogMaxOpenedFiles). WithRenewSnapRootAfter(opts.IndexOpts.RenewSnapRootAfter). WithCompactionThld(opts.IndexOpts.CompactionThld). - WithDelayDuringCompaction(opts.IndexOpts.DelayDuringCompaction) + WithDelayDuringCompaction(opts.IndexOpts.DelayDuringCompaction). + WithIsDeletedValueFunc(func(b []byte) bool { + valRef, err := store.valueRefFrom(0, 0, b) + if err != nil { + return false + } + md := valRef.KVMetadata() + return md != nil && md.Deleted() + }) if opts.appFactory != nil { indexOpts.WithAppFactory(func(rootPath, subPath string, appOpts *multiapp.Options) (appendable.Appendable, error) { diff --git a/embedded/tbtree/options.go b/embedded/tbtree/options.go index 4a7fd45b98..0d4c581c63 100644 --- a/embedded/tbtree/options.go +++ b/embedded/tbtree/options.go @@ -79,7 +79,8 @@ type Options struct { maxValueSize int fileSize int - appFactory AppFactoryFunc + appFactory AppFactoryFunc + isDeletedValue func([]byte) bool } func DefaultOptions() *Options { @@ -284,3 +285,8 @@ func (opts *Options) WithDelayDuringCompaction(delay time.Duration) *Options { opts.delayDuringCompaction = delay return opts } + +func (opts *Options) WithIsDeletedValueFunc(isDeletedValue func([]byte) bool) *Options { + opts.isDeletedValue = isDeletedValue + return opts +} diff --git a/embedded/tbtree/reader.go b/embedded/tbtree/reader.go index 3dfe7cb5cf..5cbb812a6c 100644 --- a/embedded/tbtree/reader.go +++ b/embedded/tbtree/reader.go @@ -153,8 +153,8 @@ func (r *Reader) ReadBetween(initialTs, finalTs uint64) (key []byte, value []byt continue } - value, ts, _, hc, err := leafValue.lastUpdateBetween(r.snapshot.t.hLog, initialTs, finalTs) - if err == nil { + value, ts, lastDeleteAtTs, hc, err := leafValue.lastUpdateBetween(r.snapshot.t.hLog, initialTs, finalTs) + if err == nil && lastDeleteAtTs < finalTs { return cp(leafValue.key), cp(value), ts, hc, nil 
} } @@ -265,7 +265,7 @@ func (r *Reader) Read() (key []byte, value []byte, ts, hc uint64, err error) { return cp(leafValue.key), cp(leafValue.timedValue().Value), leafValue.timedValue().Ts, leafValue.historyCount(), nil } - tvs, _, hc, err := r.leafValue.history(r.leafValue.key, uint64(r.hoff), r.descOrder, 1, r.leafNode.t.hLog) + tvs, lastUpdateAtTs, hc, err := r.leafValue.history(r.leafValue.key, uint64(r.hoff), r.descOrder, 1, r.leafNode.t.hLog) if errors.Is(err, ErrNoMoreEntries) { r.leafValue = nil r.hoff = 0 @@ -281,6 +281,10 @@ func (r *Reader) Read() (key []byte, value []byte, ts, hc uint64, err error) { continue } + if lastUpdateAtTs >= tvs[0].Ts { + continue + } + var c uint64 if r.descOrder { diff --git a/embedded/tbtree/tbtree.go b/embedded/tbtree/tbtree.go index 9fbdff5007..28834f5ea0 100644 --- a/embedded/tbtree/tbtree.go +++ b/embedded/tbtree/tbtree.go @@ -178,6 +178,8 @@ type TBtree struct { root node + isDeletedValue func([]byte) bool + maxNodeSize int insertionCountSinceFlush int insertionCountSinceSync int @@ -578,6 +580,7 @@ func OpenWith(path string, nLog, hLog, cLog appendable.Appendable, opts *Options historyLogMaxOpenedFiles: opts.historyLogMaxOpenedFiles, commitLogMaxOpenedFiles: opts.commitLogMaxOpenedFiles, readOnly: opts.readOnly, + isDeletedValue: opts.isDeletedValue, snapshots: make(map[uint64]*Snapshot), } @@ -923,6 +926,10 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) { hCount: hCount, } + if len(value) > 0 && t.isDeletedValue != nil && t.isDeletedValue(value) { + leafValue.lastDeleteAtTs = ts + } + l.values[c] = leafValue if l._ts < ts { @@ -2244,10 +2251,11 @@ func (l *leafNode) insert(kvts []*KVT) (nodes []node, depth int, err error) { copy(timedValues, lv.timedValues) newLeaf.values[i] = &leafValue{ - key: lv.key, - timedValues: timedValues, - hOff: lv.hOff, - hCount: lv.hCount, + key: lv.key, + timedValues: timedValues, + hOff: lv.hOff, + hCount: lv.hCount, + lastDeleteAtTs: lv.lastDeleteAtTs, } } 
@@ -2527,10 +2535,11 @@ func (l *leafNode) setTs(ts uint64) (node, error) { copy(timedValues, lv.timedValues) newLeaf.values[i] = &leafValue{ - key: lv.key, - timedValues: timedValues, - hOff: lv.hOff, - hCount: lv.hCount, + key: lv.key, + timedValues: timedValues, + hOff: lv.hOff, + hCount: lv.hCount, + lastDeleteAtTs: lv.lastDeleteAtTs, } }