Skip to content

Commit 8a8daea

Browse files
authored
Merge pull request #1203 from lightninglabs/block-proof-courier-connect
Block on Proof Courier Service Connection Attempt
2 parents 1dd4f02 + 3aae9ea commit 8a8daea

File tree

6 files changed

+79
-11
lines changed

6 files changed

+79
-11
lines changed

itest/tapd_harness.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
270270
BackoffCfg: &hashmailBackoffCfg,
271271
}
272272
finalCfg.UniverseRpcCourier = &proof.UniverseRpcCourierCfg{
273-
BackoffCfg: &universeRpcBackoffCfg,
273+
BackoffCfg: &universeRpcBackoffCfg,
274+
ServiceRequestTimeout: 50 * time.Millisecond,
274275
}
275276

276277
switch typedProofCourier := (opts.proofCourier).(type) {

itest/test_harness.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ func setupHarnesses(t *testing.T, ht *harnessTest,
327327
// If nothing is specified, we use the universe RPC proof courier by
328328
// default.
329329
default:
330+
t.Logf("Address of universe server as proof courier: %v",
331+
universeServer.service.rpcHost())
330332
proofCourier = universeServer
331333
}
332334

proof/courier.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator,
654654
if err != nil {
655655
return err
656656
}
657-
log.Infof("Starting proof transfer backoff procedure for proof "+
657+
log.Infof("Starting proof transfer backoff procedure "+
658658
"(transfer_type=%s, locator_hash=%x)", transferType,
659659
locatorHash[:])
660660

@@ -710,7 +710,7 @@ func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator,
710710
)
711711
subscriberEvent(waitEvent)
712712

713-
log.Debugf("Proof delivery failed with error. Backing off. "+
713+
log.Debugf("Proof transfer failed with error. Backing off. "+
714714
"(transfer_type=%s, locator_hash=%x, backoff=%s, "+
715715
"attempt=%d): %v",
716716
transferType, locatorHash[:], backoff, i, errExec)
@@ -742,7 +742,7 @@ func (b *BackoffHandler) wait(ctx context.Context, wait time.Duration) error {
742742
case <-time.After(wait):
743743
return nil
744744
case <-ctx.Done():
745-
return fmt.Errorf("context canceled")
745+
return fmt.Errorf("back off handler context done")
746746
}
747747
}
748748

@@ -1154,10 +1154,17 @@ func (h *HashMailCourier) SetSubscribers(
11541154
var _ Courier = (*HashMailCourier)(nil)
11551155

11561156
// UniverseRpcCourierCfg is the config for the universe RPC proof courier.
1157+
//
1158+
// nolint:lll
11571159
type UniverseRpcCourierCfg struct {
11581160
// BackoffCfg configures the behaviour of the proof delivery
11591161
// functionality.
11601162
BackoffCfg *BackoffCfg
1163+
1164+
// ServiceRequestTimeout defines the maximum duration we'll wait for
1165+
// a courier service to handle our outgoing request during a connection
1166+
// attempt, or when delivering or retrieving a proof.
1167+
ServiceRequestTimeout time.Duration `long:"servicerequestimeout" description:"The maximum duration we'll wait for a courier service to handle our outgoing request during a connection attempt, or when delivering or retrieving a proof."`
11611168
}
11621169

11631170
// UniverseRpcCourier is a universe RPC proof courier service handle. It
@@ -1356,18 +1363,29 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
13561363
deliverFunc := func() error {
13571364
// Connect to the courier service if a connection hasn't
13581365
// been established yet.
1359-
err := c.ensureConnect(ctx)
1366+
subCtx, subCtxCancel := context.WithTimeout(
1367+
ctx, c.cfg.ServiceRequestTimeout,
1368+
)
1369+
defer subCtxCancel()
1370+
1371+
err := c.ensureConnect(subCtx)
13601372
if err != nil {
13611373
return fmt.Errorf("unable to connect to "+
13621374
"courier service during delivery "+
13631375
"attempt: %w", err)
13641376
}
13651377

13661378
// Submit proof to courier.
1367-
_, err = c.client.InsertProof(ctx, &unirpc.AssetProof{
1379+
subCtx, subCtxCancel = context.WithTimeout(
1380+
ctx, c.cfg.ServiceRequestTimeout,
1381+
)
1382+
defer subCtxCancel()
1383+
1384+
assetProof := unirpc.AssetProof{
13681385
Key: &universeKey,
13691386
AssetLeaf: &assetLeaf,
1370-
})
1387+
}
1388+
_, err = c.client.InsertProof(subCtx, &assetProof)
13711389
if err != nil {
13721390
return fmt.Errorf("error inserting proof "+
13731391
"into universe courier service: %w",
@@ -1420,15 +1438,25 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
14201438
receiveFunc := func() error {
14211439
// Connect to the courier service if a connection hasn't
14221440
// been established yet.
1423-
err := c.ensureConnect(ctx)
1441+
subCtx, subCtxCancel := context.WithTimeout(
1442+
ctx, c.cfg.ServiceRequestTimeout,
1443+
)
1444+
defer subCtxCancel()
1445+
1446+
err := c.ensureConnect(subCtx)
14241447
if err != nil {
14251448
return fmt.Errorf("unable to connect to "+
1426-
"courier service during delivery "+
1427-
"attempt: %w", err)
1449+
"universe RPC courier service during "+
1450+
"recieve attempt: %w", err)
14281451
}
14291452

14301453
// Retrieve proof from courier.
1431-
resp, err := c.client.QueryProof(ctx, &universeKey)
1454+
subCtx, subCtxCancel = context.WithTimeout(
1455+
ctx, c.cfg.ServiceRequestTimeout,
1456+
)
1457+
defer subCtxCancel()
1458+
1459+
resp, err := c.client.QueryProof(subCtx, &universeKey)
14321460
if err != nil {
14331461
return fmt.Errorf("error retrieving proof "+
14341462
"from universe courier service: %w",

sample-tapd.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@
6464
; {s, m, h}.
6565
; custodianproofretrievaldelay=5s
6666

67+
; The maximum duration we'll wait for a proof courier service to handle our
68+
; outgoing request during a connection attempt, or when delivering or retrieving
69+
; a proof.
70+
; universerpccourier.servicerequestimeout=5s
71+
6772
; Network to run on (mainnet, regtest, testnet, simnet, signet)
6873
; network=testnet
6974

tapcfg/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ const (
9898
// use for waiting for a receiver to acknowledge a proof transfer.
9999
defaultProofTransferReceiverAckTimeout = time.Hour * 6
100100

101+
// defaultProofCourierServiceResponseTimeout is the default timeout
102+
// we'll use for waiting for a response from the proof courier service.
103+
defaultProofCourierServiceResponseTimeout = time.Second * 5
104+
101105
// defaultUniverseSyncInterval is the default interval that we'll use
102106
// to sync Universe state with the federation.
103107
defaultUniverseSyncInterval = time.Minute * 10
@@ -435,6 +439,7 @@ func DefaultConfig() Config {
435439
InitialBackoff: defaultProofTransferInitialBackoff,
436440
MaxBackoff: defaultProofTransferMaxBackoff,
437441
},
442+
ServiceRequestTimeout: defaultProofCourierServiceResponseTimeout,
438443
},
439444
CustodianProofRetrievalDelay: defaultProofRetrievalDelay,
440445
Universe: &UniverseConfig{

tapfreighter/chain_porter.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,10 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
881881
// those that do not.
882882
reportProofTransfers(notDeliveringOutputs, pendingDeliveryOutputs)
883883

884+
// incompleteDelivery is set to true if any proof delivery attempts fail
885+
// and exceed the maximum backoff limit.
886+
incompleteDelivery := false
887+
884888
deliver := func(ctx context.Context, out TransferOutput) error {
885889
scriptKey := out.ScriptKey.PubKey
886890
scriptKeyBytes := scriptKey.SerializeCompressed()
@@ -940,6 +944,14 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
940944
// later.
941945
var backoffExecErr *proof.BackoffExecError
942946
if errors.As(err, &backoffExecErr) {
947+
log.Debugf("Exceeded backoff limit for proof delivery "+
948+
"(script_key=%x, proof_courier_addr=%s)",
949+
scriptKey.SerializeCompressed(),
950+
out.ProofCourierAddr)
951+
952+
// Set the incomplete delivery flag to true so that we
953+
// can retry the proof transfer state later.
954+
incompleteDelivery = true
943955
return nil
944956
}
945957
if err != nil {
@@ -957,6 +969,10 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
957969
"confirmation: %w", err)
958970
}
959971

972+
log.Infof("Transfer output proof delivery complete "+
973+
"(anchor_txid=%v, output_position=%d)",
974+
pkg.OutboundPkg.AnchorTx.TxHash(), out.Position)
975+
960976
return nil
961977
}
962978

@@ -999,6 +1015,17 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
9991015
return firstErr
10001016
}
10011017

1018+
// If the delivery is incomplete, we'll return early so that we can
1019+
// retry proof transfer later.
1020+
if incompleteDelivery {
1021+
log.Debugf("Proof delivery incomplete, will retry executing "+
1022+
"the proof transfer state (transfer_anchor_tx_hash=%v)",
1023+
pkg.OutboundPkg.AnchorTx.TxHash())
1024+
1025+
// Return here before setting the transfer to complete.
1026+
return nil
1027+
}
1028+
10021029
// At this point, the transfer is fully finalised and successful:
10031030
// - The anchoring transaction has been confirmed on-chain.
10041031
// - The proof(s) have been delivered to the receiver(s).

0 commit comments

Comments
 (0)