Skip to content

Commit 63ae89a

Browse files
Refactor write batching
Avoid holding transaction-specific state in the handler, which must then be protected by multiple locks. This is error prone and is a potential performance bottleneck. Instead, extract batch writes to a separate struct for clarity, and maintain the batch write state in the stub, which is unique to a specific transaction so requires no locking. The stub (and associated batch write state) is also discarded after a transaction invocation so there is no risk of memory leaks due to redundant batch write state being left in the shared handler. Signed-off-by: Mark S. Lewis <[email protected]>
1 parent e713f9f commit 63ae89a

File tree

5 files changed

+130
-136
lines changed

5 files changed

+130
-136
lines changed

shim/batch.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright the Hyperledger Fabric contributors. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package shim
5+
6+
import (
7+
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
8+
)
9+
10+
type writeBatch struct {
11+
writes map[string]*peer.WriteRecord
12+
}
13+
14+
func newWriteBatch() *writeBatch {
15+
return &writeBatch{
16+
writes: make(map[string]*peer.WriteRecord),
17+
}
18+
}
19+
20+
func (b *writeBatch) Writes() []*peer.WriteRecord {
21+
if b == nil {
22+
return nil
23+
}
24+
25+
var results []*peer.WriteRecord
26+
for _, value := range b.writes {
27+
results = append(results, value)
28+
}
29+
30+
return results
31+
}
32+
33+
func (b *writeBatch) PutState(collection string, key string, value []byte) {
34+
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
35+
Key: key,
36+
Value: value,
37+
Collection: collection,
38+
Type: peer.WriteRecord_PUT_STATE,
39+
}
40+
}
41+
42+
func (b *writeBatch) PutStateMetadataEntry(collection string, key string, metakey string, metadata []byte) {
43+
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
44+
Key: key,
45+
Collection: collection,
46+
Metadata: &peer.StateMetadata{Metakey: metakey, Value: metadata},
47+
Type: peer.WriteRecord_PUT_STATE_METADATA,
48+
}
49+
}
50+
51+
func (b *writeBatch) DelState(collection string, key string) {
52+
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
53+
Key: key,
54+
Collection: collection,
55+
Type: peer.WriteRecord_DEL_STATE,
56+
}
57+
}
58+
59+
func (b *writeBatch) PurgeState(collection string, key string) {
60+
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
61+
Key: key,
62+
Collection: collection,
63+
Type: peer.WriteRecord_PURGE_PRIVATE_DATA,
64+
}
65+
}
66+
67+
func batchLedgerKey(collection string, key string) string {
68+
return prefixStateDataWriteBatch + collection + key
69+
}

shim/handler.go

Lines changed: 6 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,8 @@ type Handler struct {
5151
// state holds the current state of this handler.
5252
state state
5353
// if you can send the changes in batches.
54-
usePeerWriteBatch bool
55-
maxSizeWriteBatch uint32
56-
batchMutex sync.RWMutex
57-
batch map[string]map[string]*peer.WriteRecord
58-
startWriteBatchMutex sync.RWMutex
59-
startWriteBatch map[string]bool
54+
usePeerWriteBatch bool
55+
maxSizeWriteBatch uint32
6056

6157
// Multiple queries (and one transaction) with different txids can be executing in parallel for this chaincode
6258
// responseChannels is the channel on which responses are communicated by the shim to the chaincodeStub.
@@ -161,8 +157,6 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode
161157
cc: chaincode,
162158
responseChannels: map[string]chan *peer.ChaincodeMessage{},
163159
state: created,
164-
batch: map[string]map[string]*peer.WriteRecord{},
165-
startWriteBatch: map[string]bool{},
166160
}
167161
}
168162

@@ -201,8 +195,7 @@ func (h *Handler) handleInit(msg *peer.ChaincodeMessage) (*peer.ChaincodeMessage
201195
return nil, fmt.Errorf("failed to marshal response: %s", err)
202196
}
203197

204-
err = h.sendBatch(msg.ChannelId, msg.Txid)
205-
if err != nil {
198+
if err := stub.FinishWriteBatch(); err != nil {
206199
return nil, fmt.Errorf("failed send batch: %s", err)
207200
}
208201

@@ -232,8 +225,7 @@ func (h *Handler) handleTransaction(msg *peer.ChaincodeMessage) (*peer.Chaincode
232225
return nil, fmt.Errorf("failed to marshal response: %s", err)
233226
}
234227

235-
err = h.sendBatch(msg.ChannelId, msg.Txid)
236-
if err != nil {
228+
if err := stub.FinishWriteBatch(); err != nil {
237229
return nil, fmt.Errorf("failed send batch: %s", err)
238230
}
239231

@@ -335,17 +327,6 @@ func (h *Handler) handleGetStateMetadata(collection string, key string, channelI
335327

336328
// handlePutState communicates with the peer to put state information into the ledger.
337329
func (h *Handler) handlePutState(collection string, key string, value []byte, channelID string, txid string) error {
338-
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
339-
st := h.batchByID(channelID, txid)
340-
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
341-
Key: key,
342-
Value: value,
343-
Collection: collection,
344-
Type: peer.WriteRecord_PUT_STATE,
345-
}
346-
return nil
347-
}
348-
349330
// Construct payload for PUT_STATE
350331
payloadBytes := marshalOrPanic(&peer.PutState{Collection: collection, Key: key, Value: value})
351332

@@ -375,18 +356,6 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met
375356
// Construct payload for PUT_STATE_METADATA
376357
md := &peer.StateMetadata{Metakey: metakey, Value: metadata}
377358

378-
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txID) {
379-
st := h.batchByID(channelID, txID)
380-
st[prefixMetaDataWriteBatch+collection+key] = &peer.WriteRecord{
381-
Key: key,
382-
Collection: collection,
383-
Metadata: md,
384-
Type: peer.WriteRecord_PUT_STATE_METADATA,
385-
}
386-
387-
return nil
388-
}
389-
390359
payloadBytes := marshalOrPanic(&peer.PutStateMetadata{Collection: collection, Key: key, Metadata: md})
391360

392361
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_METADATA, Payload: payloadBytes, Txid: txID, ChannelId: channelID}
@@ -412,16 +381,6 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met
412381

