diff --git a/nodedb.go b/nodedb.go index f3aaeea04..1fc4e982d 100644 --- a/nodedb.go +++ b/nodedb.go @@ -416,83 +416,42 @@ func (ndb *nodeDB) saveNodeFromPruning(node *Node) error { return ndb.batch.Set(ndb.nodeKey(node.GetKey()), buf.Bytes()) } -// rootkey cache of two elements, attempting to mimic a direct-mapped cache. -type rootkeyCache struct { - // initial value is set to {-1, -1}, which is an invalid version for a getrootkey call. - versions [2]int64 - rootKeys [2][]byte - next int -} - -func (rkc *rootkeyCache) getRootKey(ndb *nodeDB, version int64) ([]byte, error) { - // Check both cache entries - for i := 0; i < 2; i++ { - if rkc.versions[i] == version { - return rkc.rootKeys[i], nil - } - } - - rootKey, err := ndb.GetRoot(version) - if err != nil { - return nil, err - } - rkc.setRootKey(version, rootKey) - return rootKey, nil -} - -func (rkc *rootkeyCache) setRootKey(version int64, rootKey []byte) { - // Store in next available slot, cycling between 0 and 1 - rkc.versions[rkc.next] = version - rkc.rootKeys[rkc.next] = rootKey - rkc.next = (rkc.next + 1) % 2 -} - -func newRootkeyCache() *rootkeyCache { - return &rootkeyCache{ - versions: [2]int64{-1, -1}, - rootKeys: [2][]byte{}, - next: 0, - } -} - // deleteVersion deletes a tree version from disk. // deletes orphans -func (ndb *nodeDB) deleteVersion(version int64, cache *rootkeyCache) error { - rootKey, err := cache.getRootKey(ndb, version) - if err != nil && err != ErrVersionDoesNotExist { +func (ndb *nodeDB) deleteVersion(version int64) error { + rootKey, err := ndb.GetRoot(version) + if err != nil { return err } // If rootKey is nil, it indicates that the root is either a dangling reference or does not exist at all. // In this case, we can skip the orphans pruning process since there are no nodes in the current version to be considered orphans. // Otherwise, proceed with pruning the orphans. - if rootKey != nil { - if err := ndb.traverseOrphansWithRootkeyCache(cache, version, version+1, func(orphan *Node) error { - if orphan.nodeKey.nonce == 0 && !orphan.isLegacy { - // if the orphan is a reformatted root, it can be a legacy root - // so it should be removed from the pruning process. - if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil { - return err - } - } - if orphan.nodeKey.nonce == 1 && orphan.nodeKey.version < version { - // if the orphan is referred to the previous root, it should be reformatted - // to (version, 0), because the root (version, 1) should be removed but not - // applied now due to the batch writing. - orphan.nodeKey.nonce = 0 - } - nk := orphan.GetKey() - if orphan.isLegacy { - return ndb.deleteFromPruning(ndb.legacyNodeKey(nk)) + if err := ndb.traverseOrphans(version, version+1, func(orphan *Node) error { + if orphan.nodeKey.nonce == 0 && !orphan.isLegacy { + // if the orphan is a reformatted root, it can be a legacy root + // so it should be removed from the pruning process. + if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil { + return err } - return ndb.deleteFromPruning(ndb.nodeKey(nk)) - }); err != nil && err != ErrVersionDoesNotExist { - return err } + if orphan.nodeKey.nonce == 1 && orphan.nodeKey.version < version { + // if the orphan is referred to the previous root, it should be reformatted + // to (version, 0), because the root (version, 1) should be removed but not + // applied now due to the batch writing. + orphan.nodeKey.nonce = 0 + } + nk := orphan.GetKey() + if orphan.isLegacy { + return ndb.deleteFromPruning(ndb.legacyNodeKey(nk)) + } + return ndb.deleteFromPruning(ndb.nodeKey(nk)) + }); err != nil { + return err } literalRootKey := GetRootKey(version) - if rootKey == nil || !bytes.Equal(rootKey, literalRootKey) { + if !bytes.Equal(rootKey, literalRootKey) { // if the root key is not matched with the literal root key, it means the given root // is a reference root to the previous version. if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil { @@ -501,7 +460,7 @@ func (ndb *nodeDB) deleteVersion(version int64, cache *rootkeyCache) error { } // check if the version is referred by the next version - nextRootKey, err := cache.getRootKey(ndb, version+1) + nextRootKey, err := ndb.GetRoot(version + 1) if err != nil { return err } @@ -660,8 +619,8 @@ func (ndb *nodeDB) startPruning() { } if err := ndb.deleteVersionsTo(toVersion); err != nil { - ndb.logger.Error("Error while pruning", "err", err) - time.Sleep(1 * time.Second) + ndb.logger.Error("Error while pruning full store asynchronously", "version to prune to", toVersion, "err", err) + time.Sleep(500 * time.Millisecond) continue } @@ -731,10 +690,14 @@ func (ndb *nodeDB) deleteVersionsTo(toVersion int64) error { ndb.resetLegacyLatestVersion(-1) } - rootkeyCache := newRootkeyCache() for version := first; version <= toVersion; version++ { - if err := ndb.deleteVersion(version, rootkeyCache); err != nil { - return err + if err := ndb.deleteVersion(version); err != nil { + // If the version is not found in the store continue on to the next version available + if err != ErrVersionDoesNotExist { + return err + } + + ndb.logger.Error("Error while pruning, moving on the the next version in the store", "version missing", version, "next version", version+1, "err", err) } ndb.resetFirstVersion(version + 1) } @@ -1118,12 +1081,7 @@ func isReferenceRoot(bz []byte) (bool, int) { // traverseOrphans traverses orphans which removed by the updates of the curVersion in the prevVersion. // NOTE: it is used for both legacy and new nodes. func (ndb *nodeDB) traverseOrphans(prevVersion, curVersion int64, fn func(*Node) error) error { - cache := newRootkeyCache() - return ndb.traverseOrphansWithRootkeyCache(cache, prevVersion, curVersion, fn) -} - -func (ndb *nodeDB) traverseOrphansWithRootkeyCache(cache *rootkeyCache, prevVersion, curVersion int64, fn func(*Node) error) error { - curKey, err := cache.getRootKey(ndb, curVersion) + curKey, err := ndb.GetRoot(curVersion) if err != nil { return err } @@ -1133,7 +1091,7 @@ func (ndb *nodeDB) traverseOrphansWithRootkeyCache(cache *rootkeyCache, prevVers return err } - prevKey, err := cache.getRootKey(ndb, prevVersion) + prevKey, err := ndb.GetRoot(prevVersion) if err != nil { return err }