Skip to content

Commit 708de45

Browse files
committed
add put state batch
Signed-off-by: Fedor Partanskiy <[email protected]>
1 parent 299e4f5 commit 708de45

File tree

11 files changed

+322
-37
lines changed

11 files changed

+322
-37
lines changed

pkg/statebased/statebasedimpl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestAddOrg(t *testing.T) {
2222

2323
// bad role type
2424
err = ep.AddOrgs("unknown", "Org1")
25-
assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: statebased.RoleType("unknown")}, err)
25+
assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: "unknown"}, err)
2626
assert.EqualError(t, err, "role type unknown does not exist")
2727

2828
epBytes, err := ep.Policy()

shim/chaincodeserver.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ import (
99

1010
"github.com/hyperledger/fabric-chaincode-go/v2/shim/internal"
1111
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
12-
1312
"google.golang.org/grpc/keepalive"
1413
)
1514

1615
// TLSProperties passed to ChaincodeServer
1716
type TLSProperties struct {
18-
//Disabled forces default to be TLS enabled
17+
// Disabled forces default to be TLS enabled
1918
Disabled bool
2019
Key []byte
2120
Cert []byte

shim/handler.go

Lines changed: 212 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ const (
1818
created state = "created" // start state
1919
established state = "established" // connection established
2020
ready state = "ready" // ready for requests
21+
22+
defaultMaxSizeWriteBatch = 100
23+
prefixMetaDataWriteBatch = "m"
24+
prefixStateDataWriteBatch = "s"
2125
)
2226

2327
// PeerChaincodeStream is the common stream interface for Peer - chaincode communication.
@@ -46,6 +50,13 @@ type Handler struct {
4650
cc Chaincode
4751
// state holds the current state of this handler.
4852
state state
53+
// if you can send the changes in batches.
54+
usePeerWriteBatch bool
55+
maxSizeWriteBatch uint32
56+
batchMutex sync.RWMutex
57+
batch map[string]map[string]*peer.WriteRecord
58+
startWriteBatchMutex sync.RWMutex
59+
startWriteBatch map[string]bool
4960

5061
// Multiple queries (and one transaction) with different txids can be executing in parallel for this chaincode
5162
// responseChannels is the channel on which responses are communicated by the shim to the chaincodeStub.
@@ -150,6 +161,8 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode
150161
cc: chaincode,
151162
responseChannels: map[string]chan *peer.ChaincodeMessage{},
152163
state: created,
164+
batch: map[string]map[string]*peer.WriteRecord{},
165+
startWriteBatch: map[string]bool{},
153166
}
154167
}
155168

@@ -188,6 +201,11 @@ func (h *Handler) handleInit(msg *peer.ChaincodeMessage) (*peer.ChaincodeMessage
188201
return nil, fmt.Errorf("failed to marshal response: %s", err)
189202
}
190203

204+
err = h.sendBatch(msg.ChannelId, msg.Txid)
205+
if err != nil {
206+
return nil, fmt.Errorf("failed send batch: %s", err)
207+
}
208+
191209
return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
192210
}
193211

@@ -214,6 +232,11 @@ func (h *Handler) handleTransaction(msg *peer.ChaincodeMessage) (*peer.Chaincode
214232
return nil, fmt.Errorf("failed to marshal response: %s", err)
215233
}
216234

235+
err = h.sendBatch(msg.ChannelId, msg.Txid)
236+
if err != nil {
237+
return nil, fmt.Errorf("failed send batch: %s", err)
238+
}
239+
217240
return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
218241
}
219242

@@ -312,6 +335,17 @@ func (h *Handler) handleGetStateMetadata(collection string, key string, channelI
312335

313336
// handlePutState communicates with the peer to put state information into the ledger.
314337
func (h *Handler) handlePutState(collection string, key string, value []byte, channelID string, txid string) error {
338+
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
339+
st := h.batchByID(channelID, txid)
340+
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
341+
Key: key,
342+
Value: value,
343+
Collection: collection,
344+
Type: peer.WriteRecord_PUT_STATE,
345+
}
346+
return nil
347+
}
348+
315349
// Construct payload for PUT_STATE
316350
payloadBytes := marshalOrPanic(&peer.PutState{Collection: collection, Key: key, Value: value})
317351

@@ -340,6 +374,19 @@ func (h *Handler) handlePutState(collection string, key string, value []byte, ch
340374
func (h *Handler) handlePutStateMetadataEntry(collection string, key string, metakey string, metadata []byte, channelID string, txID string) error {
341375
// Construct payload for PUT_STATE_METADATA
342376
md := &peer.StateMetadata{Metakey: metakey, Value: metadata}
377+
378+
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txID) {
379+
st := h.batchByID(channelID, txID)
380+
st[prefixMetaDataWriteBatch+collection+key] = &peer.WriteRecord{
381+
Key: key,
382+
Collection: collection,
383+
Metadata: md,
384+
Type: peer.WriteRecord_PUT_STATE_METADATA,
385+
}
386+
387+
return nil
388+
}
389+
343390
payloadBytes := marshalOrPanic(&peer.PutStateMetadata{Collection: collection, Key: key, Metadata: md})
344391

345392
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_METADATA, Payload: payloadBytes, Txid: txID, ChannelId: channelID}
@@ -365,6 +412,16 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met
365412

366413
// handleDelState communicates with the peer to delete a key from the state in the ledger.
367414
func (h *Handler) handleDelState(collection string, key string, channelID string, txid string) error {
415+
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
416+
st := h.batchByID(channelID, txid)
417+
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
418+
Key: key,
419+
Collection: collection,
420+
Type: peer.WriteRecord_DEL_STATE,
421+
}
422+
return nil
423+
}
424+
368425
payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
369426
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_DEL_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
370427
// Execute the request and get response
@@ -388,6 +445,16 @@ func (h *Handler) handleDelState(collection string, key string, channelID string
388445

389446
// handlePurgeState communicates with the peer to purge a state from private data
390447
func (h *Handler) handlePurgeState(collection string, key string, channelID string, txid string) error {
448+
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
449+
st := h.batchByID(channelID, txid)
450+
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
451+
Key: key,
452+
Collection: collection,
453+
Type: peer.WriteRecord_PURGE_PRIVATE_DATA,
454+
}
455+
return nil
456+
}
457+
391458
payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
392459
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PURGE_PRIVATE_DATA, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
393460
// Execute the request and get response
@@ -409,6 +476,111 @@ func (h *Handler) handlePurgeState(collection string, key string, channelID stri
409476
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
410477
}
411478

