Skip to content

Commit c0ec378

Browse files
Merge pull request #405 from YusukeShimizu/feature/peersync-package
2 parents 93106af + f47d874 commit c0ec378

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+3894
-956
lines changed

clightning/clightning.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,16 @@ import (
1010
"os"
1111
"time"
1212

13-
"github.com/elementsproject/peerswap/log"
14-
"github.com/elementsproject/peerswap/premium"
15-
1613
"github.com/btcsuite/btcd/chaincfg"
1714
"github.com/elementsproject/glightning/gbitcoin"
18-
"github.com/elementsproject/peerswap/onchain"
19-
2015
"github.com/elementsproject/glightning/glightning"
2116
"github.com/elementsproject/glightning/jrpc2"
2217
"github.com/elementsproject/peerswap/lightning"
18+
"github.com/elementsproject/peerswap/log"
2319
"github.com/elementsproject/peerswap/messages"
24-
"github.com/elementsproject/peerswap/poll"
20+
"github.com/elementsproject/peerswap/onchain"
21+
"github.com/elementsproject/peerswap/peersync"
22+
"github.com/elementsproject/peerswap/premium"
2523
"github.com/elementsproject/peerswap/swap"
2624
"github.com/elementsproject/peerswap/wallet"
2725
goerrors "github.com/go-errors/errors"
@@ -75,7 +73,8 @@ type ClightningClient struct {
7573
swaps *swap.SwapService
7674
requestedSwaps *swap.RequestedSwapsPrinter
7775
policy PolicyReloader
78-
pollService *poll.Service
76+
peerSync *peersync.PeerSync
77+
peerStore *peersync.Store
7978
ps *premium.Setting
8079

8180
gbitcoin *gbitcoin.Bitcoin
@@ -326,14 +325,15 @@ func (cl *ClightningClient) GetPreimage() (lightning.Preimage, error) {
326325
func (cl *ClightningClient) SetupClients(liquidWallet wallet.Wallet,
327326
swaps *swap.SwapService,
328327
policy PolicyReloader, requestedSwaps *swap.RequestedSwapsPrinter,
329-
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, pollService *poll.Service,
328+
bitcoin *gbitcoin.Bitcoin, bitcoinChain *onchain.BitcoinOnChain, peerSync *peersync.PeerSync, peerStore *peersync.Store,
330329
ps *premium.Setting) {
331330
cl.liquidWallet = liquidWallet
332331
cl.requestedSwaps = requestedSwaps
333332
cl.swaps = swaps
334333
cl.policy = policy
335334
cl.gbitcoin = bitcoin
336-
cl.pollService = pollService
335+
cl.peerSync = peerSync
336+
cl.peerStore = peerStore
337337
cl.bitcoinChain = bitcoinChain
338338
cl.ps = ps
339339
if cl.bitcoinChain != nil {
@@ -519,16 +519,6 @@ func (cl *ClightningClient) isPeerConnected(nodeId string) bool {
519519
return peer.Connected
520520
}
521521

522-
// peerRunsPeerSwap returns true if the peer with peerId is listed in the
523-
// pollService.
524-
func (cl *ClightningClient) peerRunsPeerSwap(peerId string) bool {
525-
pollInfo, err := cl.pollService.GetPollFrom(peerId)
526-
if err == nil && pollInfo != nil {
527-
return true
528-
}
529-
return false
530-
}
531-
532522
// This is called after the Plugin starts up successfully
533523
func (cl *ClightningClient) onInit(plugin *glightning.Plugin, options map[string]glightning.Option, config *glightning.Config) {
534524
cl.glightning.StartUp(config.RpcFile, config.LightningDir)
@@ -549,10 +539,20 @@ func (cl *ClightningClient) OnConnect(connectEvent *glightning.ConnectEvent) {
549539
go func() {
550540
for {
551541
time.Sleep(10 * time.Second)
552-
if cl.pollService != nil {
553-
cl.pollService.RequestPoll(
554-
lo.Ternary(connectEvent.PeerId != "",
555-
connectEvent.PeerId, connectEvent.Conn.PeerId))
542+
if cl.peerSync != nil {
543+
peerIDValue := lo.Ternary(connectEvent.PeerId != "",
544+
connectEvent.PeerId, connectEvent.Conn.PeerId)
545+
if peerIDValue == "" {
546+
return
547+
}
548+
peerID, err := peersync.NewPeerID(peerIDValue)
549+
if err != nil {
550+
log.Debugf("invalid peer id for request poll: %v", err)
551+
return
552+
}
553+
if err := cl.peerSync.RequestPoll(cl.ctx, peerID); err != nil {
554+
log.Debugf("failed to request poll for %s: %v", peerIDValue, err)
555+
}
556556
return
557557
}
558558
}

clightning/clightning_commands.go

Lines changed: 38 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package clightning
22

33
import (
4+
"context"
45
"crypto/rand"
56
"encoding/hex"
67
"errors"
@@ -12,6 +13,8 @@ import (
1213

1314
"github.com/elementsproject/peerswap/log"
1415
"github.com/elementsproject/peerswap/peerswaprpc"
16+
"github.com/elementsproject/peerswap/peersync"
17+
"github.com/elementsproject/peerswap/peersync/format"
1518
"github.com/elementsproject/peerswap/premium"
1619

1720
"github.com/elementsproject/glightning/glightning"
@@ -225,8 +228,10 @@ func (l *SwapOut) Call() (jrpc2.Result, error) {
225228
}
226229

227230
// Skip this check when `force` is set.
228-
if !l.Force && !l.cl.peerRunsPeerSwap(fundingChannels.Id) {
229-
return nil, fmt.Errorf("peer does not run peerswap")
231+
if !l.Force {
232+
if l.cl.peerSync == nil || !l.cl.peerSync.HasCompatiblePeer(fundingChannels.Id) {
233+
return nil, fmt.Errorf("peer does not run peerswap")
234+
}
230235
}
231236

232237
if !l.cl.isPeerConnected(fundingChannels.Id) {
@@ -345,8 +350,10 @@ func (l *SwapIn) Call() (jrpc2.Result, error) {
345350
}
346351

347352
// Skip this check when `force` is set.
348-
if !l.Force && !l.cl.peerRunsPeerSwap(fundingChannels.Id) {
349-
return nil, fmt.Errorf("peer does not run peerswap")
353+
if !l.Force {
354+
if l.cl.peerSync == nil || !l.cl.peerSync.HasCompatiblePeer(fundingChannels.Id) {
355+
return nil, fmt.Errorf("peer does not run peerswap")
356+
}
350357
}
351358

352359
if !l.cl.isPeerConnected(fundingChannels.Id) {
@@ -559,91 +566,26 @@ func (l *ListPeers) Call() (jrpc2.Result, error) {
559566
fundingChannels[channel.ShortChannelId] = channel
560567
}
561568

562-
// get polls
563-
polls, err := l.cl.pollService.GetCompatiblePolls()
564-
if err != nil {
565-
return nil, err
569+
compatible := make(map[string]*peersync.Peer)
570+
if l.cl.peerSync != nil {
571+
compatible, err = l.cl.peerSync.CompatiblePeers()
572+
if err != nil {
573+
return nil, err
574+
}
566575
}
567576

568577
peerSwappers := []*peerswaprpc.PeerSwapPeer{}
569578
for _, peer := range peers {
570-
if p, ok := polls[peer.Id]; ok {
579+
peerState, ok := compatible[peer.Id]
580+
if ok {
581+
capability := peerState.Capability()
571582
swaps, err := l.cl.swaps.ListSwapsByPeer(peer.Id)
572583
if err != nil {
573584
return nil, err
574585
}
575586

576-
var paidFees uint64
577-
var ReceiverSwapsOut, ReceiverSwapsIn, ReceiverSatsOut, ReceiverSatsIn uint64
578-
var SenderSwapsOut, SenderSwapsIn, SenderSatsOut, SenderSatsIn uint64
579-
for _, s := range swaps {
580-
// We only list successful swaps. They all end in an
581-
// State_ClaimedPreimage state.
582-
if s.Current == swap.State_ClaimedPreimage {
583-
if s.Role == swap.SWAPROLE_SENDER {
584-
paidFees += s.Data.OpeningTxFee
585-
if s.Type == swap.SWAPTYPE_OUT {
586-
SenderSwapsOut++
587-
SenderSatsOut += s.Data.GetAmount()
588-
} else {
589-
SenderSwapsIn++
590-
SenderSatsIn += s.Data.GetAmount()
591-
}
592-
} else {
593-
if s.Type == swap.SWAPTYPE_OUT {
594-
ReceiverSwapsOut++
595-
ReceiverSatsOut += s.Data.GetAmount()
596-
} else {
597-
ReceiverSwapsIn++
598-
ReceiverSatsIn += s.Data.GetAmount()
599-
}
600-
}
601-
}
602-
}
603-
604-
peerSwapPeer := &peerswaprpc.PeerSwapPeer{
605-
NodeId: peer.Id,
606-
SwapsAllowed: p.PeerAllowed,
607-
SupportedAssets: p.Assets,
608-
AsSender: &peerswaprpc.SwapStats{
609-
SwapsOut: SenderSwapsOut,
610-
SwapsIn: SenderSwapsIn,
611-
SatsOut: SenderSatsOut,
612-
SatsIn: SenderSatsIn,
613-
},
614-
AsReceiver: &peerswaprpc.SwapStats{
615-
SwapsOut: ReceiverSwapsOut,
616-
SwapsIn: ReceiverSwapsIn,
617-
SatsOut: ReceiverSatsOut,
618-
SatsIn: ReceiverSatsIn,
619-
},
620-
PaidFee: paidFees,
621-
PeerPremium: &peerswaprpc.PeerPremium{
622-
NodeId: peer.Id,
623-
Rates: []*peerswaprpc.PremiumRate{
624-
{
625-
Asset: peerswaprpc.AssetType_BTC,
626-
Operation: peerswaprpc.OperationType_SWAP_IN,
627-
PremiumRatePpm: p.BTCSwapInPremiumRatePPM,
628-
},
629-
{
630-
Asset: peerswaprpc.AssetType_BTC,
631-
Operation: peerswaprpc.OperationType_SWAP_OUT,
632-
PremiumRatePpm: p.BTCSwapOutPremiumRatePPM,
633-
},
634-
{
635-
Asset: peerswaprpc.AssetType_LBTC,
636-
Operation: peerswaprpc.OperationType_SWAP_IN,
637-
PremiumRatePpm: p.LBTCSwapInPremiumRatePPM,
638-
},
639-
{
640-
Asset: peerswaprpc.AssetType_LBTC,
641-
Operation: peerswaprpc.OperationType_SWAP_OUT,
642-
PremiumRatePpm: p.LBTCSwapOutPremiumRatePPM,
643-
},
644-
},
645-
},
646-
}
587+
view := format.BuildPeerView(peer.Id, capability, swaps)
588+
peerSwapPeer := peerswaprpc.NewPeerSwapPeerFromView(view)
647589
channels, err := l.cl.glightning.ListChannelsBySource(peer.Id)
648590
if err != nil {
649591
return nil, err
@@ -754,8 +696,10 @@ func (c ReloadPolicyFile) Call() (jrpc2.Result, error) {
754696
if err != nil {
755697
return nil, err
756698
}
757-
// Resend poll
758-
c.cl.pollService.PollAllPeers()
699+
// Resend capability updates
700+
if c.cl.peerSync != nil {
701+
c.cl.peerSync.ForcePollAllPeers(context.Background())
702+
}
759703
return c.cl.policy.Get(), nil
760704
}
761705

@@ -1217,10 +1161,13 @@ func (c *UpdatePremiumRate) Call() (jrpc2.Result, error) {
12171161
if err != nil {
12181162
return nil, fmt.Errorf("error creating premium rate: %v", err)
12191163
}
1220-
err = c.cl.ps.SetRate(c.PeerID, rate)
1164+
err = c.cl.ps.SetRate(context.Background(), c.PeerID, rate)
12211165
if err != nil {
12221166
return nil, fmt.Errorf("error setting premium rate: %v", err)
12231167
}
1168+
if c.cl.peerSync != nil {
1169+
c.cl.peerSync.ForcePollAllPeers(context.Background())
1170+
}
12241171
return &peerswaprpc.PremiumRate{
12251172
Asset: peerswaprpc.ToAssetType(rate.Asset()),
12261173
Operation: peerswaprpc.ToOperationType(rate.Operation()),
@@ -1263,11 +1210,14 @@ func (c *DeletePremiumRate) Call() (jrpc2.Result, error) {
12631210
if !c.cl.isReady {
12641211
return nil, ErrWaitingForReady
12651212
}
1266-
err := c.cl.ps.DeleteRate(c.PeerID, toPremiumAssetType(c.Asset),
1213+
err := c.cl.ps.DeleteRate(context.Background(), c.PeerID, toPremiumAssetType(c.Asset),
12671214
toPremiumOperationType(c.Operation))
12681215
if err != nil {
12691216
return nil, fmt.Errorf("error deleting premium rate: %v", err)
12701217
}
1218+
if c.cl.peerSync != nil {
1219+
c.cl.peerSync.ForcePollAllPeers(context.Background())
1220+
}
12711221
return &peerswaprpc.PremiumRate{
12721222
Asset: peerswaprpc.ToAssetType(toPremiumAssetType(c.Asset)),
12731223
Operation: peerswaprpc.ToOperationType(toPremiumOperationType(c.Operation)),
@@ -1315,10 +1265,13 @@ func (c *UpdateGlobalPremiumRate) Call() (jrpc2.Result, error) {
13151265
if err != nil {
13161266
return nil, fmt.Errorf("error creating premium rate: %v", err)
13171267
}
1318-
err = c.cl.ps.SetDefaultRate(rate)
1268+
err = c.cl.ps.SetDefaultRate(context.Background(), rate)
13191269
if err != nil {
13201270
return nil, fmt.Errorf("error setting default premium rate: %v", err)
13211271
}
1272+
if c.cl.peerSync != nil {
1273+
c.cl.peerSync.ForcePollAllPeers(context.Background())
1274+
}
13221275
return &peerswaprpc.PremiumRate{
13231276
Asset: peerswaprpc.ToAssetType(rate.Asset()),
13241277
Operation: peerswaprpc.ToOperationType(rate.Operation()),

cmd/peerswap-plugin/main.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/elementsproject/peerswap/log"
1515
"github.com/elementsproject/peerswap/lwk"
1616
"github.com/elementsproject/peerswap/version"
17+
"github.com/thejerf/suture/v4"
1718
"golang.org/x/sys/unix"
1819

1920
"github.com/vulpemventures/go-elements/network"
@@ -26,8 +27,8 @@ import (
2627
"github.com/elementsproject/peerswap/clightning"
2728
"github.com/elementsproject/peerswap/messages"
2829
"github.com/elementsproject/peerswap/onchain"
30+
"github.com/elementsproject/peerswap/peersync"
2931
"github.com/elementsproject/peerswap/policy"
30-
"github.com/elementsproject/peerswap/poll"
3132
"github.com/elementsproject/peerswap/premium"
3233
"github.com/elementsproject/peerswap/swap"
3334
"github.com/elementsproject/peerswap/txwatcher"
@@ -372,16 +373,54 @@ func run(ctx context.Context, lightningPlugin *clightning.ClightningClient) erro
372373
return err
373374
}
374375

375-
pollStore, err := poll.NewStore(swapDb)
376+
peerSyncDBPath := filepath.Join(config.PeerswapDir, "peersync.db")
377+
peerStore, err := peersync.NewStore(peerSyncDBPath)
376378
if err != nil {
377379
return err
378380
}
379-
pollService := poll.NewService(1*time.Hour, 2*time.Hour, pollStore, lightningPlugin, pol, lightningPlugin, supportedAssets, ps)
380-
pollService.Start()
381-
defer pollService.Stop()
381+
defer func() {
382+
if closeErr := peerStore.Close(); closeErr != nil {
383+
log.Infof("peersync store close failed: %v", closeErr)
384+
}
385+
}()
386+
387+
nodeID, err := peersync.NewPeerID(lightningPlugin.GetNodeId())
388+
if err != nil {
389+
return err
390+
}
391+
392+
// Use the peersync CLN lightning adapter; it self-registers message handler
393+
clnAdapter := peersync.NewClnLightningAdapter(lightningPlugin)
394+
peerSync := peersync.NewPeerSync(nodeID, peerStore, clnAdapter, pol, supportedAssets, ps)
395+
396+
psSupervisor := suture.New("peersync", suture.Spec{
397+
EventHook: func(e suture.Event) {
398+
log.Infof("peersync supervisor event: %s", e)
399+
},
400+
})
401+
psSupervisor.Add(peerSync)
402+
403+
peerSyncCtx, peerSyncCancel := context.WithCancel(ctx)
404+
peerSyncErrCh := psSupervisor.ServeBackground(peerSyncCtx)
405+
defer func() {
406+
peerSyncCancel()
407+
if err := <-peerSyncErrCh; err != nil && !errors.Is(err, context.Canceled) {
408+
log.Infof("peersync supervisor exited: %v", err)
409+
}
410+
}()
382411

383412
sp := swap.NewRequestedSwapsPrinter(requestedSwapStore)
384-
lightningPlugin.SetupClients(liquidRpcWallet, swapService, pol, sp, bitcoinCli, bitcoinOnChainService, pollService, ps)
413+
lightningPlugin.SetupClients(
414+
liquidRpcWallet,
415+
swapService,
416+
pol,
417+
sp,
418+
bitcoinCli,
419+
bitcoinOnChainService,
420+
peerSync,
421+
peerStore,
422+
ps,
423+
)
385424

386425
// We are ready to accept and handle requests.
387426
// FIXME: Once we reworked the recovery service (non-blocking) we want to

0 commit comments

Comments
 (0)