diff --git a/core/event/migration.go b/core/event/migration.go new file mode 100644 index 0000000000..733d7ebe16 --- /dev/null +++ b/core/event/migration.go @@ -0,0 +1,96 @@ +package event + +import ( + "time" + + "github.com/libp2p/go-libp2p/core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +// EvtConnectionMigrationStarted is emitted when a connection migration begins. +// This event would be emitted when AddPath() completes successfully, before +// the user calls Probe() on the returned path. +// +// Note: Event emission is not yet implemented. These types are defined for +// future use and API stability. +// +// This is an EXPERIMENTAL event and may change in future versions. +type EvtConnectionMigrationStarted struct { + // Peer is the remote peer ID of the connection being migrated. + Peer peer.ID + // ConnID is the unique identifier of the connection. + ConnID string + // FromLocalAddr is the current local address before migration. + FromLocalAddr ma.Multiaddr + // ToLocalAddr is the target local address to migrate to. + ToLocalAddr ma.Multiaddr +} + +// EvtConnectionMigrationCompleted is emitted when a connection migration succeeds. +// At this point, the connection is using the new path for all communication. +// +// Note: In client-side QUIC migration, only the local address changes. The remote +// address remains the same since we're changing which local interface we use to +// reach the same peer. +// +// This is an EXPERIMENTAL event and may change in future versions. +type EvtConnectionMigrationCompleted struct { + // Peer is the remote peer ID of the migrated connection. + Peer peer.ID + // ConnID is the unique identifier of the connection. + ConnID string + // FromLocalAddr is the previous local address. + FromLocalAddr ma.Multiaddr + // ToLocalAddr is the new local address after migration. + ToLocalAddr ma.Multiaddr + // ProbeRTT is the round-trip time measured during path probing. + ProbeRTT time.Duration +} + +// EvtConnectionMigrationFailed is emitted when a connection migration fails. +// The connection may still be usable on the original path, or it may be closed +// if both the new path and rollback failed. +// +// This is an EXPERIMENTAL event and may change in future versions. +type EvtConnectionMigrationFailed struct { + // Peer is the remote peer ID of the connection. + Peer peer.ID + // ConnID is the unique identifier of the connection. + ConnID string + // FromLocalAddr is the original local address. + FromLocalAddr ma.Multiaddr + // ToLocalAddr is the target local address that failed. + ToLocalAddr ma.Multiaddr + // Error describes why the migration failed. + Error error + // ConnectionClosed indicates whether the connection was closed due to + // the failure (e.g., if rollback to the original path also failed). + ConnectionClosed bool +} + +// EvtPathAdded is emitted when a new path is added to a connection. +// This happens when MigratableConn.AddPath() is called successfully. +// +// This is an EXPERIMENTAL event and may change in future versions. +type EvtPathAdded struct { + // Peer is the remote peer ID of the connection. + Peer peer.ID + // ConnID is the unique identifier of the connection. + ConnID string + // LocalAddr is the local address of the new path. + LocalAddr ma.Multiaddr +} + +// EvtPathRemoved is emitted when a path is removed from a connection. +// This happens when Path.Close() is called. +// +// This is an EXPERIMENTAL event and may change in future versions. +type EvtPathRemoved struct { + // Peer is the remote peer ID of the connection. + Peer peer.ID + // ConnID is the unique identifier of the connection. + ConnID string + // LocalAddr is the local address of the removed path. + LocalAddr ma.Multiaddr +} diff --git a/core/network/migration.go b/core/network/migration.go new file mode 100644 index 0000000000..e0bd7263dd --- /dev/null +++ b/core/network/migration.go @@ -0,0 +1,76 @@ +package network + +import ( + "context" + "time" + + ma "github.com/multiformats/go-multiaddr" +) + +// PathInfo contains information about a network path. +// +// This is an EXPERIMENTAL type and may change in future versions. +type PathInfo struct { + // LocalAddr is the local address of this path. + LocalAddr ma.Multiaddr + // RemoteAddr is the remote address of this path. + RemoteAddr ma.Multiaddr + // Active indicates whether this is the currently active path. + Active bool + // RTT is the round-trip time measured for this path. + RTT time.Duration +} + +// Path represents a network path that can be used for connection migration. +// A path is created via MigratableConn.AddPath() and can be probed and +// switched to. +// +// This is an EXPERIMENTAL interface and may change in future versions. +type Path interface { + // Probe tests the path connectivity by sending a probe packet and + // waiting for acknowledgment. This validates that the path is usable + // before switching to it. + // + // The context can be used to set a timeout for the probe operation. + Probe(ctx context.Context) error + + // Switch makes this path the active path for the connection. + // This should only be called after a successful Probe(). + // After switching, all subsequent packets will use this path. + Switch() error + + // Info returns information about this path. + Info() PathInfo + + // Close removes this path from the connection. + // If this is the active path, Close will fail; switch to a different + // path first. + Close() error +} + +// MigratableConn is implemented by connections that support path migration. +// This allows switching the underlying network path (e.g., from a primary +// interface to a failover interface) without disrupting active streams. +// +// Only client-initiated (outbound) QUIC connections support migration per +// the QUIC specification. +// +// This is an EXPERIMENTAL interface and may change in future versions. +type MigratableConn interface { + // SupportsMigration returns true if this connection supports path migration. + // Returns false for server-side connections or when migration is disabled. + SupportsMigration() bool + + // AddPath adds a new potential path using the given local address. + // The returned Path can be probed and then switched to. + // + // The context can be used to set a timeout for path creation. + AddPath(ctx context.Context, localAddr ma.Multiaddr) (Path, error) + + // ActivePath returns information about the currently active path. + ActivePath() PathInfo + + // AvailablePaths returns information about all available paths, + // including the active path and any paths added via AddPath(). + AvailablePaths() []PathInfo +} diff --git a/docs/connection-migration.md b/docs/connection-migration.md new file mode 100644 index 0000000000..95c818a126 --- /dev/null +++ b/docs/connection-migration.md @@ -0,0 +1,261 @@ +# QUIC Connection Migration + +**Status: EXPERIMENTAL** + +This document describes the client-side QUIC connection migration feature in go-libp2p, which allows seamlessly switching network paths without disrupting active streams. + +## Overview + +Connection migration is a QUIC protocol feature that enables a connection to continue operating when the client's network path changes. This is useful for scenarios such as: +- Switching from a primary network interface to a failover interface +- Migrating connections when network conditions change +- Maintaining connectivity during network transitions + +**Key Points:** +- Only client-initiated (outbound) connections support migration (per QUIC specification) +- Server-side connections cannot initiate migration +- Requires explicit opt-in via the `EnableExperimentalConnectionMigration()` option +- Each alternate path requires a separate network interface + +## Enabling Connection Migration + +To enable connection migration, use the `EnableExperimentalConnectionMigration()` option when creating a QUIC transport: + +```go +import ( + "github.com/libp2p/go-libp2p" + quic "github.com/libp2p/go-libp2p/p2p/transport/quic" +) + +// When using the transport directly +transport, err := quic.NewTransport( + privateKey, + connManager, + nil, // psk + nil, // gater + nil, // rcmgr + quic.EnableExperimentalConnectionMigration(), +) +``` + +## Using the Migration API + +### Checking Migration Support + +```go +import "github.com/libp2p/go-libp2p/core/network" + +// Check if a connection supports migration +var migratable network.MigratableConn +if conn.As(&migratable) && migratable.SupportsMigration() { + // Connection supports migration +} +``` + +### Adding and Switching Paths + +The typical migration flow is: +1. **AddPath** - Add a new path using a different local address +2. **Probe** - Test connectivity on the new path +3. **Switch** - Migrate the connection to the new path + +```go +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/network" + ma "github.com/multiformats/go-multiaddr" +) + +// Get the MigratableConn interface +var migratable network.MigratableConn +if !conn.As(&migratable) || !migratable.SupportsMigration() { + return errors.New("connection does not support migration") +} + +// Define the new local address (e.g., secondary/failover interface) +newLocalAddr, _ := ma.NewMultiaddr("/ip4/10.0.0.1/udp/0/quic-v1") + +// Add a new path +path, err := migratable.AddPath(context.Background(), newLocalAddr) +if err != nil { + return fmt.Errorf("failed to add path: %w", err) +} + +// Probe the new path (with timeout) +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +defer cancel() + +if err := path.Probe(ctx); err != nil { + // Probe failed, path is not viable + path.Close() + return fmt.Errorf("path probe failed: %w", err) +} + +// Get path info (includes RTT from probe) +info := path.Info() +fmt.Printf("Path RTT: %v\n", info.RTT) + +// Switch to the new path +if err := path.Switch(); err != nil { + return fmt.Errorf("failed to switch path: %w", err) +} + +// Connection is now using the new path +fmt.Printf("Migrated to: %s\n", path.Info().LocalAddr) +``` + +### Querying Available Paths + +```go +// Get the currently active path +activePath := migratable.ActivePath() +fmt.Printf("Active path: %s -> %s\n", activePath.LocalAddr, activePath.RemoteAddr) + +// Get all available paths +paths := migratable.AvailablePaths() +for _, p := range paths { + status := "inactive" + if p.Active { + status = "active" + } + fmt.Printf("Path [%s]: %s -> %s (RTT: %v)\n", status, p.LocalAddr, p.RemoteAddr, p.RTT) +} +``` + +### Closing a Path + +```go +import ( + "errors" + "fmt" + + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" +) + +// Close a non-active path +if err := path.Close(); err != nil { + // Note: Cannot close the active path + if errors.Is(err, libp2pquic.ErrActivePathClose) { + fmt.Println("Switch to another path before closing this one") + } +} +``` + +## Event Types + +The `core/event` package defines events for monitoring migration: + +```go +import "github.com/libp2p/go-libp2p/core/event" + +// Subscribe to migration events +sub, _ := eventBus.Subscribe([]interface{}{ + new(event.EvtConnectionMigrationStarted), + new(event.EvtConnectionMigrationCompleted), + new(event.EvtConnectionMigrationFailed), + new(event.EvtPathAdded), + new(event.EvtPathRemoved), +}) + +for evt := range sub.Out() { + switch e := evt.(type) { + case event.EvtConnectionMigrationCompleted: + fmt.Printf("Migration completed: %s -> %s (RTT: %v)\n", + e.FromLocalAddr, e.ToLocalAddr, e.ProbeRTT) + case event.EvtConnectionMigrationFailed: + fmt.Printf("Migration failed: %v (closed: %v)\n", e.Error, e.ConnectionClosed) + } +} +``` + +## Constraints and Limitations + +1. **Client-only migration**: Only connections initiated by your node (outbound/dial) support migration. Incoming connections cannot be migrated. + +2. **Single active path**: Only one path can be active at a time. The active path is used for all communication. + +3. **Separate network interfaces**: Each path requires a different local address, typically from a different network interface. + +4. **Experimental API**: This API is experimental and may change in future versions. + +## Resource Management + +Paths hold references to network transports that must be properly cleaned up: + +1. **After switching**: The old path remains available but holds transport resources. Close it if you don't need rollback capability: + ```go + oldPath := currentPath // save reference before switching + if err := newPath.Switch(); err == nil { + oldPath.Close() // release old path resources + } + ``` + +2. **On failure**: If `AddPath` fails, resources are automatically cleaned up. If `Probe` or `Switch` fails, close the path: + ```go + path, err := migratable.AddPath(ctx, newAddr) + if err != nil { + return err // no cleanup needed + } + if err := path.Probe(ctx); err != nil { + path.Close() // clean up failed path + return err + } + ``` + +3. **On connection close**: All path resources are automatically cleaned up when the connection is closed. + +4. **Getting current local address**: After switching paths, use `ActivePath().LocalAddr` to get the current local address. The `LocalMultiaddr()` method returns the original address. + +## Error Handling + +Common errors you may encounter: + +| Error | Cause | Resolution | +|-------|-------|------------| +| `ErrMigrationNotEnabled` | Migration option not set | Use `EnableExperimentalConnectionMigration()` | +| `ErrMigrationNotSupported` | Server-side connection | Only client connections support migration | +| `ErrPathAlreadyExists` | Path with same local address exists | Use a different local address or close the existing path | +| `ErrActivePathClose` | Attempted to close active path | Switch to another path first | +| Context deadline exceeded | Probe timed out | Network path may be unavailable | + +## Example: Failover to Secondary Network + +```go +func migrateToFailoverNetwork(conn network.Conn, failoverAddr ma.Multiaddr) error { + var migratable network.MigratableConn + if !conn.As(&migratable) || !migratable.SupportsMigration() { + return errors.New("migration not supported") + } + + // Add the failover path + path, err := migratable.AddPath(context.Background(), failoverAddr) + if err != nil { + return err + } + + // Probe with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := path.Probe(ctx); err != nil { + path.Close() + return fmt.Errorf("failover network unreachable: %w", err) + } + + // Switch to failover + if err := path.Switch(); err != nil { + path.Close() + return err + } + + log.Printf("Successfully migrated to failover network") + return nil +} +``` + +## See Also + +- [quic-go Connection Migration](https://quic-go.net/docs/quic/connection-migration/) - Upstream documentation +- [RFC 9000 Section 9](https://datatracker.ietf.org/doc/html/rfc9000#section-9) - QUIC Connection Migration specification diff --git a/p2p/transport/quic/conn.go b/p2p/transport/quic/conn.go index 0bd5e5ab31..c1a69df61a 100644 --- a/p2p/transport/quic/conn.go +++ b/p2p/transport/quic/conn.go @@ -2,6 +2,9 @@ package libp2pquic import ( "context" + "errors" + "sync" + "time" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" @@ -12,6 +15,14 @@ import ( "github.com/quic-go/quic-go" ) +// ErrMigrationNotSupported is returned when migration is attempted on a connection +// that does not support it (e.g., server-side connections). +var ErrMigrationNotSupported = errors.New("connection does not support migration") + +// ErrMigrationNotEnabled is returned when migration is attempted but the +// experimental migration feature is not enabled. +var ErrMigrationNotEnabled = errors.New("connection migration is not enabled; use EnableExperimentalConnectionMigration option") + type conn struct { quicConn *quic.Conn transport *transport @@ -23,6 +34,32 @@ type conn struct { remotePeerID peer.ID remotePubKey ic.PubKey remoteMultiaddr ma.Multiaddr + + // isOutbound indicates if this connection was initiated by us (client-side). + // Only outbound connections support migration per QUIC spec. + isOutbound bool + + // migrationEnabled indicates if the experimental migration feature is enabled. + migrationEnabled bool + + // Migration-related fields (protected by migrationMu) + migrationMu sync.RWMutex + paths map[string]*connPath // keyed by local addr string + activePath *connPath +} + +// connPath represents a potential path for connection migration. +type connPath struct { + localAddr ma.Multiaddr + remoteAddr ma.Multiaddr + rtt time.Duration + active bool + transport refCountedTransport // for cleanup on close +} + +// refCountedTransport is an interface for transports that support reference counting. +type refCountedTransport interface { + DecreaseCount() } func (c *conn) As(target any) bool { @@ -30,7 +67,10 @@ func (c *conn) As(target any) bool { *t = c.quicConn return true } - + if t, ok := target.(*network.MigratableConn); ok { + *t = c + return true + } return false } @@ -51,12 +91,30 @@ func (c *conn) CloseWithError(errCode network.ConnErrorCode) error { } func (c *conn) closeWithError(errCode quic.ApplicationErrorCode, errString string) error { + // Clean up migration path resources before closing. + c.cleanupMigrationPaths() + c.transport.removeConn(c.quicConn) err := c.quicConn.CloseWithError(errCode, errString) c.scope.Done() return err } +// cleanupMigrationPaths releases all transport references held by migration paths. +func (c *conn) cleanupMigrationPaths() { + c.migrationMu.Lock() + defer c.migrationMu.Unlock() + + for _, p := range c.paths { + if p.transport != nil { + p.transport.DecreaseCount() + p.transport = nil + } + } + c.paths = nil + c.activePath = nil +} + // IsClosed returns whether a connection is fully closed. func (c *conn) IsClosed() bool { return c.quicConn.Context().Err() != nil @@ -111,3 +169,143 @@ func (c *conn) ConnState() network.ConnectionState { } return network.ConnectionState{Transport: t} } + +// Compile-time check that conn implements MigratableConn. +var _ network.MigratableConn = &conn{} + +// SupportsMigration returns true if this connection supports path migration. +// Only outbound (client-initiated) QUIC connections with the experimental +// migration feature enabled support migration. +func (c *conn) SupportsMigration() bool { + return c.isOutbound && c.migrationEnabled +} + +// ErrPathAlreadyExists is returned when attempting to add a path with a local +// address that already has a path associated with it. +var ErrPathAlreadyExists = errors.New("path with this local address already exists") + +// AddPath adds a new potential path using the given local address. +// This is an EXPERIMENTAL API. +func (c *conn) AddPath(ctx context.Context, localAddr ma.Multiaddr) (network.Path, error) { + if !c.migrationEnabled { + return nil, ErrMigrationNotEnabled + } + if !c.isOutbound { + return nil, ErrMigrationNotSupported + } + + // Get a QUIC transport for the local address from the transport's connection manager. + refCountedTr, quicTr, err := c.transport.getTransportForMigration(localAddr) + if err != nil { + return nil, err + } + + // Add the path using quic-go's API. + // Note: quic-go's AddPath returns a quic.Path interface. + quicPath, err := c.quicConn.AddPath(quicTr) + if err != nil { + // Release the transport reference on failure to prevent resource leak. + refCountedTr.DecreaseCount() + return nil, err + } + + // Create the connPath with transport reference for cleanup. + cp := &connPath{ + localAddr: localAddr, + remoteAddr: c.remoteMultiaddr, + transport: refCountedTr, + } + + // Create our path wrapper. + // The remote address doesn't change during client-side migration + path := &pathWrapper{ + conn: c, + quicPath: quicPath, + connPath: cp, + } + + // Track the path under lock. + // We check for duplicates here (not earlier) to avoid a TOCTOU race condition + // where two goroutines could both pass an early check and then both attempt + // to add paths with the same address. + c.migrationMu.Lock() + if c.paths == nil { + c.paths = make(map[string]*connPath) + } + addrKey := localAddr.String() + if _, exists := c.paths[addrKey]; exists { + c.migrationMu.Unlock() + // Clean up resources since we can't use this path. + if quicPath != nil { + _ = quicPath.Close() + } + refCountedTr.DecreaseCount() + return nil, ErrPathAlreadyExists + } + c.paths[addrKey] = cp + c.migrationMu.Unlock() + + return path, nil +} + +// ActivePath returns information about the currently active path. +func (c *conn) ActivePath() network.PathInfo { + c.migrationMu.RLock() + defer c.migrationMu.RUnlock() + + if c.activePath != nil { + return network.PathInfo{ + LocalAddr: c.activePath.localAddr, + RemoteAddr: c.activePath.remoteAddr, + Active: true, + RTT: c.activePath.rtt, + } + } + + // No explicit active path set, return current connection addresses. + return network.PathInfo{ + LocalAddr: c.localMultiaddr, + RemoteAddr: c.remoteMultiaddr, + Active: true, + } +} + +// AvailablePaths returns information about all available paths. +func (c *conn) AvailablePaths() []network.PathInfo { + c.migrationMu.RLock() + defer c.migrationMu.RUnlock() + + paths := make([]network.PathInfo, 0, len(c.paths)+1) + + var activePathInfo network.PathInfo + if c.activePath != nil { + activePathInfo = network.PathInfo{ + LocalAddr: c.activePath.localAddr, + RemoteAddr: c.activePath.remoteAddr, + Active: true, + RTT: c.activePath.rtt, + } + } else { + // No explicit active path set, use current connection addresses. + activePathInfo = network.PathInfo{ + LocalAddr: c.localMultiaddr, + RemoteAddr: c.remoteMultiaddr, + Active: true, + } + } + paths = append(paths, activePathInfo) + + // Add any additional paths. + for _, p := range c.paths { + if !p.active { + paths = append(paths, network.PathInfo{ + LocalAddr: p.localAddr, + RemoteAddr: p.remoteAddr, + Active: false, + RTT: p.rtt, + }) + } + } + + return paths +} diff --git a/p2p/transport/quic/migration_test.go b/p2p/transport/quic/migration_test.go new file mode 100644 index 0000000000..eef873c306 --- /dev/null +++ b/p2p/transport/quic/migration_test.go @@ -0,0 +1,204 @@ +package libp2pquic + +import ( + "context" + "io" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" + + ma "github.com/multiformats/go-multiaddr" + "github.com/quic-go/quic-go" + "github.com/stretchr/testify/require" +) + +func TestMigrationDisabledByDefault(t *testing.T) { + serverID, serverKey := createPeer(t) + _, clientKey := createPeer(t) + + serverTransport, err := NewTransport(serverKey, newConnManager(t), nil, nil, nil) + require.NoError(t, err) + defer serverTransport.(io.Closer).Close() + + ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic-v1") + defer ln.Close() + + // Client without migration enabled + clientTransport, err := NewTransport(clientKey, newConnManager(t), nil, nil, nil) + require.NoError(t, err) + defer clientTransport.(io.Closer).Close() + + conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) + require.NoError(t, err) + defer conn.Close() + + // Try to get the MigratableConn interface + var migratable network.MigratableConn + if conn.As(&migratable) { + // Even if we can get the interface, migration should not be supported + require.False(t, migratable.SupportsMigration(), "migration should not be supported by default") + } +} + +func TestMigrationEnabledWithOption(t *testing.T) { + serverID, serverKey := createPeer(t) + _, clientKey := createPeer(t) + + serverTransport, err := NewTransport(serverKey, newConnManager(t), nil, nil, nil) + require.NoError(t, err) + defer serverTransport.(io.Closer).Close() + + ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic-v1") + defer ln.Close() + + // Client with migration enabled + clientTransport, err := NewTransport(clientKey, newConnManager(t), nil, nil, nil, EnableExperimentalConnectionMigration()) + require.NoError(t, err) + defer clientTransport.(io.Closer).Close() + + conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) + require.NoError(t, err) + defer conn.Close() + + // Try to get the MigratableConn interface + var migratable network.MigratableConn + require.True(t, conn.As(&migratable), "should be able to get MigratableConn interface") + require.True(t, migratable.SupportsMigration(), "migration should be supported when enabled via option") + + // Check ActivePath returns valid info + activePath := migratable.ActivePath() + require.NotNil(t, activePath.LocalAddr, "active path should have a local address") + require.NotNil(t, activePath.RemoteAddr, "active path should have a remote address") + require.True(t, activePath.Active, "active path should be marked as active") +} + +func TestServerConnectionDoesNotSupportMigration(t *testing.T) { + serverID, serverKey := createPeer(t) + clientID, clientKey := createPeer(t) + + // Server with migration enabled (but server connections still don't support migration) + serverTransport, err := NewTransport(serverKey, newConnManager(t), nil, nil, nil, EnableExperimentalConnectionMigration()) + require.NoError(t, err) + defer serverTransport.(io.Closer).Close() + + ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic-v1") + defer ln.Close() + + // Client with migration enabled + clientTransport, err := NewTransport(clientKey, newConnManager(t), nil, nil, nil, EnableExperimentalConnectionMigration()) + require.NoError(t, err) + defer clientTransport.(io.Closer).Close() + + // Connect + clientDone := make(chan struct{}) + go func() { + defer close(clientDone) + conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) + require.NoError(t, err) + defer conn.Close() + + // Client connection should support migration + var migratable network.MigratableConn + require.True(t, conn.As(&migratable), "client should be able to get MigratableConn interface") + require.True(t, migratable.SupportsMigration(), "client connection should support migration") + + time.Sleep(100 * time.Millisecond) // Give time for server to check + }() + + // Accept on server + serverConn, err := ln.Accept() + require.NoError(t, err) + defer serverConn.Close() + + require.Equal(t, clientID, serverConn.RemotePeer()) + + // Server connection should NOT support migration (per QUIC spec) + var migratable network.MigratableConn + if serverConn.As(&migratable) { + require.False(t, migratable.SupportsMigration(), "server connection should not support migration") + } + + <-clientDone +} + +func TestAddPathRequiresMigrationEnabled(t *testing.T) { + serverID, serverKey := createPeer(t) + _, clientKey := createPeer(t) + + serverTransport, err := NewTransport(serverKey, newConnManager(t), nil, nil, nil) + require.NoError(t, err) + defer serverTransport.(io.Closer).Close() + + ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic-v1") + defer ln.Close() + + // Client without migration enabled + clientTransport, err := NewTransport(clientKey, newConnManager(t), nil, nil, nil) + require.NoError(t, err) + defer clientTransport.(io.Closer).Close() + + conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) + require.NoError(t, err) + defer conn.Close() + + // Try to get the MigratableConn interface and call AddPath + var migratable network.MigratableConn + require.True(t, conn.As(&migratable), "should be able to get MigratableConn interface") + + // AddPath should fail because migration is not enabled + newLocalAddr := ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1") + _, err = migratable.AddPath(context.Background(), newLocalAddr) + require.ErrorIs(t, err, ErrMigrationNotEnabled, "AddPath should fail when migration is not enabled") +} + +func TestAvailablePaths(t *testing.T) { + serverID, serverKey := createPeer(t) + _, clientKey := createPeer(t) + + serverTransport, err := NewTransport(serverKey, newConnManager(t), nil, nil, nil) + require.NoError(t, err) + defer serverTransport.(io.Closer).Close() + + ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic-v1") + defer ln.Close() + + // Client with migration enabled + clientTransport, err := NewTransport(clientKey, newConnManager(t), nil, nil, nil, EnableExperimentalConnectionMigration()) + require.NoError(t, err) + defer clientTransport.(io.Closer).Close() + + conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID) + require.NoError(t, err) + defer conn.Close() + + // Get the MigratableConn interface + var migratable network.MigratableConn + require.True(t, conn.As(&migratable), "should be able to get MigratableConn interface") + + // Initially should have one path (the active one) + paths := migratable.AvailablePaths() + require.Len(t, paths, 1, "should have exactly one path initially") + require.True(t, paths[0].Active, "the initial path should be active") +} + +func TestMigrationConnManagerTransportForMigration(t *testing.T) { + cm, err := quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) + require.NoError(t, err) + defer cm.Close() + + // Test that we can get a transport for migration + localAddr := ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1") + tr, err := cm.TransportForMigration(localAddr) + require.NoError(t, err) + require.NotNil(t, tr) + defer tr.DecreaseCount() + + // The transport should have a valid local address + require.NotNil(t, tr.LocalAddr()) + + // The underlying transport should be available + underlyingTr := tr.UnderlyingTransport() + require.NotNil(t, underlyingTr, "should be able to get underlying *quic.Transport") +} diff --git a/p2p/transport/quic/path.go b/p2p/transport/quic/path.go new file mode 100644 index 0000000000..4041f176d8 --- /dev/null +++ b/p2p/transport/quic/path.go @@ -0,0 +1,112 @@ +package libp2pquic + +import ( + "context" + "errors" + "time" + + "github.com/libp2p/go-libp2p/core/network" + + "github.com/quic-go/quic-go" +) + +// ErrActivePathClose is returned when attempting to close the active path. +var ErrActivePathClose = errors.New("cannot close active path; switch to a different path first") + +// pathWrapper wraps a quic-go Path to implement network.Path. +type pathWrapper struct { + conn *conn + quicPath *quic.Path + connPath *connPath +} + +// Compile-time check that pathWrapper implements network.Path. +var _ network.Path = &pathWrapper{} + +// Probe tests connectivity on this path by sending PATH_CHALLENGE frames. +// Returns nil if the path is valid and can be switched to. +func (p *pathWrapper) Probe(ctx context.Context) error { + start := time.Now() + if err := p.quicPath.Probe(ctx); err != nil { + return err + } + // Protect rtt write with lock to avoid data race with Info(). + p.conn.migrationMu.Lock() + p.connPath.rtt = time.Since(start) + p.conn.migrationMu.Unlock() + return nil +} + +// Switch migrates the connection to use this path for all future communication. +// Should only be called after a successful Probe(). +// +// Note: After switching, use ActivePath().LocalAddr to get the current local address. +// The LocalMultiaddr() method returns the original address to avoid data races. +func (p *pathWrapper) Switch() error { + if err := p.quicPath.Switch(); err != nil { + return err + } + + // Update path tracking after successful switch. + p.conn.migrationMu.Lock() + defer p.conn.migrationMu.Unlock() + + // Mark the old active path as inactive. + if p.conn.activePath != nil { + p.conn.activePath.active = false + } + + // Mark this path as active. + p.connPath.active = true + p.conn.activePath = p.connPath + + // Note: We intentionally do NOT modify p.conn.localMultiaddr here. + // That field may be read concurrently from other goroutines without + // holding migrationMu (e.g., LocalMultiaddr()), so writing to it would + // introduce a data race. Use ActivePath().LocalAddr to get the current + // local address after migration. + // + // The remote address doesn't change during client-side migration. + // We're only changing which local interface we use to reach the same peer. + + return nil +} + +// Info returns information about this path. +func (p *pathWrapper) Info() network.PathInfo { + p.conn.migrationMu.RLock() + defer p.conn.migrationMu.RUnlock() + + return network.PathInfo{ + LocalAddr: p.connPath.localAddr, + RemoteAddr: p.connPath.remoteAddr, + Active: p.connPath.active, + RTT: p.connPath.rtt, + } +} + +// Close removes this path from the connection and releases its resources. +// Returns an error if this is the active path. +func (p *pathWrapper) Close() error { + p.conn.migrationMu.Lock() + defer p.conn.migrationMu.Unlock() + + if p.connPath.active { + return ErrActivePathClose + } + + delete(p.conn.paths, p.connPath.localAddr.String()) + + // Close the underlying quic-go path. + if p.quicPath != nil { + _ = p.quicPath.Close() + } + + // Release the transport reference to prevent resource leak. + if p.connPath.transport != nil { + p.connPath.transport.DecreaseCount() + p.connPath.transport = nil + } + + return nil +} diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index 0176409e48..0355902c4c 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -34,6 +34,25 @@ var ErrHolePunching = errors.New("hole punching attempted; no active dial") var HolePunchTimeout = 5 * time.Second +// Option is a function that configures the QUIC transport. +type Option func(*transport) error + +// EnableExperimentalConnectionMigration enables the experimental connection +// migration feature. When enabled, outbound QUIC connections will support +// path migration via the MigratableConn interface. +// +// This is an EXPERIMENTAL feature and may change in future versions. +// Connection migration allows switching network paths (e.g., from a primary +// interface to a failover interface) without disrupting active streams. +// +// Only client-initiated (outbound) connections support migration per the QUIC spec. +func EnableExperimentalConnectionMigration() Option { + return func(t *transport) error { + t.migrationEnabled = true + return nil + } +} + // The Transport implements the tpt.Transport interface for QUIC connections. type transport struct { privKey ic.PrivKey @@ -55,6 +74,9 @@ type transport struct { listenersMu sync.Mutex // map of UDPAddr as string to a virtualListeners listeners map[string][]*virtualListener + + // migrationEnabled indicates if the experimental connection migration feature is enabled. + migrationEnabled bool } var _ tpt.Transport = &transport{} @@ -70,7 +92,7 @@ type activeHolePunch struct { } // NewTransport creates a new QUIC transport -func NewTransport(key ic.PrivKey, connManager *quicreuse.ConnManager, psk pnet.PSK, gater connmgr.ConnectionGater, rcmgr network.ResourceManager) (tpt.Transport, error) { +func NewTransport(key ic.PrivKey, connManager *quicreuse.ConnManager, psk pnet.PSK, gater connmgr.ConnectionGater, rcmgr network.ResourceManager, opts ...Option) (tpt.Transport, error) { if len(psk) > 0 { log.Error("QUIC doesn't support private networks yet.") return nil, errors.New("QUIC doesn't support private networks yet") @@ -88,7 +110,7 @@ func NewTransport(key ic.PrivKey, connManager *quicreuse.ConnManager, psk pnet.P rcmgr = &network.NullResourceManager{} } - return &transport{ + t := &transport{ privKey: key, localPeer: localPeer, identity: identity, @@ -100,7 +122,15 @@ func NewTransport(key ic.PrivKey, connManager *quicreuse.ConnManager, psk pnet.P rnd: *rand.New(rand.NewSource(time.Now().UnixNano())), listeners: make(map[string][]*virtualListener), - }, nil + } + + for _, o := range opts { + if err := o(t); err != nil { + return nil, err + } + } + + return t, nil } func (t *transport) ListenOrder() int { @@ -157,14 +187,16 @@ func (t *transport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p pee return nil, err } c := &conn{ - quicConn: pconn, - transport: t, - scope: scope, - localPeer: t.localPeer, - localMultiaddr: localMultiaddr, - remotePubKey: remotePubKey, - remotePeerID: p, - remoteMultiaddr: raddr, + quicConn: pconn, + transport: t, + scope: scope, + localPeer: t.localPeer, + localMultiaddr: localMultiaddr, + remotePubKey: remotePubKey, + remotePeerID: p, + remoteMultiaddr: raddr, + isOutbound: true, + migrationEnabled: t.migrationEnabled, } if t.gater != nil && !t.gater.InterceptSecured(network.DirOutbound, p, c) { pconn.CloseWithError(quic.ApplicationErrorCode(network.ConnGated), "connection gated") @@ -402,3 +434,25 @@ func (t *transport) CloseVirtualListener(l *virtualListener) error { return nil } + +// getTransportForMigration returns a RefCountedQUICTransport and its underlying *quic.Transport +// bound to the specified local address for use in connection migration. +// +// The caller is responsible for calling DecreaseCount() on the RefCountedQUICTransport +// when the path is no longer needed (either on AddPath failure or when the path is closed). +// +// This is an EXPERIMENTAL API for connection migration support. +func (t *transport) getTransportForMigration(localAddr ma.Multiaddr) (quicreuse.RefCountedQUICTransport, *quic.Transport, error) { + refCountedTr, err := t.connManager.TransportForMigration(localAddr) + if err != nil { + return nil, nil, err + } + + quicTr := refCountedTr.UnderlyingTransport() + if quicTr == nil { + refCountedTr.DecreaseCount() + return nil, nil, errors.New("failed to get underlying QUIC transport for migration") + } + + return refCountedTr, quicTr, nil +} diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 023b82ee0e..b528dea6b1 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -392,6 +392,37 @@ func (c *ConnManager) Protocols() []int { return []int{ma.P_QUIC_V1} } +// TransportForMigration returns a RefCountedQUICTransport for the given local +// multiaddr, suitable for use in connection migration. This allows adding new +// paths to existing QUIC connections. +// +// This is an EXPERIMENTAL API for connection migration support. +func (c *ConnManager) TransportForMigration(localAddr ma.Multiaddr) (RefCountedQUICTransport, error) { + netw, host, err := manet.DialArgs(localAddr) + if err != nil { + return nil, err + } + laddr, err := net.ResolveUDPAddr(netw, host) + if err != nil { + return nil, err + } + + if c.enableReuseport { + reuse, err := c.getReuse(netw) + if err != nil { + return nil, err + } + return reuse.TransportForMigration(netw, laddr) + } + + // Without reuseport, create a new single-owner transport. + conn, err := c.listenUDP(netw, laddr) + if err != nil { + return nil, err + } + return c.newSingleOwnerTransport(conn), nil +} + func (c *ConnManager) Close() error { if !c.enableReuseport { return nil diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 1ce05750e0..4771a4f432 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -27,6 +27,9 @@ type RefCountedQUICTransport interface { Dial(ctx context.Context, addr net.Addr, tlsConf *tls.Config, conf *quic.Config) (*quic.Conn, error) Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) + + // UnderlyingTransport returns the underlying *quic.Transport. + UnderlyingTransport() *quic.Transport } type singleOwnerTransport struct { @@ -65,6 +68,13 @@ func (c *singleOwnerTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (Q return c.Transport.Listen(tlsConf, conf) } +func (c *singleOwnerTransport) UnderlyingTransport() *quic.Transport { + if wt, ok := c.Transport.(*wrappedQUICTransport); ok { + return wt.Transport + } + return nil +} + // Constant. Defined as variables to simplify testing. var ( garbageCollectInterval = 30 * time.Second @@ -166,6 +176,13 @@ func (c *refcountedTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (QU return c.QUICTransport.Listen(tlsConf, conf) } +func (c *refcountedTransport) UnderlyingTransport() *quic.Transport { + if wt, ok := c.QUICTransport.(*wrappedQUICTransport); ok { + return wt.Transport + } + return nil +} + func (c *refcountedTransport) DecreaseCount() { c.mutex.Lock() c.refCount-- @@ -463,6 +480,74 @@ func (r *reuse) newTransport(conn net.PacketConn) *refcountedTransport { } } +// TransportForMigration returns a transport for the given local address for use +// in connection migration. It tries to find an existing transport bound to the +// specified address. If no suitable transport exists, it creates a new one. +// +// This is an EXPERIMENTAL API for connection migration support. +func (r *reuse) TransportForMigration(network string, laddr *net.UDPAddr) (*refcountedTransport, error) { + r.mutex.Lock() + defer r.mutex.Unlock() + + // First check unicast transports for the specific IP. + if !laddr.IP.IsUnspecified() { + if trs, ok := r.unicast[laddr.IP.String()]; ok { + // If a specific port is requested, look for that exact port. + if laddr.Port != 0 { + if tr, ok := trs[laddr.Port]; ok { + tr.IncreaseCount() + return tr, nil + } + // Specific port requested but not found - don't return a different port. + } else { + // Port 0 means any port on this IP is acceptable. + for _, tr := range trs { + tr.IncreaseCount() + return tr, nil + } + } + } + } + + // Check global listeners for the specific port (only if IP is unspecified). + if laddr.IP.IsUnspecified() && laddr.Port != 0 { + if tr, ok := r.globalListeners[laddr.Port]; ok { + tr.IncreaseCount() + return tr, nil + } + } + + // Check global dialers (only if IP is unspecified). + // If a specific IP was requested, don't fall back to 0.0.0.0 transports. + if laddr.IP.IsUnspecified() { + for _, tr := range r.globalDialers { + tr.IncreaseCount() + return tr, nil + } + } + + // No existing transport found, create a new one. + conn, err := r.listenUDP(network, laddr) + if err != nil { + return nil, err + } + tr := r.newTransport(conn) + tr.IncreaseCount() + + localAddr := conn.LocalAddr().(*net.UDPAddr) + if localAddr.IP.IsUnspecified() { + r.globalDialers[localAddr.Port] = tr + } else { + if _, ok := r.unicast[localAddr.IP.String()]; !ok { + r.unicast[localAddr.IP.String()] = make(map[int]*refcountedTransport) + r.routes, _ = r.sourceIPSelectorFn() + } + r.unicast[localAddr.IP.String()][localAddr.Port] = tr + } + + return tr, nil +} + func (r *reuse) Close() error { close(r.closeChan) <-r.gcStopChan