Skip to content

Commit 0d6ee82

Browse files
committed
remote request: extract chain-specific response handling into pluggable policy and move Aptos failed-hash logic out of ClientRequest
1 parent 8820141 commit 0d6ee82

File tree

6 files changed

+472
-248
lines changed

6 files changed

+472
-248
lines changed

core/capabilities/remote/executable/request/client_request.go

Lines changed: 37 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"fmt"
99
"sort"
10-
"strings"
1110
"sync"
1211
"time"
1312

@@ -18,7 +17,6 @@ import (
1817
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1918
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
2019
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
21-
aptoscap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/aptos"
2220
"github.com/smartcontractkit/chainlink-common/pkg/logger"
2321
"github.com/smartcontractkit/chainlink-protos/workflows/go/events"
2422

@@ -48,17 +46,10 @@ type ClientRequest struct {
4846

4947
requiredIdenticalResponses int
5048
remoteNodeCount int
51-
failedHashQuorumResponses int
52-
capabilityID string
53-
capabilityMethod string
5449

5550
requestTimeout time.Duration
5651

57-
aptosFailedHashCount map[string]int
58-
aptosFailedHashPayload map[string][]byte
59-
aptosFailedHashMetering map[string][]commoncap.MeteringNodeDetail
60-
aptosFailedTotal int
61-
aptosWriteSuccessSeen bool
52+
responsePolicy responsePolicy
6253

6354
respSent bool
6455
mux sync.Mutex
@@ -194,28 +185,18 @@ func newClientRequest(ctx context.Context, lggr logger.Logger, requestID string,
194185
}(ctxWithCancel, peerID, delay)
195186
}
196187

197-
failedHashQuorumResponses := min(len(remoteCapabilityDonInfo.Members), int(2*remoteCapabilityDonInfo.F+1))
198-
if failedHashQuorumResponses < 1 {
199-
failedHashQuorumResponses = 1
200-
}
201-
202188
return &ClientRequest{
203189
id: requestID,
204190
cancelFn: cancelFn,
205191
createdAt: time.Now(),
206192
requestTimeout: requestTimeout,
207193
requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1),
208194
remoteNodeCount: len(remoteCapabilityDonInfo.Members),
209-
failedHashQuorumResponses: failedHashQuorumResponses,
210-
capabilityID: remoteCapabilityInfo.ID,
211-
capabilityMethod: capMethodName,
212195
responseIDCount: make(map[[32]byte]int),
213196
meteringResponses: make(map[[32]byte][]commoncap.MeteringNodeDetail),
214197
errorCount: make(map[string]int),
215198
responseReceived: responseReceived,
216-
aptosFailedHashCount: make(map[string]int),
217-
aptosFailedHashPayload: make(map[string][]byte),
218-
aptosFailedHashMetering: make(map[string][]commoncap.MeteringNodeDetail),
199+
responsePolicy: newResponsePolicy(remoteCapabilityInfo, capMethodName),
219200
responseCh: make(chan clientResponse, 1),
220201
wg: &wg,
221202
lggr: lggr,
@@ -285,13 +266,15 @@ func (c *ClientRequest) Cancel(err error) {
285266
c.mux.Lock()
286267
defer c.mux.Unlock()
287268
if !c.respSent {
288-
payload, ok, buildErr := c.buildDeterministicAptosFailedResponse(true)
289-
if buildErr != nil {
290-
c.lggr.Warnw("failed to build deterministic Aptos failed response", "error", buildErr)
291-
}
292-
if ok {
293-
c.sendResponse(clientResponse{Result: payload})
294-
return
269+
if c.responsePolicy != nil {
270+
payload, ok, buildErr := c.responsePolicy.BuildDeterministicResponse(true)
271+
if buildErr != nil {
272+
c.lggr.Warnw("failed to build deterministic policy response", "error", buildErr)
273+
}
274+
if ok {
275+
c.sendResponse(clientResponse{Result: payload})
276+
return
277+
}
295278
}
296279
c.sendResponse(clientResponse{Err: err})
297280
}
@@ -370,15 +353,17 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
370353
}
371354

372355
if !c.respSent {
373-
c.recordAptosFailedResponse(msg, metadata)
374-
payload, ok, buildErr := c.buildDeterministicAptosFailedResponse(c.allResponsesReceived())
375-
if buildErr != nil {
376-
return fmt.Errorf("failed to build deterministic Aptos failed response: %w", buildErr)
377-
}
378-
if ok {
379-
c.sendResponse(clientResponse{Result: payload})
380-
} else if err := c.maybeFinalizeAptosWriteAfterAllResponses(); err != nil {
381-
return err
356+
if c.responsePolicy != nil {
357+
c.responsePolicy.ObserveOKResponse(msg, metadata)
358+
payload, ok, buildErr := c.responsePolicy.BuildDeterministicResponse(c.allResponsesReceived())
359+
if buildErr != nil {
360+
return fmt.Errorf("failed to build deterministic policy response: %w", buildErr)
361+
}
362+
if ok {
363+
c.sendResponse(clientResponse{Result: payload})
364+
} else if err := c.maybeFinalizeResponsePolicyAfterAllResponses(); err != nil {
365+
return err
366+
}
382367
}
383368
}
384369
} else {
@@ -390,8 +375,8 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
390375
c.lggr.Warn("received multiple different errors for the same request, number of different errors received: %d", len(c.errorCount))
391376
}
392377

393-
if c.isAptosWriteReportRequest() {
394-
if err := c.maybeFinalizeAptosWriteAfterAllResponses(); err != nil {
378+
if c.responsePolicy != nil && c.responsePolicy.ShouldDeferErrorResponses() {
379+
if err := c.maybeFinalizeResponsePolicyAfterAllResponses(); err != nil {
395380
return err
396381
}
397382
return nil
@@ -447,25 +432,11 @@ func (c *ClientRequest) encodePayloadWithMetadata(msg *types.MessageBody, metada
447432
return pb.MarshalCapabilityResponse(resp)
448433
}
449434

450-
func (c *ClientRequest) isAptosWriteReportRequest() bool {
451-
if c.capabilityMethod != "WriteReport" {
452-
return false
453-
}
454-
455-
return strings.HasPrefix(c.capabilityID, "aptos:")
456-
}
457-
458435
func (c *ClientRequest) shouldDeferIdenticalResponse(payload []byte) bool {
459-
if !c.isAptosWriteReportRequest() {
460-
return false
461-
}
462-
463-
reply, ok := decodeAptosWriteReportReply(payload)
464-
if !ok {
436+
if c.responsePolicy == nil {
465437
return false
466438
}
467-
468-
return reply.GetTxStatus() == aptoscap.TxStatus_TX_STATUS_FAILED
439+
return c.responsePolicy.ShouldDeferIdenticalResponse(payload)
469440
}
470441

471442
func (c *ClientRequest) allResponsesReceived() bool {
@@ -482,160 +453,23 @@ func (c *ClientRequest) allResponsesReceived() bool {
482453
return true
483454
}
484455

485-
func (c *ClientRequest) recordAptosFailedResponse(msg *types.MessageBody, metadata commoncap.ResponseMetadata) {
486-
if !c.isAptosWriteReportRequest() {
487-
return
488-
}
489-
490-
reply, ok := decodeAptosWriteReportReply(msg.Payload)
491-
if !ok {
492-
return
493-
}
494-
if reply.GetTxStatus() == aptoscap.TxStatus_TX_STATUS_SUCCESS {
495-
c.aptosWriteSuccessSeen = true
496-
return
497-
}
498-
if reply.GetTxStatus() != aptoscap.TxStatus_TX_STATUS_FAILED || len(reply.GetTxHash()) == 0 {
499-
return
500-
}
501-
502-
normalizedHash, ok := normalizeAptosTxHash(reply.GetTxHash())
503-
if !ok {
504-
return
505-
}
506-
507-
c.aptosFailedHashCount[normalizedHash]++
508-
c.aptosFailedTotal++
509-
510-
if _, exists := c.aptosFailedHashPayload[normalizedHash]; !exists {
511-
c.aptosFailedHashPayload[normalizedHash] = append([]byte(nil), msg.Payload...)
512-
}
513-
514-
if len(metadata.Metering) > 0 {
515-
c.aptosFailedHashMetering[normalizedHash] = append(c.aptosFailedHashMetering[normalizedHash], metadata.Metering...)
516-
}
517-
}
518-
519-
func (c *ClientRequest) buildDeterministicAptosFailedResponse(allowNoQuorum bool) ([]byte, bool, error) {
520-
if !c.isAptosWriteReportRequest() {
521-
return nil, false, nil
522-
}
523-
if c.aptosWriteSuccessSeen {
524-
return nil, false, nil
525-
}
526-
527-
if len(c.aptosFailedHashCount) == 0 {
528-
return nil, false, nil
529-
}
530-
if !allowNoQuorum && c.aptosFailedTotal < c.failedHashQuorumResponses {
531-
return nil, false, nil
532-
}
533-
534-
selectedHash := ""
535-
for hash := range c.aptosFailedHashPayload {
536-
if selectedHash == "" || hash < selectedHash {
537-
selectedHash = hash
538-
}
539-
}
540-
541-
if selectedHash == "" {
542-
return nil, false, nil
543-
}
544-
545-
selectedPayload, ok := c.aptosFailedHashPayload[selectedHash]
546-
if !ok || len(selectedPayload) == 0 {
547-
return nil, false, nil
548-
}
549-
550-
nodeReports := c.aptosFailedHashMetering[selectedHash]
551-
metadata := commoncap.ResponseMetadata{
552-
Metering: append([]commoncap.MeteringNodeDetail(nil), nodeReports...),
553-
}
554-
555-
payload, err := buildAptosFailedWriteReportPayload(selectedPayload, selectedHash, metadata)
556-
if err != nil {
557-
return nil, false, err
558-
}
559-
560-
return payload, true, nil
561-
}
562-
563-
func decodeAptosWriteReportReply(payload []byte) (*aptoscap.WriteReportReply, bool) {
564-
resp, err := pb.UnmarshalCapabilityResponse(payload)
565-
if err != nil {
566-
return nil, false
567-
}
568-
569-
reply := &aptoscap.WriteReportReply{}
570-
if _, err := commoncap.UnwrapResponse(resp, reply); err != nil {
571-
return nil, false
572-
}
573-
574-
return reply, true
575-
}
576-
577-
func buildAptosFailedWriteReportPayload(payload []byte, normalizedHash string, metadata commoncap.ResponseMetadata) ([]byte, error) {
578-
resp, err := pb.UnmarshalCapabilityResponse(payload)
579-
if err != nil {
580-
return nil, err
581-
}
582-
583-
reply := &aptoscap.WriteReportReply{}
584-
migrated, err := commoncap.UnwrapResponse(resp, reply)
585-
if err != nil {
586-
return nil, err
587-
}
588-
589-
reply.TxStatus = aptoscap.TxStatus_TX_STATUS_FAILED
590-
reply.TxHash = []byte("0x" + normalizedHash)
591-
592-
if err := commoncap.SetResponse(&resp, migrated, reply); err != nil {
593-
return nil, err
594-
}
595-
596-
resp.Metadata = metadata
597-
return pb.MarshalCapabilityResponse(resp)
598-
}
599-
600-
func normalizeAptosTxHash(raw []byte) (string, bool) {
601-
if len(raw) == 32 {
602-
return hex.EncodeToString(raw), true
603-
}
604-
605-
s := strings.TrimSpace(strings.ToLower(string(raw)))
606-
s = strings.TrimPrefix(s, "0x")
607-
if len(s) != 64 {
608-
return "", false
609-
}
610-
611-
if _, err := hex.DecodeString(s); err != nil {
612-
return "", false
613-
}
614-
615-
return s, true
616-
}
617-
618-
func (c *ClientRequest) maybeFinalizeAptosWriteAfterAllResponses() error {
619-
if !c.isAptosWriteReportRequest() || c.respSent || !c.allResponsesReceived() {
456+
func (c *ClientRequest) maybeFinalizeResponsePolicyAfterAllResponses() error {
457+
if c.responsePolicy == nil || c.respSent {
620458
return nil
621459
}
622460

623-
payload, ok, buildErr := c.buildDeterministicAptosFailedResponse(true)
624-
if buildErr != nil {
625-
return fmt.Errorf("failed to build deterministic Aptos failed response: %w", buildErr)
461+
response, ok, err := c.responsePolicy.FinalizeAfterAllResponses(responsePolicyState{
462+
AllResponsesReceived: c.allResponsesReceived(),
463+
ResponseVariants: len(c.responseIDCount),
464+
TotalErrorCount: c.totalErrorCount,
465+
})
466+
if err != nil {
467+
return err
626468
}
627-
if ok {
628-
c.sendResponse(clientResponse{Result: payload})
469+
if !ok || response == nil {
629470
return nil
630471
}
631472

632-
c.sendResponse(clientResponse{
633-
Err: fmt.Errorf(
634-
"received all Aptos write responses without deterministic failed hash (okResponses=%d errorResponses=%d failedHashResponses=%d)",
635-
len(c.responseIDCount),
636-
c.totalErrorCount,
637-
c.aptosFailedTotal,
638-
),
639-
})
473+
c.sendResponse(*response)
640474
return nil
641475
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package request
2+
3+
import (
4+
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
5+
6+
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
7+
)
8+
9+
// responsePolicy allows request finalization behavior to be extended for specific
10+
// capabilities without coupling generic request handling to chain-specific logic.
11+
type responsePolicy interface {
12+
ShouldDeferIdenticalResponse(payload []byte) bool
13+
ObserveOKResponse(msg *types.MessageBody, metadata commoncap.ResponseMetadata)
14+
BuildDeterministicResponse(allowNoQuorum bool) ([]byte, bool, error)
15+
ShouldDeferErrorResponses() bool
16+
FinalizeAfterAllResponses(state responsePolicyState) (*clientResponse, bool, error)
17+
}
18+
19+
type responsePolicyState struct {
20+
AllResponsesReceived bool
21+
ResponseVariants int
22+
TotalErrorCount int
23+
}
24+
25+
type responsePolicyBuilder func(remoteCapabilityInfo commoncap.CapabilityInfo, capabilityMethod string) responsePolicy
26+
27+
var responsePolicyBuilders []responsePolicyBuilder
28+
29+
func registerResponsePolicyBuilder(builder responsePolicyBuilder) {
30+
responsePolicyBuilders = append(responsePolicyBuilders, builder)
31+
}
32+
33+
func newResponsePolicy(remoteCapabilityInfo commoncap.CapabilityInfo, capabilityMethod string) responsePolicy {
34+
for _, builder := range responsePolicyBuilders {
35+
policy := builder(remoteCapabilityInfo, capabilityMethod)
36+
if policy != nil {
37+
return policy
38+
}
39+
}
40+
return nil
41+
}

0 commit comments

Comments
 (0)