From 31196453b6deb05b2458385714fb843545e881f5 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Fri, 14 Feb 2025 10:03:14 +0100 Subject: [PATCH 1/2] chore(embedded/store): allow discarding precommitted transactions during open Signed-off-by: Stefano Scafiti --- embedded/store/immustore.go | 56 +++++++++++++++++++------------------ embedded/store/options.go | 29 +++++++++++-------- 2 files changed, 47 insertions(+), 38 deletions(-) diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 0874eac974..7229d28071 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -533,43 +533,45 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable precommittedAlh := committedAlh precommittedTxLogSize := committedTxLogSize - // read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process - // txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found - txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize) + if !opts.DiscardPrecommittedTransactions { + // read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process + // txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found + txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize) - tx, _ := txPool.Alloc() + tx, _ := txPool.Alloc() - for { - err = tx.readFrom(txReader, false) - if errors.Is(err, io.EOF) { - break - } - if err != nil { - opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1) - break - } + for { + err = tx.readFrom(txReader, false) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1) + break + } - if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh { - opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1) - break - } + if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh { + opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1) + break + } - precommittedTxID++ - precommittedAlh = tx.header.Alh() + precommittedTxID++ + precommittedAlh = tx.header.Alh() - txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize)) + txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize)) - err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize) - if err != nil { - txPool.Release(tx) - return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1) + err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize) + if err != nil { + txPool.Release(tx) + return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1) + } + + precommittedTxLogSize += int64(txSize) } - precommittedTxLogSize += int64(txSize) + txPool.Release(tx) } - txPool.Release(tx) - vLogsMap := make(map[byte]*refVLog, len(vLogs)) vLogUnlockedList := list.New() diff --git a/embedded/store/options.go b/embedded/store/options.go index f8fd51736c..57dd07b15c 100644 --- a/embedded/store/options.go +++ b/embedded/store/options.go @@ -147,6 +147,8 @@ type Options struct { CompressionLevel int EmbeddedValues bool PreallocFiles bool + // Discard processing of transactions that were precommitted before opening + DiscardPrecommittedTransactions bool // options below affect indexing IndexOpts *IndexOptions @@ -247,17 +249,17 @@ func DefaultOptions() *Options { WriteTxHeaderVersion: DefaultWriteTxHeaderVersion, // options below are only set during initialization and stored as metadata - MaxTxEntries: DefaultMaxTxEntries, - MaxKeyLen: DefaultMaxKeyLen, - MaxValueLen: DefaultMaxValueLen, - FileSize: DefaultFileSize, - CompressionFormat: DefaultCompressionFormat, - CompressionLevel: DefaultCompressionLevel, - EmbeddedValues: DefaultEmbeddedValues, - PreallocFiles: DefaultPreallocFiles, - - IndexOpts: DefaultIndexOptions(), - AHTOpts: DefaultAHTOptions(), + MaxTxEntries: DefaultMaxTxEntries, + MaxKeyLen: DefaultMaxKeyLen, + MaxValueLen: DefaultMaxValueLen, + FileSize: DefaultFileSize, + CompressionFormat: DefaultCompressionFormat, + CompressionLevel: DefaultCompressionLevel, + EmbeddedValues: DefaultEmbeddedValues, + PreallocFiles: DefaultPreallocFiles, + DiscardPrecommittedTransactions: false, + IndexOpts: DefaultIndexOptions(), + AHTOpts: DefaultAHTOptions(), } } @@ -620,6 +622,11 @@ func (opts *Options) WithAHTOptions(ahtOptions *AHTOptions) *Options { return opts } +func (opts *Options) WithDiscardPrecommittedTransactions(discard bool) *Options { + opts.DiscardPrecommittedTransactions = discard + return opts +} + // IndexOptions func (opts *IndexOptions) WithCacheSize(cacheSize int) *IndexOptions { From ca9f05d12a4f81dc51a4a393f97935fd0fc9cbd3 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Thu, 20 Feb 2025 14:28:49 +0100 Subject: [PATCH 2/2] Disable mvcc for ReadOnlyTx too --- embedded/store/ongoing_tx.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/embedded/store/ongoing_tx.go b/embedded/store/ongoing_tx.go index b82684f89a..750cb7e4b6 100644 --- a/embedded/store/ongoing_tx.go +++ b/embedded/store/ongoing_tx.go @@ -118,14 +118,16 @@ func newOngoingTx(ctx context.Context, s *ImmuStore, opts *TxOptions) (*OngoingT tx.mode = opts.Mode - if opts.Mode == WriteOnlyTx { + if opts.Mode != WriteOnlyTx { return tx, nil } tx.snapshotMustIncludeTxID = opts.SnapshotMustIncludeTxID tx.snapshotRenewalPeriod = opts.SnapshotRenewalPeriod - tx.mvccReadSet = &mvccReadSet{} + if opts.Mode == ReadWriteTx { + tx.mvccReadSet = &mvccReadSet{} + } return tx, nil } @@ -562,7 +564,6 @@ func (tx *OngoingTx) NewKeyReader(spec KeyReaderSpec) (KeyReader, error) { return snap.NewKeyReader(spec) } - return newOngoingTxKeyReader(tx, spec) }