Skip to content

Commit 30e7166

Browse files
authored
Merge pull request #1100 from lightninglabs/robust-multi-proof-delivery
Enhance Proof Delivery Resilience with ParSliceErrCollect in ChainPorter
2 parents 4556837 + ac10515 commit 30e7166

File tree

3 files changed

+101
-2
lines changed

3 files changed

+101
-2
lines changed

fn/concurrency.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package fn
22

33
import (
44
"context"
5+
"fmt"
56
"runtime"
7+
"sync"
68

79
"golang.org/x/sync/errgroup"
810
)
@@ -32,3 +34,48 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {
3234

3335
return errGroup.Wait()
3436
}
37+
38+
// ParSliceErrCollect can be used to execute a function on each element of a
39+
// slice in parallel. This function is fully blocking and will wait for all
40+
// goroutines to finish (subject to context cancellation/timeout). Any errors
41+
// will be collected and returned as a map of slice element index to error.
42+
// Active goroutines limited with number of CPU.
43+
func ParSliceErrCollect[V any](ctx context.Context, s []V,
44+
f ErrFunc[V]) (map[int]error, error) {
45+
46+
errGroup, ctx := errgroup.WithContext(ctx)
47+
errGroup.SetLimit(runtime.NumCPU())
48+
49+
var instanceErrorsMutex sync.Mutex
50+
instanceErrors := make(map[int]error, len(s))
51+
52+
for idx := range s {
53+
errGroup.Go(func() error {
54+
err := f(ctx, s[idx])
55+
if err != nil {
56+
instanceErrorsMutex.Lock()
57+
instanceErrors[idx] = err
58+
instanceErrorsMutex.Unlock()
59+
}
60+
61+
// Avoid returning an error here, as that would cancel
62+
// the errGroup and terminate all slice element
63+
// processing instances. Instead, collect the error and
64+
// return it later.
65+
return nil
66+
})
67+
}
68+
69+
// Now we will wait/block for all goroutines to finish.
70+
//
71+
// The goroutines that are executing in parallel should not return an
72+
// error, but the Wait call may return an error if the context is
73+
// canceled or timed out.
74+
err := errGroup.Wait()
75+
if err != nil {
76+
return nil, fmt.Errorf("failed to wait on error group in "+
77+
"ParSliceErrorCollect: %w", err)
78+
}
79+
80+
return instanceErrors, nil
81+
}

fn/func.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package fn
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
6+
"github.com/lightningnetwork/lnd/fn"
7+
)
48

59
// Reducer represents a function that takes an accumulator and the value, then
610
// returns a new accumulator.
@@ -263,3 +267,19 @@ func Last[T any](xs []*T, pred func(*T) bool) (*T, error) {
263267

264268
return matches[len(matches)-1], nil
265269
}
270+
271+
// KV is a generic struct that holds a key-value pair.
272+
type KV[K any, V any] struct {
273+
Key K
274+
Value V
275+
}
276+
277+
// PeekMap non-deterministically selects and returns a single key-value pair
278+
// from the given map.
279+
func PeekMap[K comparable, V any](m map[K]V) fn.Option[KV[K, V]] {
280+
for k, v := range m {
281+
return fn.Some(KV[K, V]{Key: k, Value: v})
282+
}
283+
284+
return fn.None[KV[K, V]]()
285+
}

tapfreighter/chain_porter.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,11 +870,43 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
870870

871871
// If we have a non-interactive proof, then we'll launch several
872872
// goroutines to deliver the proof(s) to the receiver(s).
873-
err := fn.ParSlice(ctx, pkg.OutboundPkg.Outputs, deliver)
873+
instanceErrors, err := fn.ParSliceErrCollect(
874+
ctx, pkg.OutboundPkg.Outputs, deliver,
875+
)
874876
if err != nil {
875877
return fmt.Errorf("error delivering proof(s): %w", err)
876878
}
877879

880+
// If there were any errors during the proof delivery process, we'll
881+
// log them all here.
882+
for idx := range instanceErrors {
883+
output := pkg.OutboundPkg.Outputs[idx]
884+
instanceErr := instanceErrors[idx]
885+
886+
scriptPubKey := output.ScriptKey.PubKey.SerializeCompressed()
887+
anchorOutpoint := output.Anchor.OutPoint.String()
888+
courierAddr := string(output.ProofCourierAddr)
889+
890+
log.Errorf("Error delivering transfer output proof "+
891+
"(anchor_outpoint=%s, script_pub_key=%v, "+
892+
"position=%d, proof_courier_addr=%s, "+
893+
"proof_delivery_status=%v): %v",
894+
anchorOutpoint, scriptPubKey, output.Position,
895+
courierAddr, output.ProofDeliveryComplete,
896+
instanceErr)
897+
}
898+
899+
// Return the first error encountered during the proof delivery process,
900+
// if any.
901+
var firstErr error
902+
fn.PeekMap(instanceErrors).WhenSome(func(kv fn.KV[int, error]) {
903+
firstErr = err
904+
})
905+
906+
if firstErr != nil {
907+
return firstErr
908+
}
909+
878910
// At this point, the transfer is fully finalised and successful:
879911
// - The anchoring transaction has been confirmed on-chain.
880912
// - The proof(s) have been delivered to the receiver(s).

0 commit comments

Comments
 (0)