Skip to content

Commit a0848ea

Browse files
authored
Merge pull request #7 from storacha/feat/enable-wss
feat(lp2p): add forked lp2p with wss support
2 parents 5c85a7e + 93055f9 commit a0848ea

File tree

3 files changed

+235
-4
lines changed

3 files changed

+235
-4
lines changed

cron/pollproviders.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ import (
1010

1111
infomempeerstore "code.riba.cloud/go/libp2p-infomempeerstore"
1212
"code.riba.cloud/go/toolbox-interplanetary/fil"
13-
"code.riba.cloud/go/toolbox-interplanetary/lp2p"
1413
"code.riba.cloud/go/toolbox/cmn"
1514
"code.riba.cloud/go/toolbox/ufcli"
1615
filaddr "github.com/filecoin-project/go-address"
1716
"github.com/georgysavva/scany/pgxscan"
1817
"github.com/multiformats/go-multiaddr"
1918
"github.com/storacha/spade/internal/app"
2019
"github.com/storacha/spade/internal/filtypes"
20+
"github.com/storacha/spade/internal/lp2p"
2121
"golang.org/x/sync/errgroup"
2222
)
2323

@@ -218,7 +218,7 @@ func getSPInfo(ctx context.Context, sp filaddr.Address, timeOut time.Duration) (
218218
return spi, nil
219219
}
220220

221-
nodeHost, peerStore, err := lp2p.NewPlainNodeTCP(timeOut)
221+
nodeHost, peerStore, err := lp2p.NewPlainNode(timeOut)
222222
if err != nil {
223223
return spInfo{}, cmn.WrErr(err)
224224
}

cron/proposepending.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"code.riba.cloud/go/toolbox-interplanetary/lp2p"
1211
"code.riba.cloud/go/toolbox/cmn"
1312
"code.riba.cloud/go/toolbox/ufcli"
1413
"github.com/georgysavva/scany/pgxscan"
1514
"github.com/google/uuid"
1615
"github.com/storacha/spade/internal/app"
1716
"github.com/storacha/spade/internal/filtypes"
17+
"github.com/storacha/spade/internal/lp2p"
1818
"golang.org/x/sync/errgroup"
1919
"golang.org/x/xerrors"
2020

@@ -140,7 +140,7 @@ var proposePending = &ufcli.Command{
140140
}
141141
tot.uniqueProviders = len(props)
142142

143-
nodeHost, _, err := lp2p.NewPlainNodeTCP(time.Duration(proposalTimeout) * time.Second)
143+
nodeHost, _, err := lp2p.NewPlainNode(time.Duration(proposalTimeout) * time.Second)
144144
if err != nil {
145145
return cmn.WrErr(err)
146146
}

