Skip to content

Commit 8820141

Browse files
committed
cre aptos: stabilize write tx-hash resolution and enforce failed-write hash verification in suite
1 parent a39120e commit 8820141

File tree

21 files changed

+814
-60
lines changed

21 files changed

+814
-60
lines changed

core/capabilities/remote/executable/request/client_request.go

Lines changed: 244 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"sort"
10+
"strings"
1011
"sync"
1112
"time"
1213

@@ -17,6 +18,7 @@ import (
1718
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1819
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1920
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
21+
aptoscap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/aptos"
2022
"github.com/smartcontractkit/chainlink-common/pkg/logger"
2123
"github.com/smartcontractkit/chainlink-protos/workflows/go/events"
2224

@@ -46,9 +48,18 @@ type ClientRequest struct {
4648

4749
requiredIdenticalResponses int
4850
remoteNodeCount int
51+
failedHashQuorumResponses int
52+
capabilityID string
53+
capabilityMethod string
4954

5055
requestTimeout time.Duration
5156

57+
aptosFailedHashCount map[string]int
58+
aptosFailedHashPayload map[string][]byte
59+
aptosFailedHashMetering map[string][]commoncap.MeteringNodeDetail
60+
aptosFailedTotal int
61+
aptosWriteSuccessSeen bool
62+
5263
respSent bool
5364
mux sync.Mutex
5465
wg *sync.WaitGroup
@@ -183,17 +194,28 @@ func newClientRequest(ctx context.Context, lggr logger.Logger, requestID string,
183194
}(ctxWithCancel, peerID, delay)
184195
}
185196

197+
failedHashQuorumResponses := min(len(remoteCapabilityDonInfo.Members), int(2*remoteCapabilityDonInfo.F+1))
198+
if failedHashQuorumResponses < 1 {
199+
failedHashQuorumResponses = 1
200+
}
201+
186202
return &ClientRequest{
187203
id: requestID,
188204
cancelFn: cancelFn,
189205
createdAt: time.Now(),
190206
requestTimeout: requestTimeout,
191207
requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1),
192208
remoteNodeCount: len(remoteCapabilityDonInfo.Members),
209+
failedHashQuorumResponses: failedHashQuorumResponses,
210+
capabilityID: remoteCapabilityInfo.ID,
211+
capabilityMethod: capMethodName,
193212
responseIDCount: make(map[[32]byte]int),
194213
meteringResponses: make(map[[32]byte][]commoncap.MeteringNodeDetail),
195214
errorCount: make(map[string]int),
196215
responseReceived: responseReceived,
216+
aptosFailedHashCount: make(map[string]int),
217+
aptosFailedHashPayload: make(map[string][]byte),
218+
aptosFailedHashMetering: make(map[string][]commoncap.MeteringNodeDetail),
197219
responseCh: make(chan clientResponse, 1),
198220
wg: &wg,
199221
lggr: lggr,
@@ -263,6 +285,14 @@ func (c *ClientRequest) Cancel(err error) {
263285
c.mux.Lock()
264286
defer c.mux.Unlock()
265287
if !c.respSent {
288+
payload, ok, buildErr := c.buildDeterministicAptosFailedResponse(true)
289+
if buildErr != nil {
290+
c.lggr.Warnw("failed to build deterministic Aptos failed response", "error", buildErr)
291+
}
292+
if ok {
293+
c.sendResponse(clientResponse{Result: payload})
294+
return
295+
}
266296
c.sendResponse(clientResponse{Err: err})
267297
}
268298
}
@@ -330,14 +360,27 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
330360
lggr.Warnw("received multiple unique responses for the same request", "count for responseID", len(c.responseIDCount))
331361
}
332362

