diff --git a/db/batch/batch.go b/db/batch/batch.go index 553815b2eb..38fb0c5be0 100644 --- a/db/batch/batch.go +++ b/db/batch/batch.go @@ -40,6 +40,8 @@ type ( ClearAndUnlock() // Put insert or update a record identified by (namespace, key) Put(string, []byte, []byte, string) + // Append appends a KVStoreBatch to the current batch + Append(KVStoreBatch) // Delete deletes a record by (namespace, key) Delete(string, []byte, string) // Size returns the size of batch diff --git a/db/batch/batch_impl.go b/db/batch/batch_impl.go index 49dc7f4235..ed12b0917f 100644 --- a/db/batch/batch_impl.go +++ b/db/batch/batch_impl.go @@ -77,6 +77,27 @@ func (b *baseKVStoreBatch) Put(namespace string, key, value []byte, errorMessage b.batch(Put, namespace, key, value, errorMessage) } +func (b *baseKVStoreBatch) Append(kvb KVStoreBatch) { + b.mutex.Lock() + defer b.mutex.Unlock() + kvb.Lock() + defer kvb.Unlock() + for i := 0; i < kvb.Size(); i++ { + wi, err := b.Entry(i) + if err != nil { + panic(err) + } + switch wi.writeType { + case Put: + b.batch(Put, wi.namespace, wi.key, wi.value, wi.errorMessage) + case Delete: + b.batch(Delete, wi.namespace, wi.key, nil, wi.errorMessage) + default: + panic("unexpected write type") + } + } +} + // Delete deletes a record func (b *baseKVStoreBatch) Delete(namespace string, key []byte, errorMessage string) { b.mutex.Lock() @@ -280,16 +301,45 @@ func (cb *cachedBatch) touchKey(h kvCacheKey) { func (cb *cachedBatch) Put(namespace string, key, value []byte, errorMessage string) { cb.lock.Lock() defer cb.lock.Unlock() + cb.put(namespace, key, value, errorMessage) +} + +func (cb *cachedBatch) put(namespace string, key, value []byte, errorMessage string) { h := cb.hash(namespace, key) cb.touchKey(h) cb.currentCache().Write(&h, value) cb.kvStoreBatch.batch(Put, namespace, key, value, errorMessage) } +func (cb *cachedBatch) Append(b KVStoreBatch) { + cb.lock.Lock() + defer cb.lock.Unlock() + b.Lock() + defer b.Unlock() + for i := 0; i < b.Size(); i++ { + wi, err := b.Entry(i) + if err != nil { + panic(err) + } + switch wi.writeType { + case Put: + cb.put(wi.namespace, wi.key, wi.value, wi.errorMessage) + case Delete: + cb.delete(wi.namespace, wi.key, wi.errorMessage) + default: + panic("unexpected write type") + } + } +} + // Delete deletes a record func (cb *cachedBatch) Delete(namespace string, key []byte, errorMessage string) { cb.lock.Lock() defer cb.lock.Unlock() + cb.delete(namespace, key, errorMessage) +} + +func (cb *cachedBatch) delete(namespace string, key []byte, errorMessage string) { h := cb.hash(namespace, key) cb.touchKey(h) cb.currentCache().Evict(&h) diff --git a/db/batch/batch_impl_test.go b/db/batch/batch_impl_test.go index 6c8520dbd6..caa825d977 100644 --- a/db/batch/batch_impl_test.go +++ b/db/batch/batch_impl_test.go @@ -71,6 +71,11 @@ func TestBaseKVStoreBatch(t *testing.T) { require.NoError(err) require.Equal("to_delete_ns", newEntry1.Namespace()) require.Equal(Put, newEntry1.WriteType()) + bb := NewBatch() + bb.Put("bb", []byte("bbkey"), []byte("bbvalue"), "") + bb.Put("bb", []byte("bbkey2"), []byte("bbvalue2"), "") + b.Append(bb) + require.Equal(4, b.Size()) b.Clear() require.Equal(0, b.Size()) } @@ -136,6 +141,11 @@ func TestCachedBatch(t *testing.T) { } return wi }).SerializeQueue(nil, nil))) + bb := NewCachedBatch() + bb.Put("bb", []byte("bbkey"), []byte("bbvalue"), "") + bb.Put("bb", []byte("bbkey2"), []byte("bbvalue2"), "") + cb.Append(bb) + require.Equal(3, cb.Size()) cb.Clear() require.Equal(0, cb.Size()) } diff --git a/db/kvstorewithbuffer.go b/db/kvstorewithbuffer.go index 2fe33d1b09..4673e7b0c3 100644 --- a/db/kvstorewithbuffer.go +++ b/db/kvstorewithbuffer.go @@ -8,7 +8,6 @@ import ( "github.com/pkg/errors" "github.com/iotexproject/iotex-core/v2/db/batch" - "github.com/iotexproject/iotex-core/v2/pkg/log" ) type ( @@ -248,38 +247,6 @@ func (kvb *kvStoreWithBuffer) Filter(ns string, cond Condition, minKey, maxKey [ } func (kvb *kvStoreWithBuffer) WriteBatch(b batch.KVStoreBatch) (err error) { - b.Lock() - defer func() { - if err == nil { - // clear the batch if commit succeeds - b.ClearAndUnlock() - } else { - b.Unlock() - } - }() - writes := make([]*batch.WriteInfo, b.Size()) - for i := 0; i < b.Size(); i++ { - write, e := b.Entry(i) - if e != nil { - return e - } - if write.WriteType() != batch.Put && write.WriteType() != batch.Delete { - return errors.Errorf("invalid write type %d", write.WriteType()) - } - writes[i] = write - } - kvb.buffer.Lock() - defer kvb.buffer.Unlock() - for _, write := range writes { - switch write.WriteType() { - case batch.Put: - kvb.buffer.Put(write.Namespace(), write.Key(), write.Value(), write.Error()) - case batch.Delete: - kvb.buffer.Delete(write.Namespace(), write.Key(), write.Error()) - default: - log.S().Panic("unexpected write type") - } - } - + kvb.buffer.Append(b) return nil } diff --git a/test/mock/mock_batch/mock_batch.go b/test/mock/mock_batch/mock_batch.go index 401b894dfb..3de5701a96 100644 --- a/test/mock/mock_batch/mock_batch.go +++ b/test/mock/mock_batch/mock_batch.go @@ -46,6 +46,18 @@ func (mr *MockKVStoreBatchMockRecorder) AddFillPercent(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddFillPercent", reflect.TypeOf((*MockKVStoreBatch)(nil).AddFillPercent), arg0, arg1) } +// Append mocks base method. +func (m *MockKVStoreBatch) Append(arg0 batch.KVStoreBatch) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Append", arg0) +} + +// Append indicates an expected call of Append. +func (mr *MockKVStoreBatchMockRecorder) Append(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Append", reflect.TypeOf((*MockKVStoreBatch)(nil).Append), arg0) +} + // CheckFillPercent mocks base method. func (m *MockKVStoreBatch) CheckFillPercent(arg0 string) (float64, bool) { m.ctrl.T.Helper() @@ -225,6 +237,18 @@ func (mr *MockCachedBatchMockRecorder) AddFillPercent(arg0, arg1 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddFillPercent", reflect.TypeOf((*MockCachedBatch)(nil).AddFillPercent), arg0, arg1) } +// Append mocks base method. +func (m *MockCachedBatch) Append(arg0 batch.KVStoreBatch) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Append", arg0) +} + +// Append indicates an expected call of Append. +func (mr *MockCachedBatchMockRecorder) Append(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Append", reflect.TypeOf((*MockCachedBatch)(nil).Append), arg0) +} + // CheckFillPercent mocks base method. func (m *MockCachedBatch) CheckFillPercent(arg0 string) (float64, bool) { m.ctrl.T.Helper()