internal/lp2p/lp2p.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
// Package lp2p is a fork of code.riba.cloud/go/toolbox-interplanetary/lp2p with
2+
// wss support added
3+
package lp2p //nolint:revive
4+
5+
import (
6+
"context"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"math"
11+
"time"
12+
13+
infomempeerstore "code.riba.cloud/go/libp2p-infomempeerstore"
14+
"code.riba.cloud/go/toolbox/cmn"
15+
cborutil "github.com/filecoin-project/go-cbor-util"
16+
lp2p "github.com/libp2p/go-libp2p"
17+
lp2phost "github.com/libp2p/go-libp2p/core/host"
18+
lp2pnet "github.com/libp2p/go-libp2p/core/network"
19+
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
20+
lp2pproto "github.com/libp2p/go-libp2p/core/protocol"
21+
lp2pyamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux"
22+
lp2pconnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
23+
lp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
24+
lp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
25+
lp2pwss "github.com/libp2p/go-libp2p/p2p/transport/websocket"
26+
"github.com/libp2p/go-yamux/v4"
27+
"github.com/multiformats/go-multiaddr"
28+
)
29+
30+
type Host = lp2phost.Host
31+
type PeerID = lp2ppeer.ID
32+
33+
// AddrInfo is a better annotated version of lp2ppeer.AddrInfo
34+
type AddrInfo struct {
35+
PeerID *PeerID `json:"peerid"`
36+
MultiAddrs []multiaddr.Multiaddr `json:"multiaddrs"`
37+
}
38+
39+
func (ai *AddrInfo) ToLp2p() lp2ppeer.AddrInfo {
40+
return lp2ppeer.AddrInfo{
41+
ID: *ai.PeerID,
42+
Addrs: ai.MultiAddrs,
43+
}
44+
}
45+
46+
func AddrInfoFromLp2p(ai lp2ppeer.AddrInfo) AddrInfo {
47+
return AddrInfo{
48+
PeerID: &ai.ID,
49+
MultiAddrs: ai.Addrs,
50+
}
51+
}
52+
53+
func NewPlainNode(withTimeout time.Duration) (lp2phost.Host, *infomempeerstore.PeerStore, error) {
54+
ps, err := infomempeerstore.NewPeerstore()
55+
if err != nil {
56+
return nil, nil, cmn.WrErr(err)
57+
}
58+
59+
connmgr, err := lp2pconnmgr.NewConnManager(8192, 16384) // effectively deactivate
60+
if err != nil {
61+
return nil, nil, cmn.WrErr(err)
62+
}
63+
64+
yc := yamux.DefaultConfig()
65+
yc.EnableKeepAlive = true
66+
yc.KeepAliveInterval = 10 * time.Second
67+
yc.MeasureRTTInterval = 10 * time.Second
68+
yc.ConnectionWriteTimeout = 5 * time.Second
69+
// rest of default settings from https://github.com/libp2p/go-libp2p/blob/v0.27.3/p2p/muxer/yamux/transport.go#L17-L34
70+
yc.MaxStreamWindowSize = uint32(32 << 20)
71+
yc.LogOutput = io.Discard
72+
yc.ReadBufSize = 0
73+
yc.MaxIncomingStreams = math.MaxUint32
74+
75+
nodeHost, err := lp2p.New(
76+
lp2p.Peerstore(ps), // allows us collect random on-connect data
77+
lp2p.RandomIdentity, // *NEVER* reuse a peerid
78+
lp2p.ConnectionManager(connmgr),
79+
lp2p.ResourceManager(&lp2pnet.NullResourceManager{}),
80+
lp2p.Ping(false),
81+
lp2p.DisableMetrics(),
82+
lp2p.DisableRelay(),
83+
lp2p.NoListenAddrs,
84+
lp2p.NoTransports,
85+
lp2p.Transport(lp2pwss.New, lp2pwss.WithHandshakeTimeout(withTimeout+100*time.Millisecond)),
86+
lp2p.Transport(lp2ptcp.NewTCPTransport, lp2ptcp.WithConnectionTimeout(withTimeout+100*time.Millisecond)),
87+
lp2p.Security(lp2ptls.ID, lp2ptls.New),
88+
lp2p.UserAgent("interplanetary-toolbox"),
89+
lp2p.WithDialTimeout(withTimeout),
90+
lp2p.Muxer(lp2pyamux.ID, (*lp2pyamux.Transport)(yc)),
91+
)
92+
if err != nil {
93+
return nil, nil, cmn.WrErr(err)
94+
}
95+
96+
return nodeHost, ps, nil
97+
}
98+
99+
var DefaultRPCTimeout = time.Duration(5 * time.Minute)
100+
101+
type RPCTook struct {
102+
LocalPeerID *PeerID `json:"dialing_peerid"`
103+
PeerConnectMsecs *int64 `json:"peer_connect_took_msecs,omitempty"`
104+
StreamOpenMsecs *int64 `json:"stream_open_took_msecs,omitempty"`
105+
StreamWriteMsecs *int64 `json:"stream_write_took_msecs,omitempty"`
106+
StreamReadMsecs *int64 `json:"stream_read_took_msecs,omitempty"`
107+
}
108+
109+
func ExecCborRPC(
110+
ctx context.Context, nodeHost Host,
111+
peerAddr AddrInfo, proto lp2pproto.ID,
112+
args interface{}, resp interface{},
113+
) (RPCTook, error) {
114+
115+
myID := nodeHost.ID()
116+
t := RPCTook{LocalPeerID: &myID}
117+
118+
unprot, connectTook, err := ConnectAndProtect(ctx, nodeHost, peerAddr)
119+
t.PeerConnectMsecs = &connectTook
120+
if err != nil {
121+
return t, err
122+
}
123+
defer unprot()
124+
125+
t0 := time.Now()
126+
st, err := nodeHost.NewStream(ctx, *peerAddr.PeerID, proto)
127+
t1 := time.Since(t0).Milliseconds()
128+
t.StreamOpenMsecs = &t1
129+
if err != nil {
130+
return t, fmt.Errorf("error while opening %s stream: %w", proto, err)
131+
}
132+
defer st.Close() //nolint:errcheck
133+
134+
// inherit deadline from the context due to the bizarere duality of needing both
135+
// - a context for NewStream
136+
// - a clock-deadline for the read/write
137+
// if unavailable - do an absurdly long DefaultRPCTimeout
138+
dline, hasDline := ctx.Deadline()
139+
if !hasDline {
140+
dline = time.Now().Add(DefaultRPCTimeout)
141+
}
142+
st.SetDeadline(dline) //nolint:errcheck
143+
defer st.SetDeadline(time.Time{}) //nolint:errcheck
144+
145+
if args != nil {
146+
t0 = time.Now()
147+
err = cborutil.WriteCborRPC(st, args)
148+
t2 := time.Since(t0).Milliseconds()
149+
t.StreamWriteMsecs = &t2
150+
151+
if err != nil {
152+
return t, fmt.Errorf("error while writing to %s stream: %w", proto, err)
153+
}
154+
}
155+
156+
t0 = time.Now()
157+
err = cborutil.ReadCborRPC(st, resp)
158+
t3 := time.Since(t0).Milliseconds()
159+
t.StreamReadMsecs = &t3
160+
161+
if err != nil {
162+
return t, fmt.Errorf("error while reading %s response: %w", proto, err)
163+
}
164+
165+
return t, nil
166+
}
167+
168+
func ConnectAndProtect(ctx context.Context, nodeHost Host, ai AddrInfo) (closer func(), tookMsecs int64, err error) {
169+
t0 := time.Now()
170+
171+
protTag := fmt.Sprintf("conn-%s-%d", ai.PeerID.String(), t0.UnixNano())
172+
nodeHost.ConnManager().Protect(*ai.PeerID, protTag)
173+
closer = func() {
174+
nodeHost.ConnManager().Unprotect(*ai.PeerID, protTag)
175+
}
176+
177+
err = nodeHost.Connect(ctx, ai.ToLp2p())
178+
tookMsecs = time.Since(t0).Milliseconds()
179+
180+
if err != nil {
181+
// unprotect right away on error
182+
closer()
183+
closer = func() {}
184+
err = fmt.Errorf("error ensuring connection to %s: %w", ai.PeerID.String(), err)
185+
}
186+
187+
return
188+
}
189+
190+
func AssembleAddrInfo[M interface{ ~string | ~[]byte }](pID *PeerID, addrs []M) (AddrInfo, error) {
191+
192+
var ai AddrInfo
193+
errs := make([]error, 0, 4)
194+
195+
if pID == nil || *pID == "" {
196+
errs = append(errs, errors.New("no peerID supplied"))
197+
} else {
198+
decID, err := lp2ppeer.Decode(pID.String())
199+
if err != nil {
200+
errs = append(errs, fmt.Errorf("peerID '%s' is not valid: %w", pID.String(), err))
201+
} else {
202+
ai.PeerID = &decID
203+
}
204+
}
205+
206+
maddrs := make([]multiaddr.Multiaddr, 0, len(addrs))
207+
var err error
208+
var ma multiaddr.Multiaddr
209+
for i, encMA := range addrs {
210+
211+
switch any(encMA).(type) {
212+
case string:
213+
ma, err = multiaddr.NewMultiaddr(string(encMA))
214+
default:
215+
ma, err = multiaddr.NewMultiaddrBytes([]byte(encMA))
216+
}
217+
218+
if err != nil {
219+
errs = append(errs, fmt.Errorf("multiaddress entry '%x' (#%d) is not valid: %w", encMA, i, err))
220+
} else {
221+
maddrs = append(maddrs, ma)
222+
}
223+
}
224+
225+
if len(errs) > 0 {
226+
return ai, errors.Join(errs...)
227+
}
228+
229+
ai.MultiAddrs = maddrs
230+
return ai, nil
231+
}

0 commit comments

Comments
 (0)