diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 0874eac974..7229d28071 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -533,43 +533,45 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable precommittedAlh := committedAlh precommittedTxLogSize := committedTxLogSize - // read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process - // txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found - txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize) + if !opts.DiscardPrecommittedTransactions { + // read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process + // txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found + txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize) - tx, _ := txPool.Alloc() + tx, _ := txPool.Alloc() - for { - err = tx.readFrom(txReader, false) - if errors.Is(err, io.EOF) { - break - } - if err != nil { - opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1) - break - } + for { + err = tx.readFrom(txReader, false) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1) + break + } - if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh { - opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1) - break - } + if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh { + opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1) + break + } - precommittedTxID++ - precommittedAlh = tx.header.Alh() + precommittedTxID++ + precommittedAlh = tx.header.Alh() - txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize)) + txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize)) - err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize) - if err != nil { - txPool.Release(tx) - return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1) + err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize) + if err != nil { + txPool.Release(tx) + return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1) + } + + precommittedTxLogSize += int64(txSize) } - precommittedTxLogSize += int64(txSize) + txPool.Release(tx) } - txPool.Release(tx) - vLogsMap := make(map[byte]*refVLog, len(vLogs)) vLogUnlockedList := list.New() diff --git a/embedded/store/ongoing_tx.go b/embedded/store/ongoing_tx.go index b82684f89a..750cb7e4b6 100644 --- a/embedded/store/ongoing_tx.go +++ b/embedded/store/ongoing_tx.go @@ -118,14 +118,16 @@ func newOngoingTx(ctx context.Context, s *ImmuStore, opts *TxOptions) (*OngoingT tx.mode = opts.Mode - if opts.Mode == WriteOnlyTx { + if opts.Mode != WriteOnlyTx { return tx, nil } tx.snapshotMustIncludeTxID = opts.SnapshotMustIncludeTxID tx.snapshotRenewalPeriod = opts.SnapshotRenewalPeriod - tx.mvccReadSet = &mvccReadSet{} + if opts.Mode == ReadWriteTx { + tx.mvccReadSet = &mvccReadSet{} + } return tx, nil } @@ -562,7 +564,6 @@ func (tx *OngoingTx) NewKeyReader(spec KeyReaderSpec) (KeyReader, error) { return snap.NewKeyReader(spec) } - return newOngoingTxKeyReader(tx, spec) } diff --git a/embedded/store/options.go b/embedded/store/options.go index f8fd51736c..57dd07b15c 100644 --- a/embedded/store/options.go +++ b/embedded/store/options.go @@ -147,6 +147,8 @@ type Options struct { CompressionLevel int EmbeddedValues bool PreallocFiles bool + // Discard processing of transactions that were precommitted before opening + DiscardPrecommittedTransactions bool // options below affect indexing IndexOpts *IndexOptions @@ -247,17 +249,17 @@ func DefaultOptions() *Options { WriteTxHeaderVersion: DefaultWriteTxHeaderVersion, // options below are only set during initialization and stored as metadata - MaxTxEntries: DefaultMaxTxEntries, - MaxKeyLen: DefaultMaxKeyLen, - MaxValueLen: DefaultMaxValueLen, - FileSize: DefaultFileSize, - CompressionFormat: DefaultCompressionFormat, - CompressionLevel: DefaultCompressionLevel, - EmbeddedValues: DefaultEmbeddedValues, - PreallocFiles: DefaultPreallocFiles, - - IndexOpts: DefaultIndexOptions(), - AHTOpts: DefaultAHTOptions(), + MaxTxEntries: DefaultMaxTxEntries, + MaxKeyLen: DefaultMaxKeyLen, + MaxValueLen: DefaultMaxValueLen, + FileSize: DefaultFileSize, + CompressionFormat: DefaultCompressionFormat, + CompressionLevel: DefaultCompressionLevel, + EmbeddedValues: DefaultEmbeddedValues, + PreallocFiles: DefaultPreallocFiles, + DiscardPrecommittedTransactions: false, + IndexOpts: DefaultIndexOptions(), + AHTOpts: DefaultAHTOptions(), } } @@ -620,6 +622,11 @@ func (opts *Options) WithAHTOptions(ahtOptions *AHTOptions) *Options { return opts } +func (opts *Options) WithDiscardPrecommittedTransactions(discard bool) *Options { + opts.DiscardPrecommittedTransactions = discard + return opts +} + // IndexOptions func (opts *IndexOptions) WithCacheSize(cacheSize int) *IndexOptions {