333-
if c.responseIDCount[responseID] == c.requiredIdenticalResponses {
363+
if c.responseIDCount[responseID] == c.requiredIdenticalResponses && !c.shouldDeferIdenticalResponse(msg.Payload) {
334364
payload, err := c.encodePayloadWithMetadata(msg, commoncap.ResponseMetadata{Metering: nodeReports})
335365
if err != nil {
336366
return fmt.Errorf("failed to encode payload with metadata: %w", err)
337367
}
338368

339369
c.sendResponse(clientResponse{Result: payload})
340370
}
371+
372+
if !c.respSent {
373+
c.recordAptosFailedResponse(msg, metadata)
374+
payload, ok, buildErr := c.buildDeterministicAptosFailedResponse(c.allResponsesReceived())
375+
if buildErr != nil {
376+
return fmt.Errorf("failed to build deterministic Aptos failed response: %w", buildErr)
377+
}
378+
if ok {
379+
c.sendResponse(clientResponse{Result: payload})
380+
} else if err := c.maybeFinalizeAptosWriteAfterAllResponses(); err != nil {
381+
return err
382+
}
383+
}
341384
} else {
342385
c.lggr.Debugw("received error from peer", "error", msg.Error, "errorMsg", msg.ErrorMsg, "peer", sender)
343386
c.errorCount[msg.ErrorMsg]++
@@ -347,6 +390,13 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
347390
c.lggr.Warn("received multiple different errors for the same request, number of different errors received: %d", len(c.errorCount))
348391
}
349392

393+
if c.isAptosWriteReportRequest() {
394+
if err := c.maybeFinalizeAptosWriteAfterAllResponses(); err != nil {
395+
return err
396+
}
397+
return nil
398+
}
399+
350400
if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses {
351401
c.sendResponse(clientResponse{Err: fmt.Errorf("%s : %s", msg.Error, msg.ErrorMsg)})
352402
} else if c.totalErrorCount == c.remoteNodeCount-c.requiredIdenticalResponses+1 {
@@ -396,3 +446,196 @@ func (c *ClientRequest) encodePayloadWithMetadata(msg *types.MessageBody, metada
396446

397447
return pb.MarshalCapabilityResponse(resp)
398448
}
449+
450+
func (c *ClientRequest) isAptosWriteReportRequest() bool {
451+
if c.capabilityMethod != "WriteReport" {
452+
return false
453+
}
454+
455+
return strings.HasPrefix(c.capabilityID, "aptos:")
456+
}
457+
458+
func (c *ClientRequest) shouldDeferIdenticalResponse(payload []byte) bool {
459+
if !c.isAptosWriteReportRequest() {
460+
return false
461+
}
462+
463+
reply, ok := decodeAptosWriteReportReply(payload)
464+
if !ok {
465+
return false
466+
}
467+
468+
return reply.GetTxStatus() == aptoscap.TxStatus_TX_STATUS_FAILED
469+
}
470+
471+
func (c *ClientRequest) allResponsesReceived() bool {
472+
if len(c.responseReceived) == 0 {
473+
return false
474+
}
475+
476+
for _, received := range c.responseReceived {
477+
if !received {
478+
return false
479+
}
480+
}
481+
482+
return true
483+
}
484+
485+
func (c *ClientRequest) recordAptosFailedResponse(msg *types.MessageBody, metadata commoncap.ResponseMetadata) {
486+
if !c.isAptosWriteReportRequest() {
487+
return
488+
}
489+
490+
reply, ok := decodeAptosWriteReportReply(msg.Payload)
491+
if !ok {
492+
return
493+
}
494+
if reply.GetTxStatus() == aptoscap.TxStatus_TX_STATUS_SUCCESS {
495+
c.aptosWriteSuccessSeen = true
496+
return
497+
}
498+
if reply.GetTxStatus() != aptoscap.TxStatus_TX_STATUS_FAILED || len(reply.GetTxHash()) == 0 {
499+
return
500+
}
501+
502+
normalizedHash, ok := normalizeAptosTxHash(reply.GetTxHash())
503+
if !ok {
504+
return
505+
}
506+
507+
c.aptosFailedHashCount[normalizedHash]++
508+
c.aptosFailedTotal++
509+
510+
if _, exists := c.aptosFailedHashPayload[normalizedHash]; !exists {
511+
c.aptosFailedHashPayload[normalizedHash] = append([]byte(nil), msg.Payload...)
512+
}
513+
514+
if len(metadata.Metering) > 0 {
515+
c.aptosFailedHashMetering[normalizedHash] = append(c.aptosFailedHashMetering[normalizedHash], metadata.Metering...)
516+
}
517+
}
518+
519+
func (c *ClientRequest) buildDeterministicAptosFailedResponse(allowNoQuorum bool) ([]byte, bool, error) {
520+
if !c.isAptosWriteReportRequest() {
521+
return nil, false, nil
522+
}
523+
if c.aptosWriteSuccessSeen {
524+
return nil, false, nil
525+
}
526+
527+
if len(c.aptosFailedHashCount) == 0 {
528+
return nil, false, nil
529+
}
530+
if !allowNoQuorum && c.aptosFailedTotal < c.failedHashQuorumResponses {
531+
return nil, false, nil
532+
}
533+
534+
selectedHash := ""
535+
for hash := range c.aptosFailedHashPayload {
536+
if selectedHash == "" || hash < selectedHash {
537+
selectedHash = hash
538+
}
539+
}
540+
541+
if selectedHash == "" {
542+
return nil, false, nil
543+
}
544+
545+
selectedPayload, ok := c.aptosFailedHashPayload[selectedHash]
546+
if !ok || len(selectedPayload) == 0 {
547+
return nil, false, nil
548+
}
549+
550+
nodeReports := c.aptosFailedHashMetering[selectedHash]
551+
metadata := commoncap.ResponseMetadata{
552+
Metering: append([]commoncap.MeteringNodeDetail(nil), nodeReports...),
553+
}
554+
555+
payload, err := buildAptosFailedWriteReportPayload(selectedPayload, selectedHash, metadata)
556+
if err != nil {
557+
return nil, false, err
558+
}
559+
560+
return payload, true, nil
561+
}
562+
563+
func decodeAptosWriteReportReply(payload []byte) (*aptoscap.WriteReportReply, bool) {
564+
resp, err := pb.UnmarshalCapabilityResponse(payload)
565+
if err != nil {
566+
return nil, false
567+
}
568+
569+
reply := &aptoscap.WriteReportReply{}
570+
if _, err := commoncap.UnwrapResponse(resp, reply); err != nil {
571+
return nil, false
572+
}
573+
574+
return reply, true
575+
}
576+
577+
func buildAptosFailedWriteReportPayload(payload []byte, normalizedHash string, metadata commoncap.ResponseMetadata) ([]byte, error) {
578+
resp, err := pb.UnmarshalCapabilityResponse(payload)
579+
if err != nil {
580+
return nil, err
581+
}
582+
583+
reply := &aptoscap.WriteReportReply{}
584+
migrated, err := commoncap.UnwrapResponse(resp, reply)
585+
if err != nil {
586+
return nil, err
587+
}
588+
589+
reply.TxStatus = aptoscap.TxStatus_TX_STATUS_FAILED
590+
reply.TxHash = []byte("0x" + normalizedHash)
591+
592+
if err := commoncap.SetResponse(&resp, migrated, reply); err != nil {
593+
return nil, err
594+
}
595+
596+
resp.Metadata = metadata
597+
return pb.MarshalCapabilityResponse(resp)
598+
}
599+
600+
func normalizeAptosTxHash(raw []byte) (string, bool) {
601+
if len(raw) == 32 {
602+
return hex.EncodeToString(raw), true
603+
}
604+
605+
s := strings.TrimSpace(strings.ToLower(string(raw)))
606+
s = strings.TrimPrefix(s, "0x")
607+
if len(s) != 64 {
608+
return "", false
609+
}
610+
611+
if _, err := hex.DecodeString(s); err != nil {
612+
return "", false
613+
}
614+
615+
return s, true
616+
}
617+
618+
func (c *ClientRequest) maybeFinalizeAptosWriteAfterAllResponses() error {
619+
if !c.isAptosWriteReportRequest() || c.respSent || !c.allResponsesReceived() {
620+
return nil
621+
}
622+
623+
payload, ok, buildErr := c.buildDeterministicAptosFailedResponse(true)
624+
if buildErr != nil {
625+
return fmt.Errorf("failed to build deterministic Aptos failed response: %w", buildErr)
626+
}
627+
if ok {
628+
c.sendResponse(clientResponse{Result: payload})
629+
return nil
630+
}
631+
632+
c.sendResponse(clientResponse{
633+
Err: fmt.Errorf(
634+
"received all Aptos write responses without deterministic failed hash (okResponses=%d errorResponses=%d failedHashResponses=%d)",
635+
len(c.responseIDCount),
636+
c.totalErrorCount,
637+
c.aptosFailedTotal,
638+
),
639+
})
640+
return nil
641+
}

0 commit comments

Comments
 (0)