479+
// handleWriteBatch communicates with the peer to write batch to state all changes information into the ledger.
480+
func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string, txid string) error {
481+
// Construct payload for PUT_STATE_BATCH
482+
payloadBytes := marshalOrPanic(batch)
483+
484+
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_WRITE_BATCH_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
485+
486+
// Execute the request and get response
487+
responseMsg, err := h.callPeerWithChaincodeMsg(msg, channelID, txid)
488+
if err != nil {
489+
return fmt.Errorf("[%s] error sending %s: %s", msg.Txid, peer.ChaincodeMessage_WRITE_BATCH_STATE, err)
490+
}
491+
492+
if responseMsg.Type == peer.ChaincodeMessage_RESPONSE {
493+
// Success response
494+
return nil
495+
}
496+
497+
if responseMsg.Type == peer.ChaincodeMessage_ERROR {
498+
// Error response
499+
return fmt.Errorf("%s", responseMsg.Payload[:])
500+
}
501+
502+
// Incorrect chaincode message received
503+
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
504+
}
505+
506+
func (h *Handler) sendBatch(channelID string, txid string) error {
507+
if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txid) {
508+
return nil
509+
}
510+
511+
st := h.batchByID(channelID, txid)
512+
txCtxID := transactionContextID(channelID, txid)
513+
514+
defer func() {
515+
h.batchMutex.Lock()
516+
h.startWriteBatchMutex.Lock()
517+
518+
delete(h.batch, txCtxID)
519+
delete(h.startWriteBatch, txCtxID)
520+
521+
h.startWriteBatchMutex.Unlock()
522+
h.batchMutex.Unlock()
523+
}()
524+
525+
batch := &peer.WriteBatchState{}
526+
for _, kv := range st {
527+
batch.Rec = append(batch.Rec, kv)
528+
if len(batch.Rec) >= int(h.maxSizeWriteBatch) {
529+
err := h.handleWriteBatch(batch, channelID, txid)
530+
if err != nil {
531+
return fmt.Errorf("failed send batch: %s", err)
532+
}
533+
batch.Rec = batch.Rec[:0]
534+
}
535+
}
536+
537+
if len(batch.Rec) != 0 {
538+
err := h.handleWriteBatch(batch, channelID, txid)
539+
if err != nil {
540+
return fmt.Errorf("failed send batch: %s", err)
541+
}
542+
}
543+
544+
return nil
545+
}
546+
547+
func (h *Handler) handleStartWriteBatch(channelID string, txID string) error {
548+
if !h.usePeerWriteBatch {
549+
return errors.New("peer does not support write batch")
550+
}
551+
552+
txCtxID := transactionContextID(channelID, txID)
553+
h.startWriteBatchMutex.Lock()
554+
defer h.startWriteBatchMutex.Unlock()
555+
556+
h.startWriteBatch[txCtxID] = true
557+
return nil
558+
}
559+
560+
func (h *Handler) handleFinishWriteBatch(channelID string, txID string) error {
561+
return h.sendBatch(channelID, txID)
562+
}
563+
564+
func (h *Handler) handleCancelWriteBatch(channelID string, txID string) {
565+
if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txID) {
566+
return
567+
}
568+
569+
txCtxID := transactionContextID(channelID, txID)
570+
571+
h.batchMutex.Lock()
572+
delete(h.batch, txCtxID)
573+
h.batchMutex.Unlock()
574+
}
575+
576+
func (h *Handler) isStartWriteBatch(channelID string, txID string) bool {
577+
txCtxID := transactionContextID(channelID, txID)
578+
h.startWriteBatchMutex.RLock()
579+
defer h.startWriteBatchMutex.RUnlock()
580+
581+
return h.startWriteBatch[txCtxID]
582+
}
583+
412584
func (h *Handler) handleGetStateByRange(collection, startKey, endKey string, metadata []byte,
413585
channelID string, txid string) (*peer.QueryResponse, error) {
414586
// Send GET_STATE_BY_RANGE message to peer chaincode support
@@ -655,6 +827,23 @@ func (h *Handler) handleEstablished(msg *peer.ChaincodeMessage) error {
655827
}
656828

657829
h.state = ready
830+
if len(msg.Payload) == 0 {
831+
return nil
832+
}
833+
834+
ccAdditionalParams := &peer.ChaincodeAdditionalParams{}
835+
err := proto.Unmarshal(msg.Payload, ccAdditionalParams)
836+
if err != nil {
837+
return nil
838+
}
839+
840+
h.usePeerWriteBatch = ccAdditionalParams.UseWriteBatch
841+
h.maxSizeWriteBatch = ccAdditionalParams.MaxSizeWriteBatch
842+
843+
if h.usePeerWriteBatch && h.maxSizeWriteBatch < defaultMaxSizeWriteBatch {
844+
h.maxSizeWriteBatch = defaultMaxSizeWriteBatch
845+
}
846+
658847
return nil
659848
}
660849

@@ -697,7 +886,29 @@ func (h *Handler) handleMessage(msg *peer.ChaincodeMessage, errc chan error) err
697886
return nil
698887
}
699888

700-
// marshalOrPanic attempts to marshal the provided protobbuf message but will panic
889+
func (h *Handler) batchByID(channelID string, txID string) map[string]*peer.WriteRecord {
890+
txCtxID := transactionContextID(channelID, txID)
891+
892+
h.batchMutex.RLock()
893+
st, ok := h.batch[txCtxID]
894+
h.batchMutex.RUnlock()
895+
if ok {
896+
return st
897+
}
898+
899+
h.batchMutex.Lock()
900+
defer h.batchMutex.Unlock()
901+
st, ok = h.batch[txCtxID]
902+
if ok {
903+
return st
904+
}
905+
906+
st = make(map[string]*peer.WriteRecord)
907+
h.batch[txCtxID] = st
908+
return st
909+
}
910+
911+
// marshalOrPanic attempts to marshal the provided protobuf message but will panic
701912
// when marshaling fails instead of returning an error.
702913
func marshalOrPanic(msg proto.Message) []byte {
703914
bytes, err := proto.Marshal(msg)

0 commit comments

Comments
 (0)