413382
// handleDelState communicates with the peer to delete a key from the state in the ledger.
414383
func (h *Handler) handleDelState(collection string, key string, channelID string, txid string) error {
415-
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
416-
st := h.batchByID(channelID, txid)
417-
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
418-
Key: key,
419-
Collection: collection,
420-
Type: peer.WriteRecord_DEL_STATE,
421-
}
422-
return nil
423-
}
424-
425384
payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
426385
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_DEL_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
427386
// Execute the request and get response
@@ -445,16 +404,6 @@ func (h *Handler) handleDelState(collection string, key string, channelID string
445404

446405
// handlePurgeState communicates with the peer to purge a state from private data
447406
func (h *Handler) handlePurgeState(collection string, key string, channelID string, txid string) error {
448-
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
449-
st := h.batchByID(channelID, txid)
450-
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
451-
Key: key,
452-
Collection: collection,
453-
Type: peer.WriteRecord_PURGE_PRIVATE_DATA,
454-
}
455-
return nil
456-
}
457-
458407
payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
459408
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PURGE_PRIVATE_DATA, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
460409
// Execute the request and get response
@@ -503,27 +452,9 @@ func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string
503452
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
504453
}
505454

506-
func (h *Handler) sendBatch(channelID string, txid string) error {
507-
if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txid) {
508-
return nil
509-
}
510-
511-
st := h.batchByID(channelID, txid)
512-
txCtxID := transactionContextID(channelID, txid)
513-
514-
defer func() {
515-
h.batchMutex.Lock()
516-
h.startWriteBatchMutex.Lock()
517-
518-
delete(h.batch, txCtxID)
519-
delete(h.startWriteBatch, txCtxID)
520-
521-
h.startWriteBatchMutex.Unlock()
522-
h.batchMutex.Unlock()
523-
}()
524-
455+
func (h *Handler) sendBatch(channelID string, txid string, writes []*peer.WriteRecord) error {
525456
batch := &peer.WriteBatchState{}
526-
for _, kv := range st {
457+
for _, kv := range writes {
527458
batch.Rec = append(batch.Rec, kv)
528459
if len(batch.Rec) >= int(h.maxSizeWriteBatch) {
529460
err := h.handleWriteBatch(batch, channelID, txid)
@@ -544,30 +475,6 @@ func (h *Handler) sendBatch(channelID string, txid string) error {
544475
return nil
545476
}
546477

547-
func (h *Handler) handleStartWriteBatch(channelID string, txID string) {
548-
if !h.usePeerWriteBatch {
549-
return
550-
}
551-
552-
txCtxID := transactionContextID(channelID, txID)
553-
h.startWriteBatchMutex.Lock()
554-
defer h.startWriteBatchMutex.Unlock()
555-
556-
h.startWriteBatch[txCtxID] = true
557-
}
558-
559-
func (h *Handler) handleFinishWriteBatch(channelID string, txID string) error {
560-
return h.sendBatch(channelID, txID)
561-
}
562-
563-
func (h *Handler) isStartWriteBatch(channelID string, txID string) bool {
564-
txCtxID := transactionContextID(channelID, txID)
565-
h.startWriteBatchMutex.RLock()
566-
defer h.startWriteBatchMutex.RUnlock()
567-
568-
return h.startWriteBatch[txCtxID]
569-
}
570-
571478
func (h *Handler) handleGetStateByRange(collection, startKey, endKey string, metadata []byte,
572479
channelID string, txid string) (*peer.QueryResponse, error) {
573480
// Send GET_STATE_BY_RANGE message to peer chaincode support
@@ -873,28 +780,6 @@ func (h *Handler) handleMessage(msg *peer.ChaincodeMessage, errc chan error) err
873780
return nil
874781
}
875782

876-
func (h *Handler) batchByID(channelID string, txID string) map[string]*peer.WriteRecord {
877-
txCtxID := transactionContextID(channelID, txID)
878-
879-
h.batchMutex.RLock()
880-
st, ok := h.batch[txCtxID]
881-
h.batchMutex.RUnlock()
882-
if ok {
883-
return st
884-
}
885-
886-
h.batchMutex.Lock()
887-
defer h.batchMutex.Unlock()
888-
st, ok = h.batch[txCtxID]
889-
if ok {
890-
return st
891-
}
892-
893-
st = make(map[string]*peer.WriteRecord)
894-
h.batch[txCtxID] = st
895-
return st
896-
}
897-
898783
// marshalOrPanic attempts to marshal the provided protobuf message but will panic
899784
// when marshaling fails instead of returning an error.
900785
func marshalOrPanic(msg proto.Message) []byte {

shim/handler_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ func TestNewHandler_CreatedState(t *testing.T) {
4848
cc: cc,
4949
responseChannels: map[string]chan *peer.ChaincodeMessage{},
5050
state: created,
51-
batch: map[string]map[string]*peer.WriteRecord{},
52-
startWriteBatch: map[string]bool{},
5351
}
5452

5553
handler := newChaincodeHandler(chatStream, cc)

0 commit comments

Comments
 (0)