Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ type Conn interface {
// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool

// As finds the first conn in Conn's wrapped types that matches target, and
// if one is found, sets target to that conn value and returns true.
// Otherwise, it returns false. Similar to errors.As.
//
// target must be a pointer to the type you are matching against.
//
// This is an EXPERIMENTAL API. Getting access to the underlying type can
// lead to hard to debug issues. For example, if you mutate connection state
// on the underlying type, hooks that relied on only mutating that state
// from the wrapped connection would never be called.
//
// You very likely do not need to use this method.
As(target any) bool
}

// ConnectionState holds information about the connection.
Expand Down
14 changes: 14 additions & 0 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,20 @@ type MuxedConn interface {

// AcceptStream accepts a stream opened by the other side.
AcceptStream() (MuxedStream, error)

// As finds the first conn in MuxedConn's wrapped types that matches target,
// and if one is found, sets target to that conn value and returns true.
// Otherwise, it returns false. Similar to errors.As.
//
// target must be a pointer to the type you are matching against.
//
// This is an EXPERIMENTAL API. Getting access to the underlying type can
// lead to hard to debug issues. For example, if you mutate connection state
// on the underlying type, hooks that relied on only mutating that state
// from the wrapped connection would never be called.
//
// You very likely do not need to use this method.
As(target any) bool
}

// Multiplexer wraps a net.Conn with a stream multiplexing
Expand Down
121 changes: 121 additions & 0 deletions libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ import (
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/libp2p/go-libp2p/p2p/transport/websocket"
webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/libp2p/go-yamux/v5"
"github.com/pion/webrtc/v4"
quicgo "github.com/quic-go/quic-go"
wtgo "github.com/quic-go/webtransport-go"
"go.uber.org/goleak"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -842,3 +846,120 @@ func BenchmarkAllAddrs(b *testing.B) {
addrsHost.AllAddrs()
}
}

func TestConnAs(t *testing.T) {
// t.Run("quic conns", func(t *testing.T) {
// h1, err := New(ListenAddrStrings(
// "/ip4/0.0.0.0/udp/0/quic-v1",
// ))
// require.NoError(t, err)
// defer h1.Close()
// h2, err := New(ListenAddrStrings(
// "/ip4/0.0.0.0/udp/0/quic-v1",
// ))
// require.NoError(t, err)
// defer h2.Close()
// err = h1.Connect(context.Background(), peer.AddrInfo{
// ID: h2.ID(),
// Addrs: h2.Addrs(),
// })
// require.NoError(t, err)
// c := h1.Network().ConnsToPeer(h2.ID())[0]

// var quicConn *quicgo.Conn
// require.True(t, c.As(&quicConn))
// })

// t.Run("TCP conn with metrics", func(t *testing.T) {
// h1, err := New(ListenAddrStrings(
// "/ip4/0.0.0.0/tcp/0",
// ), Transport(tcp.NewTCPTransport, tcp.WithMetrics()))
// require.NoError(t, err)
// defer h1.Close()
// h2, err := New(ListenAddrStrings(
// "/ip4/0.0.0.0/tcp/0",
// ), Transport(tcp.NewTCPTransport, tcp.WithMetrics()))
// require.NoError(t, err)
// defer h2.Close()
// err = h1.Connect(context.Background(), peer.AddrInfo{
// ID: h2.ID(),
// Addrs: h2.Addrs(),
// })
// require.NoError(t, err)
// c := h1.Network().ConnsToPeer(h2.ID())[0]

// var yamuxSession *yamux.Session
// require.True(t, c.As(&yamuxSession))
// })

type testCase struct {
name string
listenAddr string
testAs func(t *testing.T, c network.Conn)
}

testCases := []testCase{
{
"QUIC",
"/ip4/0.0.0.0/udp/0/quic-v1",
func(t *testing.T, c network.Conn) {
var quicConn *quicgo.Conn
require.True(t, c.As(&quicConn))
},
},
{
"TCP+Yamux",
"/ip4/0.0.0.0/tcp/0",
func(t *testing.T, c network.Conn) {
var yamuxSession *yamux.Session
require.True(t, c.As(&yamuxSession))
},
},
{
"WebRTC",
"/ip4/0.0.0.0/udp/0/webrtc-direct",
func(t *testing.T, c network.Conn) {
var webrtcPC *webrtc.PeerConnection
require.True(t, c.As(&webrtcPC))
},
},
{
"WebTransport Session",
"/ip4/0.0.0.0/udp/0/quic-v1/webtransport",
func(t *testing.T, c network.Conn) {
var s *wtgo.Session
require.True(t, c.As(&s))
},
},
{
"WebTransport QUIC Conn",
"/ip4/0.0.0.0/udp/0/quic-v1/webtransport",
func(t *testing.T, c network.Conn) {
var quicConn *quicgo.Conn
require.True(t, c.As(&quicConn))
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h1, err := New(ListenAddrStrings(
tc.listenAddr,
))
require.NoError(t, err)
defer h1.Close()
h2, err := New(ListenAddrStrings(
tc.listenAddr,
))
require.NoError(t, err)
defer h2.Close()
err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
c := h1.Network().ConnsToPeer(h2.ID())[0]
tc.testAs(t, c)
})
}
}
8 changes: 8 additions & 0 deletions p2p/muxer/yamux/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ type conn yamux.Session

