diff --git a/core/sec/security.go b/core/sec/security.go index d9e9183298..ddb7f230e8 100644 --- a/core/sec/security.go +++ b/core/sec/security.go @@ -9,6 +9,9 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/transport/magiselect" + "github.com/multiformats/go-multiaddr" + mafmt "github.com/multiformats/go-multiaddr-fmt" ) // SecureConn is an authenticated, encrypted connection. @@ -31,6 +34,18 @@ type SecureTransport interface { ID() protocol.ID } +// StraightableSecureTransport can be implemented by security transports which support being ran straight on the stream. +// This allows them to skip the multistream security handshake. +type StraightableSecureTransport interface { + SecureTransport + + // Suffix indicate the trailing component which allows to skip the multistream select exchange. + Suffix() multiaddr.Multiaddr + SuffixProtocol() int + SuffixMatcher() mafmt.Pattern + magiselect.Matcher +} + type ErrPeerIDMismatch struct { Expected peer.ID Actual peer.ID diff --git a/core/transport/transport.go b/core/transport/transport.go index d56a3cff06..e6c1100dcd 100644 --- a/core/transport/transport.go +++ b/core/transport/transport.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" ) @@ -96,6 +97,14 @@ type Listener interface { Multiaddr() ma.Multiaddr } +// ListenerFromUpgrader is a workaround to let the swarm append suffixes, it is optionally implemented by Listeners. +// FIXME: I want 8962b2ae336d94627f2f4361f96799ee3a5bd9e4 but it was reverted in 1c8eaabfd385346a7c41b988e2dbc2e20ddfa460 and I'm not in the mood to figure out this kind of mess. +type ListenerFromUpgrader interface { + Listener + + Upgrader() Upgrader +} + // ErrListenerClosed is returned by Listener.Accept when the listener is gracefully closed. var ErrListenerClosed = errors.New("listener closed") @@ -121,9 +130,18 @@ type TransportNetwork interface { // to a full transport connection (secure and multiplexed). type Upgrader interface { // UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener. - UpgradeListener(Transport, manet.Listener) Listener - // Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection. - Upgrade(ctx context.Context, t Transport, maconn manet.Conn, dir network.Direction, p peer.ID, scope network.ConnManagementScope) (CapableConn, error) + UpgradeListener(Transport, manet.Listener) ListenerFromUpgrader + + // UpgradeOutbound/Inbound upgrades the multiaddr/net connection into a full libp2p-transport connection. + // suffix can be nil if no suffix is present. + UpgradeOutbound(ctx context.Context, t Transport, maconn manet.Conn, suffix ma.Multiaddr, p peer.ID, scope network.ConnManagementScope) (CapableConn, error) + UpgradeInbound(ctx context.Context, t Transport, maconn manet.Conn, p peer.ID, scope network.ConnManagementScope) (CapableConn, error) + + // Suffixes let the Upgrader indicate optional maddr suffixes which can be used to skip parts of the negociation. + // A nil maddr indicate that no suffix must be applied (multistream-select will be used). + Suffixes() []ma.Multiaddr + SuffixesProtocols() []int + SuffixMatcher() mafmt.Pattern } // DialUpdater provides updates on in progress dials. diff --git a/go.mod b/go.mod index 2e2edc1092..98b6e00451 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/multiformats/go-base32 v0.1.0 github.com/multiformats/go-multiaddr v0.12.4 github.com/multiformats/go-multiaddr-dns v0.3.1 - github.com/multiformats/go-multiaddr-fmt v0.1.0 + github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0 github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 diff --git a/go.sum b/go.sum index 3c44afb25e..deb7b84ff8 100644 --- a/go.sum +++ b/go.sum @@ -233,14 +233,13 @@ github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aG github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= -github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= github.com/multiformats/go-multiaddr v0.12.4 h1:rrKqpY9h+n80EwhhC/kkcunCZZ7URIF8yN1WEUt2Hvc= github.com/multiformats/go-multiaddr v0.12.4/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII= github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A= github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk= -github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= -github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= +github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0 h1:KCd9ZVLosRwAYaZfRk/BeT9dBkTBIJpG/GwMI0sE3hk= +github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0/go.mod h1:fU4KrUT9EeccvWYMheDXrJlEgjr38O1V7sEyshNz3BI= github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg= diff --git a/libp2p_test.go b/libp2p_test.go index ea52e56470..489f663b28 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -108,10 +108,10 @@ func TestDefaultListenAddrs(t *testing.T) { h.Close() // Test 2: Listen addr only include relay if user defined transport is passed. - h, err = New(Transport(tcp.NewTCPTransport)) + h, err = New(Transport(tcp.NewTCPTransport), Security(noise.ID, noise.New)) require.NoError(t, err) - if len(h.Network().ListenAddresses()) != 1 { + if len(h.Network().ListenAddresses()) != 2 /* /tcp and /tcp/noise */ { t.Error("expected one listen addr with user defined transport") } if reCircuit.FindStringSubmatchIndex(h.Network().ListenAddresses()[0].String()) == nil { diff --git a/p2p/net/swarm/swarm_addr.go b/p2p/net/swarm/swarm_addr.go index b2e3e4e8aa..aae3c26e3f 100644 --- a/p2p/net/swarm/swarm_addr.go +++ b/p2p/net/swarm/swarm_addr.go @@ -3,6 +3,7 @@ package swarm import ( "time" + "github.com/libp2p/go-libp2p/core/transport" manet "github.com/multiformats/go-multiaddr/net" ma "github.com/multiformats/go-multiaddr" @@ -18,7 +19,22 @@ func (s *Swarm) ListenAddresses() []ma.Multiaddr { func (s *Swarm) listenAddressesNoLock() []ma.Multiaddr { addrs := make([]ma.Multiaddr, 0, len(s.listeners.m)+10) // A bit extra so we may avoid an extra allocation in the for loop below. for l := range s.listeners.m { - addrs = append(addrs, l.Multiaddr()) + addr := l.Multiaddr() + + // FIXME: this is a hack because we don't return multimple addresses from .Multiaddr, see the docs of transport.ListenerFromUpgrader. + if lu, ok := l.(transport.ListenerFromUpgrader); ok { + u := lu.Upgrader() + for _, suffix := range u.Suffixes() { + if suffix == nil { + // implicit multistream-select + addrs = append(addrs, addr) + } else { + addrs = append(addrs, addr.Encapsulate(suffix)) + } + } + } else { + addrs = append(addrs, addr) + } } return addrs } diff --git a/p2p/net/upgrader/listener.go b/p2p/net/upgrader/listener.go index 8af2791b36..f66caefcdc 100644 --- a/p2p/net/upgrader/listener.go +++ b/p2p/net/upgrader/listener.go @@ -119,7 +119,7 @@ func (l *listener) handleIncoming() { ctx, cancel := context.WithTimeout(l.ctx, l.upgrader.acceptTimeout) defer cancel() - conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope) + conn, err := l.upgrader.UpgradeInbound(ctx, l.transport, maconn, "", connScope) if err != nil { // Don't bother bubbling this up. We just failed // to completely negotiate the connection. @@ -179,4 +179,9 @@ func (l *listener) String() string { return fmt.Sprintf("", l.Multiaddr()) } +func (l *listener) Upgrader() transport.Upgrader { + return l.upgrader +} + var _ transport.Listener = (*listener)(nil) +var _ transport.ListenerFromUpgrader = (*listener)(nil) diff --git a/p2p/net/upgrader/upgrader.go b/p2p/net/upgrader/upgrader.go index 3a6f8b9f52..dbea50bd71 100644 --- a/p2p/net/upgrader/upgrader.go +++ b/p2p/net/upgrader/upgrader.go @@ -15,7 +15,10 @@ import ( "github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/net/pnet" + "github.com/libp2p/go-libp2p/p2p/transport/magiselect" + ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" mss "github.com/multiformats/go-multistream" ) @@ -61,6 +64,9 @@ type upgrader struct { securityMuxer *mss.MultistreamMuxer[protocol.ID] securityIDs []protocol.ID + straightSecurity []sec.StraightableSecureTransport + straightMatcher mafmt.Pattern + // AcceptTimeout is the maximum duration an Accept is allowed to take. // This includes the time between accepting the raw network connection, // protocol selection as well as the handshake, if applicable. @@ -96,15 +102,25 @@ func New(security []sec.SecureTransport, muxers []StreamMuxer, psk ipnet.PSK, rc u.muxerIDs = append(u.muxerIDs, m.ID) } u.securityIDs = make([]protocol.ID, 0, len(security)) + u.straightSecurity = make([]sec.StraightableSecureTransport, 0, len(security)) + suffixes := []mafmt.Pattern{mafmt.Nothing} for _, s := range security { u.securityMuxer.AddHandler(s.ID(), nil) u.securityIDs = append(u.securityIDs, s.ID()) + if straight, ok := s.(sec.StraightableSecureTransport); ok { + u.straightSecurity = append(u.straightSecurity, straight) + if straight.Suffix() == nil { + return nil, fmt.Errorf("StraightSecureTransport %q returned an empty suffix", straight.ID()) + } + suffixes = append(suffixes, straight.SuffixMatcher()) + } } + u.straightMatcher = mafmt.Or(suffixes...) return u, nil } // UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener. -func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.Listener { +func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.ListenerFromUpgrader { ctx, cancel := context.WithCancel(context.Background()) l := &listener{ Listener: list, @@ -120,17 +136,20 @@ func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t return l } -// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection. -func (u *upgrader) Upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, p peer.ID, connScope network.ConnManagementScope) (transport.CapableConn, error) { - c, err := u.upgrade(ctx, t, maconn, dir, p, connScope) - if err != nil { - connScope.Done() - return nil, err - } - return c, nil +func (u *upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, suffix ma.Multiaddr, p peer.ID, scope network.ConnManagementScope) (transport.CapableConn, error) { + return u.upgrade(ctx, t, maconn, network.DirOutbound, suffix, p, scope) +} +func (u *upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID, scope network.ConnManagementScope) (transport.CapableConn, error) { + return u.upgrade(ctx, t, maconn, network.DirInbound, nil, p, scope) } -func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, p peer.ID, connScope network.ConnManagementScope) (transport.CapableConn, error) { +// suffix is only used for Outbound connections +func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, suffix ma.Multiaddr, p peer.ID, connScope network.ConnManagementScope) (_ transport.CapableConn, err error) { + defer func() { + if err != nil { + connScope.Done() + } + }() if dir == network.DirOutbound && p == "" { return nil, ErrNilPeer } @@ -153,7 +172,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma } isServer := dir == network.DirInbound - sconn, security, err := u.setupSecurity(ctx, conn, p, isServer) + sconn, security, err := u.setupSecurity(ctx, conn, suffix, p, isServer) if err != nil { conn.Close() return nil, fmt.Errorf("failed to negotiate security protocol: %w", err) @@ -200,8 +219,8 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma return tc, nil } -func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, isServer bool) (sec.SecureConn, protocol.ID, error) { - st, err := u.negotiateSecurity(ctx, conn, isServer) +func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, suffix ma.Multiaddr, p peer.ID, isServer bool) (_ sec.SecureConn, _ protocol.ID, err error) { + st, conn, err := u.negotiateSecurity(ctx, conn, suffix, isServer) if err != nil { return nil, "", err } @@ -306,38 +325,92 @@ func (u *upgrader) getSecurityByID(id protocol.ID) sec.SecureTransport { return nil } -func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, error) { +var ErrNoMagiselectMatch = errors.New("no magiselect match") + +func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, suffix ma.Multiaddr, server bool) (sec.SecureTransport, net.Conn, error) { + if suffix != nil { + for _, straight := range u.straightSecurity { + if straight.SuffixMatcher().Matches(suffix) { + return straight, insecure, nil + } + } + return nil, nil, fmt.Errorf("suffix was provided but does not match anything %q %q", insecure, suffix) // buggy transport + } + type result struct { proto protocol.ID + st sec.SecureTransport err error } done := make(chan result, 1) go func() { - if server { - var r result - r.proto, _, r.err = u.securityMuxer.Negotiate(insecure) - done <- r - return - } var r result - r.proto, r.err = mss.SelectOneOf(u.securityIDs, insecure) + r.proto, r.st, r.err = func() (protocol.ID, sec.SecureTransport, error) { + if server { + var s magiselect.Sample + var err error + s, insecure, err = magiselect.ReadSampleFromConn(insecure) + if err != nil { + return "", nil, err + } + + if magiselect.IsMultistreamSelect(s) { + proto, _, err := u.securityMuxer.Negotiate(insecure) + return proto, nil, err + } + + for _, ss := range u.straightSecurity { + if ss.Match(s) { + return "", ss, nil + } + } + + return "", nil, ErrNoMagiselectMatch + } + + proto, err := mss.SelectOneOf(u.securityIDs, insecure) + return proto, nil, err + }() done <- r }() select { case r := <-done: if r.err != nil { - return nil, r.err + return nil, nil, r.err + } + if r.st != nil { + return r.st, insecure, nil } if s := u.getSecurityByID(r.proto); s != nil { - return s, nil + return s, insecure, nil } - return nil, fmt.Errorf("selected unknown security transport: %s", r.proto) + return nil, nil, fmt.Errorf("selected unknown security transport: %s", r.proto) case <-ctx.Done(): // We *must* do this. We have outstanding work on the connection, and it's no longer safe to use. insecure.Close() <-done // wait to stop using the connection. - return nil, ctx.Err() + return nil, nil, ctx.Err() + } +} + +func (u *upgrader) Suffixes() []ma.Multiaddr { + r := make([]ma.Multiaddr, len(u.straightSecurity)+1) // +1 for nil indicating multistream-select + for i, s := range u.straightSecurity { + r[i] = s.Suffix() } + return r +} + +func (u *upgrader) SuffixesProtocols() []int { + r := make([]int, len(u.straightSecurity)) + for i, s := range u.straightSecurity { + r[i] = s.SuffixProtocol() + } + return r +} + +func (u *upgrader) SuffixMatcher() mafmt.Pattern { + return u.straightMatcher } diff --git a/p2p/net/upgrader/upgrader_test.go b/p2p/net/upgrader/upgrader_test.go index 920ccaab9b..a51fd6830e 100644 --- a/p2p/net/upgrader/upgrader_test.go +++ b/p2p/net/upgrader/upgrader_test.go @@ -129,7 +129,7 @@ func dial(t *testing.T, upgrader transport.Upgrader, raddr ma.Multiaddr, p peer. if err != nil { return nil, err } - return upgrader.Upgrade(context.Background(), nil, macon, network.DirOutbound, p, scope) + return upgrader.UpgradeOutbound(context.Background(), nil, macon, nil, p, scope) } func TestOutboundConnectionGating(t *testing.T) { diff --git a/p2p/protocol/circuitv2/client/transport.go b/p2p/protocol/circuitv2/client/transport.go index 2c9e49f509..0ac75f23fb 100644 --- a/p2p/protocol/circuitv2/client/transport.go +++ b/p2p/protocol/circuitv2/client/transport.go @@ -71,7 +71,8 @@ func (c *Client) dialAndUpgrade(ctx context.Context, a ma.Multiaddr, p peer.ID, return nil, err } conn.tagHop() - cc, err := c.upgrader.Upgrade(ctx, c, conn, network.DirOutbound, p, connScope) + // FIXME: remove our leading maddr and compute suffix + cc, err := c.upgrader.UpgradeOutbound(ctx, c, conn, nil, p, connScope) if err != nil { return nil, err } diff --git a/p2p/security/noise/transport.go b/p2p/security/noise/transport.go index e42cea1bf7..483e6d77fd 100644 --- a/p2p/security/noise/transport.go +++ b/p2p/security/noise/transport.go @@ -11,7 +11,10 @@ import ( "github.com/libp2p/go-libp2p/core/sec" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" "github.com/libp2p/go-libp2p/p2p/security/noise/pb" + "github.com/libp2p/go-libp2p/p2p/transport/magiselect" + ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" ) @@ -27,6 +30,7 @@ type Transport struct { } var _ sec.SecureTransport = &Transport{} +var _ sec.StraightableSecureTransport = &Transport{} // New creates a new Noise transport using the given private key as its // libp2p identity key. @@ -129,3 +133,21 @@ func (i *transportEarlyDataHandler) MatchMuxers(isInitiator bool) protocol.ID { } return matchMuxers(i.receivedMuxers, i.transport.muxers) } + +func (*Transport) Match(s magiselect.Sample) bool { + return magiselect.IsNoise(s) +} + +var noiseMaddr = ma.StringCast("/noise") + +func (*Transport) Suffix() ma.Multiaddr { + return noiseMaddr +} + +func (*Transport) SuffixProtocol() int { + return ma.P_NOISE +} + +func (*Transport) SuffixMatcher() mafmt.Pattern { + return mafmt.Base(ma.P_NOISE) +} diff --git a/p2p/security/tls/transport.go b/p2p/security/tls/transport.go index 0c494a7fdc..35ff3b63dd 100644 --- a/p2p/security/tls/transport.go +++ b/p2p/security/tls/transport.go @@ -16,7 +16,10 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/sec" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" + "github.com/libp2p/go-libp2p/p2p/transport/magiselect" + ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" ) @@ -34,6 +37,7 @@ type Transport struct { } var _ sec.SecureTransport = &Transport{} +var _ sec.StraightableSecureTransport = &Transport{} // New creates a TLS encrypted transport func New(id protocol.ID, key ci.PrivKey, muxers []tptu.StreamMuxer) (*Transport, error) { @@ -180,3 +184,21 @@ func (t *Transport) setupConn(tlsConn *tls.Conn, remotePubKey ci.PubKey) (sec.Se func (t *Transport) ID() protocol.ID { return t.protocolID } + +func (*Transport) Match(s magiselect.Sample) bool { + return magiselect.IsTLS(s) +} + +var tlsMaddr = ma.StringCast("/tls") + +func (*Transport) Suffix() ma.Multiaddr { + return tlsMaddr +} + +func (*Transport) SuffixProtocol() int { + return ma.P_TLS +} + +func (*Transport) SuffixMatcher() mafmt.Pattern { + return mafmt.Base(ma.P_TLS) +} diff --git a/p2p/test/notifications/notification_test.go b/p2p/test/notifications/notification_test.go index eb26a3fe5e..8633f1685f 100644 --- a/p2p/test/notifications/notification_test.go +++ b/p2p/test/notifications/notification_test.go @@ -1,12 +1,15 @@ package notifications import ( + "bytes" + "slices" "strconv" "testing" "time" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/p2p/security/noise" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -26,6 +29,7 @@ func TestListenAddressNotif(t *testing.T) { libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), libp2p.Transport(tcp.NewTCPTransport), libp2p.Transport(libp2pquic.NewTransport), + libp2p.Security(noise.ID, noise.New), libp2p.DisableRelay(), ) require.NoError(t, err) @@ -34,24 +38,27 @@ func TestListenAddressNotif(t *testing.T) { require.NoError(t, err) defer sub.Close() - var initialAddr ma.Multiaddr + var initialAddrs []ma.Multiaddr // make sure the event is emitted for the initial listen address select { case e := <-sub.Out(): ev := e.(event.EvtLocalAddressesUpdated) require.Empty(t, ev.Removed) - require.Len(t, ev.Current, 1) + require.Len(t, ev.Current, 2) require.Equal(t, event.Added, ev.Current[0].Action) - initialAddr = ev.Current[0].Address + initialAddr := ev.Current[0].Address + initialAddrs = []ma.Multiaddr{initialAddr, initialAddr.Encapsulate(ma.StringCast("/noise"))} portStr, err := initialAddr.ValueForProtocol(ma.P_TCP) require.NoError(t, err) require.NotZero(t, portFromString(t, portStr)) case <-time.After(500 * time.Millisecond): t.Fatal("timeout") } + slices.SortFunc(initialAddrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) listenAddrs, err := h.Network().InterfaceListenAddresses() require.NoError(t, err) - require.Equal(t, []ma.Multiaddr{initialAddr}, listenAddrs) + slices.SortFunc(listenAddrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) + require.Equal(t, initialAddrs, listenAddrs) // now start listening on another address require.NoError(t, h.Network().Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"))) @@ -60,19 +67,20 @@ func TestListenAddressNotif(t *testing.T) { case e := <-sub.Out(): ev := e.(event.EvtLocalAddressesUpdated) require.Empty(t, ev.Removed) - require.Len(t, ev.Current, 2) - var maintainedAddr ma.Multiaddr + require.Len(t, ev.Current, 3) + var maintainedAddrs []ma.Multiaddr for _, e := range ev.Current { switch e.Action { case event.Added: addedAddr = e.Address case event.Maintained: - maintainedAddr = e.Address + maintainedAddrs = append(maintainedAddrs, e.Address) default: t.Fatal("unexpected action") } } - require.Equal(t, initialAddr, maintainedAddr) + slices.SortFunc(maintainedAddrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) + require.Equal(t, initialAddrs, maintainedAddrs) _, err = addedAddr.ValueForProtocol(ma.P_QUIC_V1) require.NoError(t, err) portStr, err := addedAddr.ValueForProtocol(ma.P_UDP) @@ -84,7 +92,9 @@ func TestListenAddressNotif(t *testing.T) { listenAddrs, err = h.Network().InterfaceListenAddresses() require.NoError(t, err) - require.Len(t, listenAddrs, 2) - require.Contains(t, listenAddrs, initialAddr) + require.Len(t, listenAddrs, 3) + for _, addr := range initialAddrs { + require.Contains(t, listenAddrs, addr) + } require.Contains(t, listenAddrs, addedAddr) } diff --git a/p2p/transport/magiselect/magiselect.go b/p2p/transport/magiselect/magiselect.go new file mode 100644 index 0000000000..ad6e562e87 --- /dev/null +++ b/p2p/transport/magiselect/magiselect.go @@ -0,0 +1,142 @@ +package magiselect + +import ( + "bufio" + "encoding/binary" + "errors" + "io" + "math" + "net" +) + +type peekAble interface { + // Peek returns the next n bytes without advancing the reader. The bytes stop + // being valid at the next read call. If Peek returns fewer than n bytes, it + // also returns an error explaining why the read is short. The error is + // [ErrBufferFull] if n is larger than b's buffer size. + Peek(n int) ([]byte, error) +} + +var _ peekAble = (*bufio.Reader)(nil) + +// ReadSample read the sample and returns a reader which still include the sample, so it can be kept undamaged. +// If an error occurs it only return the error. +func ReadSampleFromConn(c net.Conn) (Sample, net.Conn, error) { + if peekAble, ok := c.(peekAble); ok { + b, err := peekAble.Peek(len(Sample{})) + switch { + case err == nil: + return Sample(b), c, nil + case errors.Is(err, bufio.ErrBufferFull): + // fallback to sampledConn + default: + return Sample{}, nil, err + } + } + + sc := &sampledConn{Conn: c} + _, err := io.ReadFull(c, sc.s[:]) + if err != nil { + return Sample{}, nil, err + } + return sc.s, sc, nil +} + +type sampledConn struct { + net.Conn + + s Sample + redFromSample uint8 +} + +var _ = [math.MaxUint8]struct{}{}[len(Sample{})] // compiletime assert sampledConn.redFromSample wont overflow +var _ io.ReaderFrom = (*sampledConn)(nil) +var _ io.WriterTo = (*sampledConn)(nil) + +func (sc *sampledConn) Read(b []byte) (int, error) { + if int(sc.redFromSample) != len(sc.s) { + red := copy(b, sc.s[sc.redFromSample:]) + sc.redFromSample += uint8(red) + return red, nil + } + + return sc.Conn.Read(b) +} + +// forward optimizations +func (sc *sampledConn) ReadFrom(r io.Reader) (int64, error) { + return io.Copy(sc.Conn, r) +} + +// forward optimizations +func (sc *sampledConn) WriteTo(w io.Writer) (total int64, err error) { + if int(sc.redFromSample) != len(sc.s) { + b := sc.s[sc.redFromSample:] + written, err := w.Write(b) + if written < 0 || len(b) < written { + // buggy writter, harden against this + sc.redFromSample = uint8(len(sc.s)) + total = int64(len(sc.s)) + } else { + sc.redFromSample += uint8(written) + total += int64(written) + } + if err != nil { + return total, err + } + } + + written, err := io.Copy(w, sc.Conn) + total += written + return total, err +} + +type Matcher interface { + Match(s Sample) bool +} + +// Sample might evolve over time. +type Sample [3]byte + +// Matchers are implemented here instead of in the transports so we can easily fuzz them together. + +func IsMultistreamSelect(s Sample) bool { + return string(s[:]) == "\x13/m" +} + +func IsHTTP(s Sample) bool { + switch string(s[:]) { + case "GET", "HEA", "POS", "PUT", "DEL", "CON", "OPT", "TRA", "PAT": + return true + default: + return false + } +} + +func IsTLS(s Sample) bool { + switch string(s[:]) { + case "\x16\x03\x01", "\x16\x03\x02", "\x16\x03\x03", "\x16\x03\x04": + return true + default: + return false + } +} + +func IsNoise(s Sample) bool { + length := binary.BigEndian.Uint16(s[:]) + if length < 2 { + return false + } + + b := s[2] + typ := b & 0b111 + field := (b & 0b11111000) >> 3 + switch field { + case 1, 2, 3: + return typ == 2 + case 4: + return typ == 2 || typ == 3 + default: + return false + } +} diff --git a/p2p/transport/magiselect/magiselect_test.go b/p2p/transport/magiselect/magiselect_test.go new file mode 100644 index 0000000000..5e68cc74a9 --- /dev/null +++ b/p2p/transport/magiselect/magiselect_test.go @@ -0,0 +1,57 @@ +package magiselect + +import "testing" + +func FuzzClash(f *testing.F) { + // make untyped literals type correctly + add := func(a, b, c byte) { f.Add(a, b, c) } + + // multistream-select + add('\x13', '/', 'm') + // http + add('G', 'E', 'T') + add('H', 'E', 'A') + add('P', 'O', 'S') + add('P', 'U', 'T') + add('D', 'E', 'L') + add('C', 'O', 'N') + add('O', 'P', 'T') + add('T', 'R', 'A') + add('P', 'A', 'T') + // tls + add('\x16', '\x03', '\x01') + add('\x16', '\x03', '\x02') + add('\x16', '\x03', '\x03') + add('\x16', '\x03', '\x04') + // noise + add(0, 85, 1<<3|2) + + f.Fuzz(func(t *testing.T, a, b, c byte) { + s := Sample{a, b, c} + var total uint + + ms := IsMultistreamSelect(s) + if ms { + total++ + } + + http := IsHTTP(s) + if http { + total++ + } + + tls := IsTLS(s) + if tls { + total++ + } + + noise := IsNoise(s) + if noise { + total++ + } + + if total > 1 { + t.Errorf("clash on: %q; ms: %v; http: %v; tls: %v; noise: %v", s, ms, http, tls, noise) + } + }) +} diff --git a/p2p/transport/tcp/tcp.go b/p2p/transport/tcp/tcp.go index d52bb96019..5e8377a360 100644 --- a/p2p/transport/tcp/tcp.go +++ b/p2p/transport/tcp/tcp.go @@ -117,7 +117,9 @@ func WithMetrics() Option { type TcpTransport struct { // Connection upgrader for upgrading insecure stream connections to // secure multiplex connections. - upgrader transport.Upgrader + upgrader transport.Upgrader + matcher mafmt.Pattern + protocols []int disableReuseport bool // Explicitly disable reuseport. enableMetrics bool @@ -141,9 +143,15 @@ func NewTCPTransport(upgrader transport.Upgrader, rcmgr network.ResourceManager, } tr := &TcpTransport{ upgrader: upgrader, + matcher: dialMatcher, + protocols: []int{ma.P_TCP}, connectTimeout: defaultConnectTimeout, // can be set by using the WithConnectionTimeout option rcmgr: rcmgr, } + if upgrader != nil { + tr.matcher = mafmt.And(tr.matcher, upgrader.SuffixMatcher()) + tr.protocols = append(tr.protocols, upgrader.SuffixesProtocols()...) + } for _, o := range opts { if err := o(tr); err != nil { return nil, err @@ -157,7 +165,7 @@ var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_TCP)) // CanDial returns true if this transport believes it can dial the given // multiaddr. func (t *TcpTransport) CanDial(addr ma.Multiaddr) bool { - return dialMatcher.Matches(addr) + return t.matcher.Matches(addr) } func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) { @@ -224,11 +232,14 @@ func (t *TcpTransport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p // It is better to skip the update than to delay upgrading the connection } } - direction := network.DirOutbound if ok, isClient, _ := network.GetSimultaneousConnect(ctx); ok && !isClient { - direction = network.DirInbound + return t.upgrader.UpgradeInbound(ctx, t, c, p, connScope) } - return t.upgrader.Upgrade(ctx, t, c, direction, p, connScope) + + _, suffix := ma.SplitFirst(raddr) // ip + _, suffix = ma.SplitFirst(suffix) // tcp + + return t.upgrader.UpgradeOutbound(ctx, t, c, suffix, p, connScope) } // UseReuseport returns true if reuseport is enabled and available. @@ -257,7 +268,7 @@ func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) { // Protocols returns the list of terminal protocols this transport can dial. func (t *TcpTransport) Protocols() []int { - return []int{ma.P_TCP} + return t.protocols } // Proxy always returns false for the TCP transport. diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index 5142ca97a1..832f581d4c 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -174,7 +174,9 @@ func (t *WebsocketTransport) dialWithScope(ctx context.Context, raddr ma.Multiad if err != nil { return nil, err } - conn, err := t.upgrader.Upgrade(ctx, t, macon, network.DirOutbound, p, connScope) + + // FIXME: remove our leading maddr and compute suffix + conn, err := t.upgrader.UpgradeOutbound(ctx, t, macon, nil, p, connScope) if err != nil { return nil, err } diff --git a/test-plans/go.mod b/test-plans/go.mod index e19fd63899..e09648ff7e 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -55,7 +55,7 @@ require ( github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect - github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect + github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index 5e9755f1f0..12a9e14c40 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -185,14 +185,13 @@ github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aG github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= -github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= github.com/multiformats/go-multiaddr v0.12.4 h1:rrKqpY9h+n80EwhhC/kkcunCZZ7URIF8yN1WEUt2Hvc= github.com/multiformats/go-multiaddr v0.12.4/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII= github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A= github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk= -github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= -github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= +github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0 h1:KCd9ZVLosRwAYaZfRk/BeT9dBkTBIJpG/GwMI0sE3hk= +github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0/go.mod h1:fU4KrUT9EeccvWYMheDXrJlEgjr38O1V7sEyshNz3BI= github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg=