Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions db/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type (
ClearAndUnlock()
// Put insert or update a record identified by (namespace, key)
Put(string, []byte, []byte, string)
// Append appends a KVStoreBatch to the current batch
Append(KVStoreBatch)
// Delete deletes a record by (namespace, key)
Delete(string, []byte, string)
// Size returns the size of batch
Expand Down
50 changes: 50 additions & 0 deletions db/batch/batch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,27 @@ func (b *baseKVStoreBatch) Put(namespace string, key, value []byte, errorMessage
b.batch(Put, namespace, key, value, errorMessage)
}

func (b *baseKVStoreBatch) Append(kvb KVStoreBatch) {
b.mutex.Lock()
defer b.mutex.Unlock()
kvb.Lock()
defer kvb.Unlock()
for i := 0; i < kvb.Size(); i++ {
wi, err := b.Entry(i)
if err != nil {
panic(err)
}
switch wi.writeType {
case Put:
b.batch(Put, wi.namespace, wi.key, wi.value, wi.errorMessage)
case Delete:
b.batch(Delete, wi.namespace, wi.key, nil, wi.errorMessage)
default:
panic("unexpected write type")
}
}
}

// Delete deletes a record
func (b *baseKVStoreBatch) Delete(namespace string, key []byte, errorMessage string) {
b.mutex.Lock()
Expand Down Expand Up @@ -280,16 +301,45 @@ func (cb *cachedBatch) touchKey(h kvCacheKey) {
func (cb *cachedBatch) Put(namespace string, key, value []byte, errorMessage string) {
cb.lock.Lock()
defer cb.lock.Unlock()
cb.put(namespace, key, value, errorMessage)
}

func (cb *cachedBatch) put(namespace string, key, value []byte, errorMessage string) {
h := cb.hash(namespace, key)
cb.touchKey(h)
cb.currentCache().Write(&h, value)
cb.kvStoreBatch.batch(Put, namespace, key, value, errorMessage)
}

func (cb *cachedBatch) Append(b KVStoreBatch) {
cb.lock.Lock()
defer cb.lock.Unlock()
b.Lock()
defer b.Unlock()
for i := 0; i < b.Size(); i++ {
wi, err := b.Entry(i)
if err != nil {
panic(err)
}
switch wi.writeType {
case Put:
cb.put(wi.namespace, wi.key, wi.value, wi.errorMessage)
case Delete:
cb.delete(wi.namespace, wi.key, wi.errorMessage)
default:
panic("unexpected write type")
}
}
}

// Delete deletes a record
func (cb *cachedBatch) Delete(namespace string, key []byte, errorMessage string) {
cb.lock.Lock()
defer cb.lock.Unlock()
cb.delete(namespace, key, errorMessage)
}

func (cb *cachedBatch) delete(namespace string, key []byte, errorMessage string) {
h := cb.hash(namespace, key)
cb.touchKey(h)
cb.currentCache().Evict(&h)
Expand Down
10 changes: 10 additions & 0 deletions db/batch/batch_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func TestBaseKVStoreBatch(t *testing.T) {
require.NoError(err)
require.Equal("to_delete_ns", newEntry1.Namespace())
require.Equal(Put, newEntry1.WriteType())
bb := NewBatch()
bb.Put("bb", []byte("bbkey"), []byte("bbvalue"), "")
bb.Put("bb", []byte("bbkey2"), []byte("bbvalue2"), "")
b.Append(bb)
require.Equal(4, b.Size())
b.Clear()
require.Equal(0, b.Size())
}
Expand Down Expand Up @@ -136,6 +141,11 @@ func TestCachedBatch(t *testing.T) {
}
return wi
}).SerializeQueue(nil, nil)))
bb := NewCachedBatch()
bb.Put("bb", []byte("bbkey"), []byte("bbvalue"), "")
bb.Put("bb", []byte("bbkey2"), []byte("bbvalue2"), "")
cb.Append(bb)
require.Equal(3, cb.Size())
cb.Clear()
require.Equal(0, cb.Size())
}
Expand Down
35 changes: 1 addition & 34 deletions db/kvstorewithbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/log"
)

type (
Expand Down Expand Up @@ -248,38 +247,6 @@ func (kvb *kvStoreWithBuffer) Filter(ns string, cond Condition, minKey, maxKey [
}

func (kvb *kvStoreWithBuffer) WriteBatch(b batch.KVStoreBatch) (err error) {
b.Lock()
defer func() {
if err == nil {
// clear the batch if commit succeeds
b.ClearAndUnlock()
} else {
b.Unlock()
}
}()
writes := make([]*batch.WriteInfo, b.Size())
for i := 0; i < b.Size(); i++ {
write, e := b.Entry(i)
if e != nil {
return e
}
if write.WriteType() != batch.Put && write.WriteType() != batch.Delete {
return errors.Errorf("invalid write type %d", write.WriteType())
}
writes[i] = write
}
kvb.buffer.Lock()
defer kvb.buffer.Unlock()
for _, write := range writes {
switch write.WriteType() {
case batch.Put:
kvb.buffer.Put(write.Namespace(), write.Key(), write.Value(), write.Error())
case batch.Delete:
kvb.buffer.Delete(write.Namespace(), write.Key(), write.Error())
default:
log.S().Panic("unexpected write type")
}
}

kvb.buffer.Append(b)
return nil
}
24 changes: 24 additions & 0 deletions test/mock/mock_batch/mock_batch.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.