Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions db/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type (
ClearAndUnlock()
// Put insert or update a record identified by (namespace, key)
Put(string, []byte, []byte, string)
// Append appends a KVStoreBatch to the current batch
Append(KVStoreBatch)
// Delete deletes a record by (namespace, key)
Delete(string, []byte, string)
// Size returns the size of batch
Expand Down
50 changes: 50 additions & 0 deletions db/batch/batch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,27 @@ func (b *baseKVStoreBatch) Put(namespace string, key, value []byte, errorMessage
b.batch(Put, namespace, key, value, errorMessage)
}

func (b *baseKVStoreBatch) Append(kvb KVStoreBatch) {
b.mutex.Lock()
defer b.mutex.Unlock()
kvb.Lock()
defer kvb.Unlock()
for i := range kvb.Size() {
wi, err := b.Entry(i)
if err != nil {
panic(err)
}
switch wi.writeType {
case Put:
b.batch(Put, wi.namespace, wi.key, wi.value, wi.errorMessage)
case Delete:
b.batch(Delete, wi.namespace, wi.key, nil, wi.errorMessage)
default:
panic("unexpected write type")
}
}
}

// Delete deletes a record
func (b *baseKVStoreBatch) Delete(namespace string, key []byte, errorMessage string) {
b.mutex.Lock()
Expand Down Expand Up @@ -280,16 +301,45 @@ func (cb *cachedBatch) touchKey(h kvCacheKey) {
func (cb *cachedBatch) Put(namespace string, key, value []byte, errorMessage string) {
cb.lock.Lock()
defer cb.lock.Unlock()
cb.put(namespace, key, value, errorMessage)
}

func (cb *cachedBatch) put(namespace string, key, value []byte, errorMessage string) {
h := cb.hash(namespace, key)
cb.touchKey(h)
cb.currentCache().Write(&h, value)
cb.kvStoreBatch.batch(Put, namespace, key, value, errorMessage)
}

func (cb *cachedBatch) Append(b KVStoreBatch) {
cb.lock.Lock()
defer cb.lock.Unlock()
b.Lock()
defer b.Unlock()
for i := range b.Size() {
wi, err := b.Entry(i)
if err != nil {
panic(err)
}
switch wi.writeType {
case Put:
cb.put(wi.namespace, wi.key, wi.value, wi.errorMessage)
case Delete:
cb.delete(wi.namespace, wi.key, wi.errorMessage)
default:
panic("unexpected write type")
}
}
}

// Delete deletes a record
func (cb *cachedBatch) Delete(namespace string, key []byte, errorMessage string) {
cb.lock.Lock()
defer cb.lock.Unlock()
cb.delete(namespace, key, errorMessage)
}

func (cb *cachedBatch) delete(namespace string, key []byte, errorMessage string) {
h := cb.hash(namespace, key)
cb.touchKey(h)
cb.currentCache().Evict(&h)
Expand Down
10 changes: 10 additions & 0 deletions db/batch/batch_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func TestBaseKVStoreBatch(t *testing.T) {
require.NoError(err)
require.Equal("to_delete_ns", newEntry1.Namespace())
require.Equal(Put, newEntry1.WriteType())
bb := NewBatch()
bb.Put("bb", []byte("bbkey"), []byte("bbvalue"), "")
bb.Put("bb", []byte("bbkey2"), []byte("bbvalue2"), "")
b.Append(bb)
require.Equal(4, b.Size())
b.Clear()
require.Equal(0, b.Size())
}
Expand Down Expand Up @@ -136,6 +141,11 @@ func TestCachedBatch(t *testing.T) {
}
return wi
}).SerializeQueue(nil, nil)))
bb := NewCachedBatch()
bb.Put("bb", []byte("bbkey"), []byte("bbvalue"), "")
bb.Put("bb", []byte("bbkey2"), []byte("bbvalue2"), "")
cb.Append(bb)
require.Equal(3, cb.Size())
cb.Clear()
require.Equal(0, cb.Size())
}
Expand Down
35 changes: 1 addition & 34 deletions db/kvstorewithbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/log"
)

type (
Expand Down Expand Up @@ -248,38 +247,6 @@ func (kvb *kvStoreWithBuffer) Filter(ns string, cond Condition, minKey, maxKey [
}

func (kvb *kvStoreWithBuffer) WriteBatch(b batch.KVStoreBatch) (err error) {
b.Lock()
defer func() {
if err == nil {
// clear the batch if commit succeeds
b.ClearAndUnlock()
} else {
b.Unlock()
}
}()
writes := make([]*batch.WriteInfo, b.Size())
for i := 0; i < b.Size(); i++ {
write, e := b.Entry(i)
if e != nil {
return e
}
if write.WriteType() != batch.Put && write.WriteType() != batch.Delete {
return errors.Errorf("invalid write type %d", write.WriteType())
}
writes[i] = write
}
kvb.buffer.Lock()
defer kvb.buffer.Unlock()
for _, write := range writes {
switch write.WriteType() {
case batch.Put:
kvb.buffer.Put(write.Namespace(), write.Key(), write.Value(), write.Error())
case batch.Delete:
kvb.buffer.Delete(write.Namespace(), write.Key(), write.Error())
default:
log.S().Panic("unexpected write type")
}
}

kvb.buffer.Append(b)
return nil
}
24 changes: 24 additions & 0 deletions test/mock/mock_batch/mock_batch.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.