Skip to content

Commit 04ac63e

Browse files
committed
feat: support receipt batch
1 parent cf32a82 commit 04ac63e

File tree

3 files changed

+212
-12
lines changed

3 files changed

+212
-12
lines changed

client.go

Lines changed: 116 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const (
4747
InvokeInterchainMethod = "invokeInterchain"
4848
InvokeInterchainsMethod = "invokeInterchains"
4949
InvokeReceiptMethod = "invokeReceipt"
50+
InvokeReceiptsMethod = "invokeReceipts"
5051
InvokeIndexUpdateMethod = "invokeIndexUpdate"
5152
InvokeGetDirectTransactionMetaMethod = "getDirectTransactionMeta"
5253
InvokerGetAppchainInfoMethod = "getAppchainInfo"
@@ -339,8 +340,39 @@ func (c *Client) GetIBTPCh() chan *pb.IBTP {
339340
return c.eventC
340341
}
341342

342-
func (c *Client) SubmitReceiptBatch(to []string, index []uint64, serviceID []string, ibtpType []pb.IBTP_Type, result []*pb.Result, proof []*pb.BxhProof) (*pb.SubmitIBTPResponse, error) {
343-
panic("implement me")
343+
func (c *Client) SubmitReceiptBatch(to []string, index []uint64, serviceID []string, ibtpTypes []pb.IBTP_Type, batchResult []*pb.Result, proofs []*pb.BxhProof) (*pb.SubmitIBTPResponse, error) {
344+
ret := &pb.SubmitIBTPResponse{Status: true}
345+
batchResults := make([][][][]byte, 0)
346+
batchMultiStatus := make([][]bool, 0)
347+
for _, result := range batchResult {
348+
var results [][][]byte
349+
for _, s := range result.Data {
350+
results = append(results, s.Data)
351+
}
352+
batchMultiStatus = append(batchMultiStatus, result.MultiStatus)
353+
batchResults = append(batchResults, results)
354+
}
355+
ibtpTyps := make([]uint64, 0)
356+
for _, ibtpType := range ibtpTypes {
357+
ibtpTyps = append(ibtpTyps, uint64(ibtpType))
358+
}
359+
360+
batchTxStatus := make([]uint64, 0)
361+
multiSigns := make([][][]byte, 0)
362+
for _, proof := range proofs {
363+
batchTxStatus = append(batchTxStatus, uint64(proof.TxStatus))
364+
multiSigns = append(multiSigns, proof.MultiSign)
365+
}
366+
367+
_, resp, err := c.InvokeReceipts(serviceID, to, index, ibtpTyps, batchResults, batchMultiStatus, batchTxStatus, multiSigns)
368+
if err != nil {
369+
ret.Status = false
370+
ret.Message = fmt.Sprintf("invoke receipt for ibtp to call: %s", err)
371+
return ret, nil
372+
}
373+
ret.Status = resp.OK
374+
ret.Message = resp.Message
375+
return ret, nil
344376
}
345377

346378
func (c *Client) SubmitIBTPBatch(from []string, index []uint64, serviceID []string, ibtpType []pb.IBTP_Type, content []*pb.Content, proof []*pb.BxhProof, isEncrypted []bool) (*pb.SubmitIBTPResponse, error) {
@@ -354,13 +386,13 @@ func (c *Client) SubmitIBTPBatch(from []string, index []uint64, serviceID []stri
354386
)
355387
for idx, ct := range content {
356388
callFunc = append(callFunc, ct.Func)
357-
args = append(args, ct.Args[1:])
389+
args = append(args, ct.Args)
358390
typ = append(typ, uint64(ibtpType[idx]))
359391
txStatus = append(txStatus, uint64(proof[idx].TxStatus))
360392
sign = append(sign, proof[idx].MultiSign)
361393
}
362394

363-
_, resp, err := c.InvokeInterchains(from, index, serviceID, typ, callFunc, args, txStatus, sign, isEncrypted)
395+
_, resp, err := c.InvokeInterchains(from, serviceID, index, typ, callFunc, args, txStatus, sign, isEncrypted)
364396
if err != nil {
365397
ret.Status = false
366398
ret.Message = fmt.Sprintf("invoke interchains failed: %s", err.Error())
@@ -445,7 +477,7 @@ func (c *Client) GetDirectTransactionMeta(IBTPid string) (uint64, uint64, uint64
445477

446478
}
447479

448-
func (c *Client) InvokeInterchains(srcFullID []string, index []uint64, destAddr []string, reqType []uint64, callFunc []string, callArgs [][][]byte, txStatus []uint64, multiSign [][][]byte, encrypt []bool) (*channel.Response, *Response, error) {
480+
func (c *Client) InvokeInterchains(srcFullID []string, destAddr []string, index []uint64, reqType []uint64, callFunc []string, callArgs [][][]byte, txStatus []uint64, multiSign [][][]byte, encrypt []bool) (*channel.Response, *Response, error) {
449481
srcFullIDBytes, err := json.Marshal(srcFullID)
450482
if err != nil {
451483
return nil, nil, err
@@ -483,7 +515,8 @@ func (c *Client) InvokeInterchains(srcFullID []string, index []uint64, destAddr
483515
return nil, nil, err
484516
}
485517

486-
args := util.ToChaincodeArgs(string(srcFullIDBytes), string(indexBytes), string(destAddrBytes), string(reqTypeBytes), string(callFuncBytes),
518+
// 0: srcFullID, 1: targetCID, 2: index, 3: typ, 4: callFunc, 5: callArgs, 6: txStatus, 7: signature, 8: isEncrypted
519+
args := util.ToChaincodeArgs(string(srcFullIDBytes), string(destAddrBytes), string(indexBytes), string(reqTypeBytes), string(callFuncBytes),
487520
string(callArgsBytes), string(txStatusBytes), string(multiSignBytes), string(encryptBytes))
488521

489522
request := channel.Request{
@@ -632,6 +665,83 @@ func (c *Client) InvokeReceipt(srcAddr string, dstFullID string, index uint64, r
632665
return &res, response, nil
633666
}
634667

668+
// InvokeReceipts call invokeReceipt multiple times
669+
func (c *Client) InvokeReceipts(srcAddrs []string, destFullIDs []string, indexs []uint64, reqTypes []uint64,
670+
batchResults [][][][]byte, batchMultiStatus [][]bool, batchTxStatus []uint64, multiSigns [][][]byte) (*channel.Response, *Response, error) {
671+
srcAddrsBytes, err := json.Marshal(srcAddrs)
672+
if err != nil {
673+
return nil, nil, err
674+
}
675+
destFullIDsBytes, err := json.Marshal(destFullIDs)
676+
if err != nil {
677+
return nil, nil, err
678+
}
679+
680+
indexsBytes, err := json.Marshal(indexs)
681+
if err != nil {
682+
return nil, nil, err
683+
}
684+
reqTypesBytes, err := json.Marshal(reqTypes)
685+
if err != nil {
686+
return nil, nil, err
687+
}
688+
689+
batchResultsBytes, err := json.Marshal(batchResults)
690+
if err != nil {
691+
return nil, nil, err
692+
}
693+
batchMultiStatusBytes, err := json.Marshal(batchMultiStatus)
694+
if err != nil {
695+
return nil, nil, err
696+
}
697+
batchTxStatusBytes, err := json.Marshal(batchTxStatus)
698+
if err != nil {
699+
return nil, nil, err
700+
}
701+
multiSignsBytes, err := json.Marshal(multiSigns)
702+
if err != nil {
703+
return nil, nil, err
704+
}
705+
// args:srcFullIDs, dstFullIDs, indexs, typs, results, multiStatuss, multiTxStatus, multiSignatures
706+
args := util.ToChaincodeArgs(string(srcAddrsBytes), string(destFullIDsBytes), string(indexsBytes), string(reqTypesBytes), string(batchResultsBytes), string(batchMultiStatusBytes), string(batchTxStatusBytes), string(multiSignsBytes))
707+
708+
request := channel.Request{
709+
ChaincodeID: c.meta.CCID,
710+
Fcn: InvokeReceiptsMethod,
711+
Args: args,
712+
}
713+
714+
// retry executing
715+
var res channel.Response
716+
if err := retry.Retry(func(attempt uint) error {
717+
res, err = c.consumer.ChannelClient.Execute(request)
718+
if err != nil {
719+
if strings.Contains(err.Error(), "Chaincode status Code: (500)") {
720+
res.ChaincodeStatus = shim.ERROR
721+
logger.Error("execute request failed", "err", err.Error())
722+
return nil
723+
}
724+
return fmt.Errorf("execute request: %w", err)
725+
}
726+
727+
return nil
728+
}, strategy.Wait(2*time.Second)); err != nil {
729+
logger.Error("Can't send rollback ibtp back to bitxhub", "error", err.Error())
730+
}
731+
732+
if err != nil {
733+
return nil, nil, err
734+
}
735+
736+
logger.Info("response", "cc status", strconv.Itoa(int(res.ChaincodeStatus)), "payload", string(res.Payload))
737+
response := &Response{}
738+
if err := json.Unmarshal(res.Payload, response); err != nil {
739+
return nil, nil, err
740+
}
741+
742+
return &res, response, nil
743+
}
744+
635745
func (c *Client) GetOutMessage(servicePair string, idx uint64) (*pb.IBTP, error) {
636746
args := util.ToChaincodeArgs(servicePair, strconv.FormatUint(idx, 10))
637747
request := channel.Request{

example/contracts.zip

-1.36 MB
Binary file not shown.

example/contracts/src/broker/broker.go

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ func (broker *Broker) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
189189
return broker.invokeInterchains(stub, args)
190190
case "invokeReceipt":
191191
return broker.invokeReceipt(stub, args)
192+
case "invokeReceipts":
193+
return broker.invokeReceipts(stub, args)
192194
case "invokeIndexUpdate":
193195
return broker.invokeIndexUpdate(stub, args)
194196
case "EmitInterchainEvent":
@@ -211,6 +213,10 @@ func (broker *Broker) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
211213
}
212214

213215
func (broker *Broker) initialize(stub shim.ChaincodeStubInterface, args []string) pb.Response {
216+
if len(args) != 3 {
217+
return shim.Error("incorrect number of arguments, expecting 3")
218+
}
219+
214220
if onlyAdmin := broker.onlyAdmin(stub); !onlyAdmin {
215221
return shim.Error(fmt.Sprintf("caller is not admin"))
216222
}
@@ -220,10 +226,6 @@ func (broker *Broker) initialize(stub shim.ChaincodeStubInterface, args []string
220226
return shim.Error(err.Error())
221227
}
222228

223-
if len(args) != 3 {
224-
return shim.Error("incorrect number of arguments, expecting 3")
225-
}
226-
227229
if err := stub.PutState(bxhID, []byte(args[0])); err != nil {
228230
return shim.Error(err.Error())
229231
}
@@ -476,6 +478,9 @@ func (broker *Broker) EmitInterchainEvent(stub shim.ChaincodeStubInterface, args
476478

477479
// 业务合约通过该接口进行注册: 0表示正在审核,1表示审核通过,2表示审核失败
478480
func (broker *Broker) register(stub shim.ChaincodeStubInterface, args []string) pb.Response {
481+
if len(args) != 1 {
482+
return shim.Error("incorrect number of arguments, expecting 1")
483+
}
479484
ordered, err := strconv.ParseBool(args[0])
480485
if err != nil {
481486
return errorResponse(fmt.Sprintf("cannot parse %s to bool", args[0]))
@@ -517,6 +522,9 @@ func (broker *Broker) register(stub shim.ChaincodeStubInterface, args []string)
517522

518523
// 通过chaincode自带的CID库可以验证调用者的相关信息
519524
func (broker *Broker) audit(stub shim.ChaincodeStubInterface, args []string) pb.Response {
525+
if len(args) != 3 {
526+
return shim.Error("incorrect number of arguments, expecting 3")
527+
}
520528
channel := args[0]
521529
chaincodeName := args[1]
522530
status := args[2]
@@ -766,7 +774,6 @@ func (broker *Broker) invokeInterchains(stub shim.ChaincodeStubInterface, args [
766774
signature [][][]byte
767775
isEncrypted []bool
768776
)
769-
770777
if err := json.Unmarshal([]byte(args[0]), &srcFullID); err != nil {
771778
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[0]))
772779
}
@@ -794,8 +801,12 @@ func (broker *Broker) invokeInterchains(stub shim.ChaincodeStubInterface, args [
794801
if err := json.Unmarshal([]byte(args[8]), &isEncrypted); err != nil {
795802
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[8]))
796803
}
804+
batchLen := len(srcFullID)
805+
if batchLen != len(targetCID) || batchLen != len(index) || batchLen != len(typ) || batchLen != len(callFunc) || batchLen != len(callArgs) || batchLen != len(txStatus) || batchLen != len(signature) || batchLen != len(isEncrypted) {
806+
return errorResponse("incorrect number of arguments")
807+
}
797808

798-
for idx := 0; idx < 9; idx++ {
809+
for idx := 0; idx < batchLen; idx++ {
799810
serviceOrdered, err := broker.getServiceOrderedList(stub)
800811
if err != nil {
801812
return errorResponse(fmt.Sprintf("get service orered list failed: %s", err.Error()))
@@ -1091,6 +1102,85 @@ func (broker *Broker) invokeReceipt(stub shim.ChaincodeStubInterface, args []str
10911102
return successResponse(res.Payload)
10921103
}
10931104

1105+
// invokeReceipts is used to muti time invokeReceipt
1106+
func (broker *Broker) invokeReceipts(stub shim.ChaincodeStubInterface, args []string) pb.Response {
1107+
// args:srcFullIDs, dstFullIDs, indexs, typs, results, multiStatuss, multiTxStatus, multiSignatures
1108+
if len(args) != 8 {
1109+
return errorResponse("incorrect number of arguments, expecting 8")
1110+
}
1111+
var (
1112+
srcFullIDs []string
1113+
dstFullIDs []string
1114+
indexs []uint64
1115+
typs []uint64
1116+
results [][][][]byte
1117+
multiStatus [][]bool
1118+
multiTxStatus []uint64
1119+
multiSignatures [][][]byte
1120+
)
1121+
1122+
fmt.Printf("receipts args:%v", args)
1123+
if err := json.Unmarshal([]byte(args[0]), &srcFullIDs); err != nil {
1124+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[0]))
1125+
}
1126+
if err := json.Unmarshal([]byte(args[1]), &dstFullIDs); err != nil {
1127+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[1]))
1128+
}
1129+
if err := json.Unmarshal([]byte(args[2]), &indexs); err != nil {
1130+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[2]))
1131+
}
1132+
if err := json.Unmarshal([]byte(args[3]), &typs); err != nil {
1133+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[3]))
1134+
}
1135+
if err := json.Unmarshal([]byte(args[4]), &results); err != nil {
1136+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[4]))
1137+
}
1138+
if err := json.Unmarshal([]byte(args[5]), &multiStatus); err != nil {
1139+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[5]))
1140+
}
1141+
if err := json.Unmarshal([]byte(args[6]), &multiTxStatus); err != nil {
1142+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[6]))
1143+
}
1144+
if err := json.Unmarshal([]byte(args[7]), &multiSignatures); err != nil {
1145+
return errorResponse(fmt.Sprintf("unmarshal args failed for %s", args[7]))
1146+
}
1147+
batchLen := len(srcFullIDs)
1148+
if batchLen != len(dstFullIDs) || batchLen != len(indexs) || batchLen != len(typs) || batchLen != len(results) || batchLen != len(multiStatus) || batchLen != len(multiTxStatus) || batchLen != len(multiSignatures) {
1149+
return errorResponse("args length not match")
1150+
}
1151+
for i := 0; i < batchLen; i++ {
1152+
resBytes, err := json.Marshal(results[i])
1153+
if err != nil {
1154+
return errorResponse("marshall results failed")
1155+
}
1156+
multiStatusBytes, err := json.Marshal(multiStatus[i])
1157+
if err != nil {
1158+
return errorResponse("marshall multiStatus failed")
1159+
}
1160+
multiSignatureBytes, err := json.Marshal(multiSignatures[i])
1161+
if err != nil {
1162+
return errorResponse("marshall multiSignatures failed")
1163+
}
1164+
1165+
var invokeArgs []string
1166+
invokeArgs = append(invokeArgs, srcFullIDs[i])
1167+
invokeArgs = append(invokeArgs, dstFullIDs[i])
1168+
invokeArgs = append(invokeArgs, strconv.FormatUint(indexs[i], 10))
1169+
invokeArgs = append(invokeArgs, strconv.FormatUint(typs[i], 10))
1170+
invokeArgs = append(invokeArgs, string(resBytes))
1171+
invokeArgs = append(invokeArgs, string(multiStatusBytes))
1172+
invokeArgs = append(invokeArgs, strconv.FormatUint(multiTxStatus[i], 10))
1173+
invokeArgs = append(invokeArgs, string(multiSignatureBytes))
1174+
1175+
resp := broker.invokeReceipt(stub, invokeArgs)
1176+
if resp.Status != shim.OK {
1177+
return errorResponse(resp.Message)
1178+
}
1179+
}
1180+
1181+
return shim.Success(nil)
1182+
}
1183+
10941184
func (broker *Broker) registerAppchain(stub shim.ChaincodeStubInterface, args []string) pb.Response {
10951185
if len(args) != 4 {
10961186
return shim.Error("incorrect number of arguments, expecting 4")

0 commit comments

Comments
 (0)