var _ network.MuxedConn = &conn{}

func (c *conn) As(target any) bool {
if t, ok := target.(**yamux.Session); ok {
*t = (*yamux.Session)(c)
return true
}
return false
}

// NewMuxedConn constructs a new MuxedConn from a yamux.Session.
func NewMuxedConn(m *yamux.Session) network.MuxedConn {
return (*conn)(m)
Expand Down
1 change: 1 addition & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ func (m mockConn) NewStream(_ context.Context) (network.Stream, error) { panic("
func (m mockConn) GetStreams() []network.Stream { panic("implement me") }
func (m mockConn) Scope() network.ConnScope { panic("implement me") }
func (m mockConn) ConnState() network.ConnectionState { return network.ConnectionState{} }
func (m mockConn) As(_ any) bool { return false }

func makeSegmentsWithPeerInfos(peerInfos peerInfos) *segments {
var s = func() *segments {
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (c *conn) Close() error {
return nil
}

func (c *conn) As(_ any) bool {
return false
}

func (c *conn) teardown() {
for _, s := range c.allStreams() {
s.Reset()
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,10 @@ func wrapWithMetrics(capableConn transport.CapableConn, metricsTracer MetricsTra
return c
}

func (c *connWithMetrics) As(target any) bool {
return c.CapableConn.As(target)
}

func (c *connWithMetrics) completedHandshake() {
c.metricsTracer.CompletedHandshake(time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Conn struct {

var _ network.Conn = &Conn{}

func (c *Conn) As(target any) bool {
return c.conn.As(target)
}

func (c *Conn) IsClosed() bool {
return c.conn.IsClosed()
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type transportConn struct {

var _ transport.CapableConn = &transportConn{}

func (c *transportConn) As(target any) bool {
return c.MuxedConn.As(target)
}

func (t *transportConn) Transport() transport.Transport {
return t.transport
}
Expand Down
9 changes: 9 additions & 0 deletions p2p/transport/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ type conn struct {
remoteMultiaddr ma.Multiaddr
}

func (c *conn) As(target any) bool {
if t, ok := target.(**quic.Conn); ok {
*t = c.quicConn
return true
}

return false
}

var _ tpt.CapableConn = &conn{}

// Close closes the connection.
Expand Down
8 changes: 8 additions & 0 deletions p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func (c *connection) Close() error {
return nil
}

func (c *connection) As(target any) bool {
if target, ok := target.(**webrtc.PeerConnection); ok {
*target = c.pc
return true
}
return false
}

// CloseWithError closes the connection ignoring the error code. As there's no way to signal
// the remote peer on closing the underlying peerconnection, we ignore the error code.
func (c *connection) CloseWithError(_ network.ConnErrorCode) error {
Expand Down
12 changes: 12 additions & 0 deletions p2p/transport/webtransport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,15 @@ func (c *conn) Transport() tpt.Transport { return c.transport }
func (c *conn) ConnState() network.ConnectionState {
return network.ConnectionState{Transport: "webtransport"}
}

func (c *conn) As(target any) bool {
if target, ok := target.(**quic.Conn); ok {
*target = c.qconn
return true
}
if target, ok := target.(**webtransport.Session); ok {
*target = c.session
return true
}
return false
}
Loading