Skip to content

Commit f47d874

Browse files
committed
peersync: replace poll service with new package
This change prepares support for 2-hop peerswap. It removes the old poll service. It adds a new peersync package. Both the CLN and LND plugins start one shared PeerSync instance. The external CLI and RPC interfaces stay the same. The existing swap and premium logic also stay the same. No data migration is required. Version compatibility requirements do not change. The refactor makes the peer sync logic easier to understand and test. The peersync package is split into smaller parts with unit tests. CLN and LND now share the same code path. PeerSync can resend updates at once on premium changes or policy reloads. This reduces configuration delay and makes operations and monitoring easier.
1 parent 569b7be commit f47d874

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+3894
-956
lines changed

clightning/clightning.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,16 @@ import (
1010
"os"
1111
"time"
1212

13-
"github.com/elementsproject/peerswap/log"
14-
"github.com/elementsproject/peerswap/premium"
15-
1613
"github.com/btcsuite/btcd/chaincfg"
1714
"github.com/elementsproject/glightning/gbitcoin"
18-
"github.com/elementsproject/peerswap/onchain"
19-
2015
"github.com/elementsproject/glightning/glightning"
2116
"github.com/elementsproject/glightning/jrpc2"
2217
"github.com/elementsproject/peerswap/lightning"
18+
"github.com/elementsproject/peerswap/log"
2319
"github.com/elementsproject/peerswap/messages"
24-
"github.com/elementsproject/peerswap/poll"
20+
"github.com/elementsproject/peerswap/onchain"
21+
"github.com/elementsproject/peerswap/peersync"
22+
"github.com/elementsproject/peerswap/premium"
2523
"github.com/elementsproject/peerswap/swap"
2624
"github.com/elementsproject/peerswap/wallet"
2725
goerrors "github.com/go-errors/errors"
@@ -75,7 +73,8 @@ type ClightningClient struct {
7573
swaps *swap.SwapService
7674
requestedSwaps *swap.RequestedSwapsPrinter
7775
policy PolicyReloader
78-
pollService *poll.Service
76+
peerSync *peersync.PeerSync
77+
peerStore *peersync.Store
7978
ps *premium.Setting
8079

8180
gbitcoin *gbitcoin.Bitcoin
@@ -326,14 +325,15 @@ func (cl *ClightningClient) GetPreimage() (lightning.Preimage, error) {
326325
func (cl *ClightningClient) SetupClients(liquidWallet wallet.Wallet,
327326
swaps *swap.SwapService,
328327
policy PolicyReloader, requestedSwaps *swap.RequestedSwapsPrinter,
329-
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, pollService *poll.Service,
328+
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, peerSync *peersync.PeerSync, peerStore *peersync.Store,
330329
ps *premium.Setting) {
331330
cl.liquidWallet = liquidWallet
332331
cl.requestedSwaps = requestedSwaps
333332
cl.swaps = swaps
334333
cl.policy = policy
335334
cl.gbitcoin = bitcoin
336-
cl.pollService = pollService
335+
cl.peerSync = peerSync
336+
cl.peerStore = peerStore
337337
cl.bitcoinChain = bitcoinChain
338338
cl.ps = ps
339339
if cl.bitcoinChain != nil {
@@ -519,16 +519,6 @@ func (cl *ClightningClient) isPeerConnected(nodeId string) bool {
519519
return peer.Connected
520520
}
521521

522-
// peerRunsPeerSwap returns true if the peer with peerId is listed in the
523-
// pollService.
524-
func (cl *ClightningClient) peerRunsPeerSwap(peerId string) bool {
525-
pollInfo, err := cl.pollService.GetPollFrom(peerId)
526-
if err == nil && pollInfo != nil {
527-
return true
528-
}
529-
return false
530-
}
531-
532522
// This is called after the Plugin starts up successfully
533523
func (cl *ClightningClient) onInit(plugin *glightning.Plugin, options map[string]glightning.Option, config *glightning.Config) {
534524
cl.glightning.StartUp(config.RpcFile, config.LightningDir)
@@ -549,10 +539,20 @@ func (cl *ClightningClient) OnConnect(connectEvent *glightning.ConnectEvent) {
549539
go func() {
550540
for {
551541
time.Sleep(10 * time.Second)
552-
if cl.pollService != nil {
553-
cl.pollService.RequestPoll(
554-
lo.Ternary(connectEvent.PeerId != "",
555-
connectEvent.PeerId, connectEvent.Conn.PeerId))
542+
if cl.peerSync != nil {
543+
peerIDValue := lo.Ternary(connectEvent.PeerId != "",
544+
connectEvent.PeerId, connectEvent.Conn.PeerId)
545+
if peerIDValue == "" {
546+
return
547+
}
548+
peerID, err := peersync.NewPeerID(peerIDValue)
549+
if err != nil {
550+
log.Debugf("invalid peer id for request poll: %v", err)
551+
return
552+
}
553+
if err := cl.peerSync.RequestPoll(cl.ctx, peerID); err != nil {
554+
log.Debugf("failed to request poll for %s: %v", peerIDValue, err)
555+
}
556556
return
557557
}
558558
}

clightning/clightning_commands.go

Lines changed: 38 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package clightning
22

33
import (
4+
"context"
45
"crypto/rand"
56
"encoding/hex"
67
"errors"
@@ -12,6 +13,8 @@ import (
1213

1314
"github.com/elementsproject/peerswap/log"
1415
"github.com/elementsproject/peerswap/peerswaprpc"
16+
"github.com/elementsproject/peerswap/peersync"
17+
"github.com/elementsproject/peerswap/peersync/format"
1518
"github.com/elementsproject/peerswap/premium"
1619

1720
"github.com/elementsproject/glightning/glightning"
@@ -225,8 +228,10 @@ func (l *SwapOut) Call() (jrpc2.Result, error) {
225228
}
226229

227230
// Skip this check when `force` is set.
228-
if !l.Force && !l.cl.peerRunsPeerSwap(fundingChannels.Id) {
229-
return nil, fmt.Errorf("peer does not run peerswap")
231+
if !l.Force {
232+
if l.cl.peerSync == nil || !l.cl.peerSync.HasCompatiblePeer(fundingChannels.Id) {
233+
return nil, fmt.Errorf("peer does not run peerswap")
234+
}
230235
}
231236

232237
if !l.cl.isPeerConnected(fundingChannels.Id) {
@@ -345,8 +350,10 @@ func (l *SwapIn) Call() (jrpc2.Result, error) {
345350
}
346351

347352
// Skip this check when `force` is set.
348-
if !l.Force && !l.cl.peerRunsPeerSwap(fundingChannels.Id) {
349-
return nil, fmt.Errorf("peer does not run peerswap")
353+
if !l.Force {
354+
if l.cl.peerSync == nil || !l.cl.peerSync.HasCompatiblePeer(fundingChannels.Id) {
355+
return nil, fmt.Errorf("peer does not run peerswap")
356+
}
350357
}
351358

352359
if !l.cl.isPeerConnected(fundingChannels.Id) {
@@ -559,91 +566,26 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
559566
fundingChannels[channel.ShortChannelId] = channel
560567
}
561568

562-
// get polls
563-
polls, err := l.cl.pollService.GetCompatiblePolls()
564-
if err != nil {
565-
return nil, err
569+
compatible := make(map[string]*peersync.Peer)
570+
if l.cl.peerSync != nil {
571+
compatible, err = l.cl.peerSync.CompatiblePeers()
572+
if err != nil {
573+
return nil, err
574+
}
566575
}
567576

568577
peerSwappers := []*peerswaprpc.PeerSwapPeer{}
569578
for _, peer := range peers {
570-
if p, ok := polls[peer.Id]; ok {
579+
peerState, ok := compatible[peer.Id]
580+
if ok {
581+
capability := peerState.Capability()
571582
swaps, err := l.cl.swaps.ListSwapsByPeer(peer.Id)
572583
if err != nil {
573584
return nil, err
574585
}
575586

576-
var paidFees uint64
577-
var ReceiverSwapsOut, ReceiverSwapsIn, ReceiverSatsOut, ReceiverSatsIn uint64
578-
var SenderSwapsOut, SenderSwapsIn, SenderSatsOut, SenderSatsIn uint64
579-
for _, s := range swaps {
580-
// We only list successful swaps. They all end in an
581-
// State_ClaimedPreimage state.
582-
if s.Current == swap.State_ClaimedPreimage {
583-
if s.Role == swap.SWAPROLE_SENDER {
584-
paidFees += s.Data.OpeningTxFee
585-
if s.Type == swap.SWAPTYPE_OUT {
586-
SenderSwapsOut++
587-
SenderSatsOut += s.Data.GetAmount()
588-
} else {
589-
SenderSwapsIn++
590-
SenderSatsIn += s.Data.GetAmount()
591-
}
592-
} else {
593-
if s.Type == swap.SWAPTYPE_OUT {
594-
ReceiverSwapsOut++
595-
ReceiverSatsOut += s.Data.GetAmount()
596-
} else {
597-
ReceiverSwapsIn++
598-
ReceiverSatsIn += s.Data.GetAmount()
599-
}
600-
}
601-
}
602-
}
603-
604-
peerSwapPeer := &peerswaprpc.PeerSwapPeer{
605-
NodeId: peer.Id,
606-
SwapsAllowed: p.PeerAllowed,
607-
SupportedAssets: p.Assets,
608-
AsSender: &peerswaprpc.SwapStats{
609-
SwapsOut: SenderSwapsOut,
610-
SwapsIn: SenderSwapsIn,
611-
SatsOut: SenderSatsOut,
612-
SatsIn: SenderSatsIn,
613-
},
614-
AsReceiver: &peerswaprpc.SwapStats{
615-
SwapsOut: ReceiverSwapsOut,
616-
SwapsIn: ReceiverSwapsIn,
617-
SatsOut: ReceiverSatsOut,
618-
SatsIn: ReceiverSatsIn,
619-
},
620-
PaidFee: paidFees,
621-
PeerPremium: &peerswaprpc.PeerPremium{
622-
NodeId: peer.Id,
623-
Rates: []*peerswaprpc.PremiumRate{
624-
{
625-
Asset: peerswaprpc.AssetType_BTC,
626-
Operation: peerswaprpc.OperationType_SWAP_IN,
627-
PremiumRatePpm: p.BTCSwapInPremiumRatePPM,
628-
},
629-
{
630-
Asset: peerswaprpc.AssetType_BTC,
631-
Operation: peerswaprpc.OperationType_SWAP_OUT,
632-
PremiumRatePpm: p.BTCSwapOutPremiumRatePPM,
633-
},
634-
{
635-
Asset: peerswaprpc.AssetType_LBTC,
636-
Operation: peerswaprpc.OperationType_SWAP_IN,
637-
PremiumRatePpm: p.LBTCSwapInPremiumRatePPM,
638-
},
639-
{
640-
Asset: peerswaprpc.AssetType_LBTC,
641-
Operation: peerswaprpc.OperationType_SWAP_OUT,
642-
PremiumRatePpm: p.LBTCSwapOutPremiumRatePPM,
643-
},
644-
},
645-
},
646-
}
587+
view := format.BuildPeerView(peer.Id, capability, swaps)
588+
peerSwapPeer := peerswaprpc.NewPeerSwapPeerFromView(view)
647589
channels, err := l.cl.glightning.ListChannelsBySource(peer.Id)
648590
if err != nil {
649591
return nil, err
@@ -754,8 +696,10 @@ func (c ReloadPolicyFile) Call() (jrpc2.Result, error) {
754696
if err != nil {
755697
return nil, err
756698
}
757-
// Resend poll
758-
c.cl.pollService.PollAllPeers()
699+
// Resend capability updates
700+
if c.cl.peerSync != nil {
701+
c.cl.peerSync.ForcePollAllPeers(context.Background())
702+
}
759703
return c.cl.policy.Get(), nil
760704
}
761705

@@ -1217,10 +1161,13 @@ func (c *UpdatePremiumRate) Call() (jrpc2.Result, error) {
12171161
if err != nil {
12181162
return nil, fmt.Errorf("error creating premium rate: %v", err)
12191163
}
1220-
err = c.cl.ps.SetRate(c.PeerID, rate)
1164+
err = c.cl.ps.SetRate(context.Background(), c.PeerID, rate)
12211165
if err != nil {
12221166
return nil, fmt.Errorf("error setting premium rate: %v", err)
12231167
}
1168+
if c.cl.peerSync != nil {
1169+
c.cl.peerSync.ForcePollAllPeers(context.Background())
1170+
}
12241171
return &peerswaprpc.PremiumRate{
12251172
Asset: peerswaprpc.ToAssetType(rate.Asset()),
12261173
Operation: peerswaprpc.ToOperationType(rate.Operation()),
@@ -1263,11 +1210,14 @@ func (c *DeletePremiumRate) Call() (jrpc2.Result, error) {
12631210
if !c.cl.isReady {
12641211
return nil, ErrWaitingForReady
12651212
}
1266-
err := c.cl.ps.DeleteRate(c.PeerID, toPremiumAssetType(c.Asset),
1213+
err := c.cl.ps.DeleteRate(context.Background(), c.PeerID, toPremiumAssetType(c.Asset),
12671214
toPremiumOperationType(c.Operation))
12681215
if err != nil {
12691216
return nil, fmt.Errorf("error deleting premium rate: %v", err)
12701217
}
1218+
if c.cl.peerSync != nil {
1219+
c.cl.peerSync.ForcePollAllPeers(context.Background())
1220+
}
12711221
return &peerswaprpc.PremiumRate{
12721222
Asset: peerswaprpc.ToAssetType(toPremiumAssetType(c.Asset)),
12731223
Operation: peerswaprpc.ToOperationType(toPremiumOperationType(c.Operation)),
@@ -1315,10 +1265,13 @@ func (c *UpdateGlobalPremiumRate) Call() (jrpc2.Result, error) {
13151265
if err != nil {
13161266
return nil, fmt.Errorf("error creating premium rate: %v", err)
13171267
}
1318-
err = c.cl.ps.SetDefaultRate(rate)
1268+
err = c.cl.ps.SetDefaultRate(context.Background(), rate)
13191269
if err != nil {
13201270
return nil, fmt.Errorf("error setting default premium rate: %v", err)
13211271
}
1272+
if c.cl.peerSync != nil {
1273+
c.cl.peerSync.ForcePollAllPeers(context.Background())
1274+
}
13221275
return &peerswaprpc.PremiumRate{
13231276
Asset: peerswaprpc.ToAssetType(rate.Asset()),
13241277
Operation: peerswaprpc.ToOperationType(rate.Operation()),

cmd/peerswap-plugin/main.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/elementsproject/peerswap/log"
1515
"github.com/elementsproject/peerswap/lwk"
1616
"github.com/elementsproject/peerswap/version"
17+
"github.com/thejerf/suture/v4"
1718
"golang.org/x/sys/unix"
1819

1920
"github.com/vulpemventures/go-elements/network"
@@ -26,8 +27,8 @@ import (
2627
"github.com/elementsproject/peerswap/clightning"
2728
"github.com/elementsproject/peerswap/messages"
2829
"github.com/elementsproject/peerswap/onchain"
30+
"github.com/elementsproject/peerswap/peersync"
2931
"github.com/elementsproject/peerswap/policy"
30-
"github.com/elementsproject/peerswap/poll"
3132
"github.com/elementsproject/peerswap/premium"
3233
"github.com/elementsproject/peerswap/swap"
3334
"github.com/elementsproject/peerswap/txwatcher"
@@ -372,16 +373,54 @@ func run(ctx context.Context, lightningPlugin *clightning.ClightningClient) erro
372373
return err
373374
}
374375

375-
pollStore, err := poll.NewStore(swapDb)
376+
peerSyncDBPath := filepath.Join(config.PeerswapDir, "peersync.db")
377+
peerStore, err := peersync.NewStore(peerSyncDBPath)
376378
if err != nil {
377379
return err
378380
}
379-
pollService := poll.NewService(1*time.Hour, 2*time.Hour, pollStore, lightningPlugin, pol, lightningPlugin, supportedAssets, ps)
380-
pollService.Start()
381-
defer pollService.Stop()
381+
defer func() {
382+
if closeErr := peerStore.Close(); closeErr != nil {
383+
log.Infof("peersync store close failed: %v", closeErr)
384+
}
385+
}()
386+
387+
nodeID, err := peersync.NewPeerID(lightningPlugin.GetNodeId())
388+
if err != nil {
389+
return err
390+
}
391+
392+
// Use the peersync CLN lightning adapter; it self-registers message handler
393+
clnAdapter := peersync.NewClnLightningAdapter(lightningPlugin)
394+
peerSync := peersync.NewPeerSync(nodeID, peerStore, clnAdapter, pol, supportedAssets, ps)
395+
396+
psSupervisor := suture.New("peersync", suture.Spec{
397+
EventHook: func(e suture.Event) {
398+
log.Infof("peersync supervisor event: %s", e)
399+
},
400+
})
401+
psSupervisor.Add(peerSync)
402+
403+
peerSyncCtx, peerSyncCancel := context.WithCancel(ctx)
404+
peerSyncErrCh := psSupervisor.ServeBackground(peerSyncCtx)
405+
defer func() {
406+
peerSyncCancel()
407+
if err := <-peerSyncErrCh; err != nil && !errors.Is(err, context.Canceled) {
408+
log.Infof("peersync supervisor exited: %v", err)
409+
}
410+
}()
382411

383412
sp := swap.NewRequestedSwapsPrinter(requestedSwapStore)
384-
lightningPlugin.SetupClients(liquidRpcWallet, swapService, pol, sp, bitcoinCli, bitcoinOnChainService, pollService, ps)
413+
lightningPlugin.SetupClients(
414+
liquidRpcWallet,
415+
swapService,
416+
pol,
417+
sp,
418+
bitcoinCli,
419+
bitcoinOnChainService,
420+
peerSync,
421+
peerStore,
422+
ps,
423+
)
385424

386425
// We are ready to accept and handle requests.
387426
// FIXME: Once we reworked the recovery service (non-blocking) we want to

0 commit comments

Comments
 (0)