diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..f81bbab9 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,26 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Client", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/_examples/helloworld_socket/helloworld_socket.go", + "args": ["-remote" ,"1-151,10.151.0.30:4444"] + + }, + { + "name": "Launch Server", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/_examples/helloworld_socket/helloworld_socket.go", + "args": ["-listen" ,"10.150.0.30:4444"] + + } + ] +} \ No newline at end of file diff --git a/_examples/go.mod b/_examples/go.mod index 6728aacb..58b999b7 100644 --- a/_examples/go.mod +++ b/_examples/go.mod @@ -44,6 +44,7 @@ require ( github.com/quic-go/qtls-go1-20 v0.3.3 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/scionproto/scion v0.9.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.0.0+incompatible // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/_examples/go.sum b/_examples/go.sum index bcc90eee..0919ae33 100644 --- a/_examples/go.sum +++ b/_examples/go.sum @@ -264,6 +264,8 @@ github.com/scionproto/scion v0.9.1/go.mod h1:5oZGBv6ZI6gK5MgYlZ70FaNUXihxYLrRfol github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/_examples/helloworld_socket/helloworld_socket b/_examples/helloworld_socket/helloworld_socket new file mode 100755 index 00000000..17d2b197 Binary files /dev/null and b/_examples/helloworld_socket/helloworld_socket differ diff --git a/_examples/helloworld_socket/helloworld_socket.go b/_examples/helloworld_socket/helloworld_socket.go new file mode 100644 index 00000000..0fe6e5a2 --- /dev/null +++ b/_examples/helloworld_socket/helloworld_socket.go @@ -0,0 +1,118 @@ +// Copyright 2018 ETH Zurich +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "net/netip" + "os" + "time" + + "github.com/netsec-ethz/scion-apps/pkg/pan" +) + +func main() { + var err error + // get local and remote addresses from program arguments: + var listen pan.IPPortValue + flag.Var(&listen, "listen", "[Server] local IP:port to listen on") + remoteAddr := flag.String("remote", "", "[Client] Remote (i.e. the server's) SCION Address (e.g. 17-ffaa:1:1,[127.0.0.1]:12345)") + count := flag.Uint("count", 1, "[Client] Number of messages to send") + flag.Parse() + + if (listen.Get().Port() > 0) == (len(*remoteAddr) > 0) { + check(fmt.Errorf("either specify -listen for server or -remote for client")) + } + + if listen.Get().Port() > 0 { + err = runServer(listen.Get()) + check(err) + } else { + err = runClient(*remoteAddr, int(*count)) + check(err) + } +} + +func runServer(listen netip.AddrPort) error { + sock, err := pan.NewScionSocket(context.Background(), listen) + + if err != nil { + return err + } + defer sock.Close() + fmt.Println(sock.LocalAddr()) + + buffer := make([]byte, 16*1024) + for { + n, from, err := sock.ReadFrom(buffer) + if err != nil { + return err + } + data := buffer[:n] + fmt.Printf("Received %s: %s\n", from, data) + msg := fmt.Sprintf("take it back! %s", time.Now().Format("15:04:05.0")) + n, err = sock.WriteTo([]byte(msg), from) + if err != nil { + return err + } + fmt.Printf("Wrote %d bytes.\n", n) + } +} + +func runClient(address string, count int) error { + addr, err := pan.ResolveUDPAddr(context.TODO(), address) + if err != nil { + return err + } + + sock, err := pan.NewScionSocket(context.Background(), netip.AddrPort{}) + if err != nil { + return err + } + defer sock.Close() + + for i := 0; i < count; i++ { + nBytes, err := sock.WriteTo([]byte(fmt.Sprintf("hello world %s", time.Now().Format("15:04:05.0"))), addr) + if err != nil { + return err + } + fmt.Printf("Wrote %d bytes.\n", nBytes) + + buffer := make([]byte, 16*1024) + /* if err = conn.SetReadDeadline(time.Now().Add(1 * time.Second)); err != nil { + return err + } */ + n, _, err := sock.ReadFrom(buffer) + if errors.Is(err, os.ErrDeadlineExceeded) { + continue + } else if err != nil { + return err + } + data := buffer[:n] + fmt.Printf("Received reply: %s\n", data) + } + return nil +} + +// Check just ensures the error is nil, or complains and quits +func check(e error) { + if e != nil { + fmt.Fprintln(os.Stderr, "Fatal error:", e) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod index 7fb9c594..6aee4efe 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( github.com/quic-go/qtls-go1-20 v0.3.3 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/smartystreets/assertions v1.2.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.0.0+incompatible // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/go.sum b/go.sum index ad0bad46..8e424213 100644 --- a/go.sum +++ b/go.sum @@ -284,6 +284,8 @@ github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= diff --git a/pkg/pan/combi_selector.go b/pkg/pan/combi_selector.go new file mode 100644 index 00000000..1b306264 --- /dev/null +++ b/pkg/pan/combi_selector.go @@ -0,0 +1,221 @@ +package pan + +import ( + "context" + "errors" +) + +// socket roles (selector internals) +const dialer = 0 +const listener = 1 + +/* +! +\brief this class provides the service of path selection for a remote address to scion-sockets +Every scion-socket has one. + + It should not be cluttered with Refresh()/Update()/PathDown or any other technical + methods that are required by the pathStatedDB or pathPool to update the selectors state. +*/ +type CombiSelector interface { + Close() error + // PathDown(PathFingerprint, PathInterface) // pathDownNotifyee + + // called when the scion-socket is bound to an address + LocalAddrChanged(newlocal UDPAddr) + Path(remote UDPAddr) (*Path, error) + //Refresh([]*Path) Selector + // Refresh(paths []*Path, remote UDPAddr) + Record(remote UDPAddr, path *Path) + //Update( )# + // setter the respective defaults + SetReplySelector(ReplySelector) + SetSelector(sel func() Selector) + SetPolicy(pol func() Policy) + + // setter for AS specific path policies + // SetSelectorFor(remote IA, sel Selector) + // SetPolicyFor(remote IA, pol Policy) + // SetPolicedSelectorFor(remote IA, sel Selector, pol Policy) + + // initialize(local UDPAddr, remote UDPAddr, paths []*Path) + // maybe make this pubilc and let the ScionCocketCall it, + // in its ctor (or once the local addr is known i.e after Bind() was called ) +} + +/* +ensures that its Path(remote) method returns a path to the remote that is valid at the moment it was requested. +It can be used to send the current packet, but not any subsequent ones, +as theoretically the path could have expired in between (so a new one has to be requested from the selector). +Beyond that, the DefaultCombiSelector (if default constructed) makes no promises, +i.e. the returned Path probably wont be the one with the LeastLatency, Hops or Highest Bandwidth. +If you want to get this kind of behaviour you have to mutate the selector by explicitly setting one or more +of its aggregate strategies: Selector,Policy and ReplySelector +*/ +type DefaultCombiSelector struct { + local_ia IA + local UDPAddr + roles map[UDPAddr]int // is this host the dialer or listener for the connection to this remote host + // decided from which method is called first for a remote address X + // Record(X)->listener or Path(X)->dialer + + // maybe make map of pair (, ,) ?! + // policies map[UDPAddr]Policy + policy_factory func() Policy + selector_factory func() Selector + + // TODO: this state should be confined in size somehow + // i.e. drop selectors with LRU scheme + // Note that this is not an attack vector, as this state can only be increased + // by deliberate decisions of this host to dial a remote for which it does not yet has a selector + selectors map[IA]Selector + subscribers map[IA]*pathRefreshSubscriber + replyselector ReplySelector +} + +func (s *DefaultCombiSelector) needPathTo(remote UDPAddr) bool { + return s.local.IA != remote.IA +} + +func (s *DefaultCombiSelector) Close() error { + + for _, v := range s.subscribers { + if e := v.Close(); e != nil { + return e + } + } + + for _, v := range s.selectors { + if e := v.Close(); e != nil { + return e + } + } + if e := s.replyselector.Close(); e != nil { + return e + } + + return nil +} + +func NewDefaultCombiSelector(local UDPAddr) (CombiSelector, error) { + selector := &DefaultCombiSelector{ + local: local, + local_ia: local.IA, + roles: make(map[UDPAddr]int), + // policies: make(map[UDPAddr]Policy), + policy_factory: func() Policy { + var s Policy = nil + return s + }, + selector_factory: func() Selector { return NewDefaultSelector() }, + selectors: make(map[IA]Selector), + subscribers: make(map[IA]*pathRefreshSubscriber), + replyselector: NewDefaultReplySelector(), + } + + selector.replyselector.Initialize(local.IA) + + return selector, nil +} + +func NewDefaultCombiSelector2() (CombiSelector, error) { + selector := &DefaultCombiSelector{ + local_ia: GetLocalIA(), + roles: make(map[UDPAddr]int), + // policies: make(map[UDPAddr]Policy), + policy_factory: func() Policy { + var s Policy = nil + return s + }, + selector_factory: func() Selector { return NewDefaultSelector() }, + selectors: make(map[IA]Selector), + subscribers: make(map[IA]*pathRefreshSubscriber), + replyselector: NewDefaultReplySelector(), + } + + selector.replyselector.Initialize(selector.local_ia) + + return selector, nil +} + +func (s *DefaultCombiSelector) SetReplySelector(rep ReplySelector) { + s.replyselector = rep +} + +func (s *DefaultCombiSelector) LocalAddrChanged(newlocal UDPAddr) { + s.local = newlocal + s.replyselector.LocalAddrChanged(newlocal) +} + +func (s *DefaultCombiSelector) Path(remote UDPAddr) (*Path, error) { + if r, ok := s.roles[remote]; ok { + // the role is already decided + if r == dialer { + if s.needPathTo(remote) { + sel := s.selectors[remote.IA] + sel.NewRemote(remote) + return sel.Path(remote) + } else { + return nil, errors.New("if src and dst are in same AS and no scion path is required, the connection shouldnt request one") + } + } else { + return s.replyselector.Path(remote) + } + } else { + // no role yet -> no path to remote has been requested yet Path() + // so we are acting as a server + s.roles[remote] = dialer + + // set up a refresherSubscriber etc .. + if s.needPathTo(remote) { + var selector Selector + var policy Policy + + if s.policy_factory != nil { + policy = s.policy_factory() + } + if s.selector_factory != nil { + selector = s.selector_factory() + } else { + selector = NewDefaultSelector() + } + var ctx context.Context = context.Background() + // Todo: set timeout for path request + subscriber, err := openPathRefreshSubscriber(ctx, s.local, remote, policy, selector) + if err != nil { + + return nil, err + } + s.selectors[remote.IA] = selector + s.subscribers[remote.IA] = subscriber + + return selector.Path(remote) + } else { + return nil, errors.New("if src and dst are in same AS and no scion path is required, the connection shouldnt request one") + } + } +} + +func (s *DefaultCombiSelector) SetPolicy(pol func() Policy) { + s.policy_factory = pol +} + +func (s *DefaultCombiSelector) SetSelector(sel func() Selector) { + s.selector_factory = sel +} + +func (s *DefaultCombiSelector) Record(remote UDPAddr, path *Path) { + + if r, ok := s.roles[remote]; ok { + // the role is already decided + if r == listener { + s.replyselector.Record(remote, path) + } + } else { + // no role yet -> no path to remote has been requested yet Path() + // so we are acting as a server + s.roles[remote] = listener + s.replyselector.Record(remote, path) + } + +} diff --git a/pkg/pan/refresher.go b/pkg/pan/refresher.go index 1da18b10..d7b1a1af 100644 --- a/pkg/pan/refresher.go +++ b/pkg/pan/refresher.go @@ -16,6 +16,7 @@ package pan import ( "context" + "fmt" "math/rand" "sync" "time" @@ -124,6 +125,7 @@ func (r *refresher) refresh() { // when the paths actually expire). // TODO: check whether there are errors that could be handled, like try to reconnect // to sciond or something like that. + fmt.Printf("unable to queryPaths: %v \n", err) continue } r.subscribersMutex.Lock() diff --git a/pkg/pan/scion_socket.go b/pkg/pan/scion_socket.go new file mode 100644 index 00000000..17ccc7ec --- /dev/null +++ b/pkg/pan/scion_socket.go @@ -0,0 +1,243 @@ +package pan + +import ( + "context" + "fmt" + "net" + "net/netip" + "time" + + "github.com/scionproto/scion/pkg/daemon" + "github.com/scionproto/scion/private/app" + "github.com/scionproto/scion/private/app/flag" +) + +/* +! +\brief a SCION UDP datagram socket +\details + + this is required by bindings to languages + whose networking libraries main abstraction are berkley sockets, + not connections as in Go. It is a prerequisite for i.e. P2P applications + where you want to act as both client and server on the same address/socket + utilizing QUIC implementations that support it i.e rust-quinn. + +the socket's task is to wrap any application layer data it gets passed into a valid Scion Header +and send it off the IP underlay. +Any received ScionPackets will be stripped off their underlay and ScionHeader and the payload +passed to the application. +Packets with data other than application payload i.e. SCMP messages will be handled by the socket +and are not propagated to the application. +*/ +type ScionSocket interface { + Close() error + Bind(context.Context, netip.AddrPort) error + WriteToVia(b []byte, dst UDPAddr, path *Path) (int, error) + ReadFromVia(b []byte) (int, UDPAddr, *Path, error) + LocalAddr() net.Addr + WriteTo(b []byte, dst net.Addr) (int, error) + ReadFrom(b []byte) (int, net.Addr, error) + + // SetCombiSelector(CombiSelector) + SetReplySelector(ReplySelector) + SetSelector(sel func() Selector) + SetPolicy(pol func() Policy) + + // setter for AS specific path policies + // SetSelectorFor(remote IA, sel Selector) + // SetPolicyFor(remote IA, pol Policy) + // SetPolicedSelectorFor(remote IA, sel Selector, pol Policy) + + SetReadDeadline(time.Time) error + SetWriteDeadline(time.Time) error + SetDeadline(time.Time) error +} + +type scionSocket struct { + local_ia IA + local UDPAddr + conn baseUDPConn + selector CombiSelector +} + +func (s *scionSocket) SetReadDeadline(t time.Time) error { + return s.conn.SetReadDeadline(t) +} + +func (s *scionSocket) SetWriteDeadline(t time.Time) error { + return s.conn.SetWriteDeadline(t) +} + +func (s *scionSocket) SetDeadline(t time.Time) error { + return s.conn.SetDeadline(t) +} + +/* +! +\brief open a scion-socket that is bound to the specified local addr + +TODO: + - it should be possible to defer the binding of the socket + from ctor to a later explicit Bind() call. + - accept Selector, ReplySelector and Policy arguments here + or a CombiSelector to customize the behaviour of the socket +*/ +func NewScionSocket(ctx context.Context /*rsel ReplySelector, pol Policy, sel Selector,*/, local netip.AddrPort) (ScionSocket, error) { + local, err := defaultLocalAddr(local) + if err != nil { + return nil, err + } + raw, slocal, err := openBaseUDPConn(ctx, local) + if err != nil { + return nil, err + } + + // as of now the local parameter is only required + // for the selector to determine if a remote is in the same AS as itself + sel, e := NewDefaultCombiSelector(slocal) + if e != nil { + return nil, e + } + + return &scionSocket{ + local_ia: slocal.IA, + local: slocal, + conn: baseUDPConn{raw: raw}, + selector: sel, + }, nil +} + +/* +returns a socket which is not bound to an address yet +Bind() has to be called later in order to receive packets +*/ +func NewScionSocket2() (ScionSocket, error) { + + // as of now the local parameter is only required + // for the selector to determine if a remote is in the same AS as itself + sel, e := NewDefaultCombiSelector2() + if e != nil { + return nil, e + } + + return &scionSocket{ + local_ia: GetLocalIA(), + + selector: sel, + }, nil +} + +var local_ia_init bool = false +var local_ia IA + +func GetLocalIA() IA { + if !local_ia_init { + var envFlags flag.SCIONEnvironment + var service daemon.Service + + if err := envFlags.LoadExternalVars(); err != nil { + panic(fmt.Sprintf("pan initialization failed: %v", err)) + } + + // is this even necessary ?! or can the IA be read from the envFlags already?! docs dont tell it + daemonAddr := envFlags.Daemon() + service = daemon.NewService(daemonAddr) + ctx, cancelF := context.WithTimeout(context.Background(), time.Second) + defer cancelF() + conn, err := service.Connect(ctx) + if err != nil { + panic(fmt.Sprintf("connecting to SCION Daemon: %v", err)) + } + defer conn.Close() + + info, err := app.QueryASInfo(ctx, conn) + if err != nil { + panic(fmt.Sprintf("%v", err)) + } + local_ia_init = true + local_ia = IA(info.IA) + + } + return local_ia +} + +func (s *scionSocket) Close() error { + if err := s.selector.Close(); err != nil { + return err + } + if err := s.conn.Close(); err != nil { + return err + } + return nil +} + +func (s *scionSocket) SetReplySelector(rsel ReplySelector) { + s.selector.SetReplySelector(rsel) +} +func (s *scionSocket) SetSelector(sel func() Selector) { + s.selector.SetSelector(sel) +} +func (s *scionSocket) SetPolicy(pol func() Policy) { + s.selector.SetPolicy(pol) +} + +func (s *scionSocket) Bind(ctx context.Context, local netip.AddrPort) error { + local, err := defaultLocalAddr(local) + if err != nil { + return err + } + raw, slocal, err := openBaseUDPConn(ctx, local) + if err != nil { + return err + } + s.conn.raw = raw + s.local = slocal + + // notify CombiSelector about address change + s.selector.LocalAddrChanged(slocal) + return nil + +} + +func (s *scionSocket) needPathTo(remote UDPAddr) bool { + return s.local.IA != remote.IA +} + +func (s *scionSocket) WriteToVia(b []byte, dst UDPAddr, path *Path) (int, error) { + return s.conn.writeMsg(s.local, dst, path, b) +} + +func (s *scionSocket) ReadFromVia(b []byte) (int, UDPAddr, *Path, error) { + n, remote, fwPath, err := s.conn.readMsg(b) + if err != nil { + return n, UDPAddr{}, nil, err + } + path, err := reversePathFromForwardingPath(remote.IA, s.local.IA, fwPath) + s.selector.Record(remote, path) + return n, remote, path, err +} + +func (s *scionSocket) LocalAddr() net.Addr { + return s.local +} + +func (s *scionSocket) WriteTo(b []byte, dst net.Addr) (int, error) { + sdst, ok := dst.(UDPAddr) + if !ok { + return 0, errBadDstAddress + } + var path *Path + if s.needPathTo(sdst) { + path, _ = s.selector.Path(sdst) + if path == nil { + return 0, errNoPathTo(sdst.IA) + } + } + return s.WriteToVia(b, sdst, path) +} + +func (s *scionSocket) ReadFrom(b []byte) (int, net.Addr, error) { + n, a, _, e := s.ReadFromVia(b) + return n, a, e +} diff --git a/pkg/pan/selector.go b/pkg/pan/selector.go index 5bbea671..bf4ab40b 100644 --- a/pkg/pan/selector.go +++ b/pkg/pan/selector.go @@ -16,8 +16,10 @@ package pan import ( "context" + "errors" "fmt" "net" + "slices" "sync" "sync/atomic" "time" @@ -31,7 +33,17 @@ import ( type Selector interface { // Path selects the path for the next packet. // Invoked for each packet sent with Write. - Path() *Path + Path(remote UDPAddr) (*Path, error) + + NewRemote(remote UDPAddr) error + // inform the selector, that it is now responsible for another address + // which must be in the same AS as any previous addresses + // this gives more complex selectors (i.e. Pinging) the chance to + // ping more than one scion address + + // get the IA for which this selector provides paths + GetIA() IA + // Initialize the selector for a connection with the initial list of paths, // filtered/ordered by the Policy. // Invoked once during the creation of a Conn. @@ -55,28 +67,41 @@ type Selector interface { // switch to the first path (in the order defined by the policy) that is not // affected by down notifications. type DefaultSelector struct { - mutex sync.Mutex - paths []*Path - current int + mutex sync.Mutex + paths []*Path + current int + remote_ia IA +} + +func (s *DefaultSelector) GetIA() IA { + return s.remote_ia +} + +func (s *DefaultSelector) NewRemote(remote UDPAddr) error { + if remote.IA != s.remote_ia { + return errors.New("address must be inside the AS, which the selector was initialized with") + } + return nil } func NewDefaultSelector() *DefaultSelector { return &DefaultSelector{} } -func (s *DefaultSelector) Path() *Path { +func (s *DefaultSelector) Path(remote UDPAddr) (*Path, error) { s.mutex.Lock() defer s.mutex.Unlock() if len(s.paths) == 0 { - return nil + return nil, errors.New("DefaultPathSelector initialized with empty path list and never refreshed") } - return s.paths[s.current] + return s.paths[s.current], nil } func (s *DefaultSelector) Initialize(local, remote UDPAddr, paths []*Path) { s.mutex.Lock() defer s.mutex.Unlock() + s.remote_ia = remote.IA s.paths = paths s.current = 0 @@ -119,6 +144,10 @@ func (s *DefaultSelector) Close() error { return nil } +/* +selects the path to each of set of remote hosts +in the same remote AS by periodically pinging them +*/ type PingingSelector struct { // Interval for pinging. Must be positive. Interval time.Duration @@ -127,9 +156,9 @@ type PingingSelector struct { mutex sync.Mutex paths []*Path - current int + current map[scionAddr]int local scionAddr - remote scionAddr + remotes []scionAddr numActive int64 pingerCtx context.Context @@ -137,20 +166,32 @@ type PingingSelector struct { pinger *ping.Pinger } +func (s *PingingSelector) GetIA() IA { + return s.remotes[0].IA +} + +func (s *PingingSelector) NewRemote(remote UDPAddr) error { + if remote.IA != s.remotes[0].IA { + return errors.New("path selection domain for selectors can only contain addresses inside same AS!") + } + s.remotes = append(s.remotes, scionAddr{IA: remote.IA, IP: remote.IP}) + return nil +} + // SetActive enables active pinging on at most numActive paths. func (s *PingingSelector) SetActive(numActive int) { s.ensureRunning() atomic.SwapInt64(&s.numActive, int64(numActive)) } -func (s *PingingSelector) Path() *Path { +func (s *PingingSelector) Path(remote UDPAddr) (*Path, error) { s.mutex.Lock() defer s.mutex.Unlock() if len(s.paths) == 0 { - return nil + return nil, errors.New("PingingSelector initialized with empty path list and never refreshed") } - return s.paths[s.current] + return s.paths[s.current[scionAddr{IA: remote.IA, IP: remote.IP}]], nil } func (s *PingingSelector) Initialize(local, remote UDPAddr, paths []*Path) { @@ -158,9 +199,12 @@ func (s *PingingSelector) Initialize(local, remote UDPAddr, paths []*Path) { defer s.mutex.Unlock() s.local = local.scionAddr() - s.remote = remote.scionAddr() + s.remotes = append(s.remotes, remote.scionAddr()) s.paths = paths - s.current = stats.LowestLatency(s.remote, s.paths) + for _, rem := range s.remotes { + s.current[rem] = stats.LowestLatency(rem, s.paths) + } + //s.current = stats.LowestLatency(s.remote, s.paths) } func (s *PingingSelector) Refresh(paths []*Path) { @@ -168,25 +212,31 @@ func (s *PingingSelector) Refresh(paths []*Path) { defer s.mutex.Unlock() s.paths = paths - s.current = stats.LowestLatency(s.remote, s.paths) + // s.current = stats.LowestLatency(s.remote, s.paths) + for _, rem := range s.remotes { + s.current[rem] = stats.LowestLatency(rem, s.paths) + } } func (s *PingingSelector) PathDown(pf PathFingerprint, pi PathInterface) { - s.reselectPath() + s.reselectPaths() } -func (s *PingingSelector) reselectPath() { +func (s *PingingSelector) reselectPaths() { s.mutex.Lock() defer s.mutex.Unlock() - s.current = stats.LowestLatency(s.remote, s.paths) + // s.current = stats.LowestLatency(s.remote, s.paths) + for _, rem := range s.remotes { + s.current[rem] = stats.LowestLatency(rem, s.paths) + } } func (s *PingingSelector) ensureRunning() { s.mutex.Lock() defer s.mutex.Unlock() - if s.local.IA == s.remote.IA { + if s.local.IA == s.remotes[0].IA { return } if s.pinger != nil { @@ -203,6 +253,11 @@ func (s *PingingSelector) ensureRunning() { go s.run() } +type probeKey struct { + pf PathFingerprint + addr scionAddr +} + func (s *PingingSelector) run() { pingTicker := time.NewTicker(s.Interval) pingTimeout := time.NewTimer(0) @@ -211,7 +266,7 @@ func (s *PingingSelector) run() { } var sequenceNo uint16 - replyPending := make(map[PathFingerprint]struct{}) + replyPending := make(map[probeKey]struct{}) for { select { @@ -228,7 +283,9 @@ func (s *PingingSelector) run() { activePaths := s.paths[:numActive] for _, p := range activePaths { - replyPending[p.Fingerprint] = struct{}{} + for _, rem := range s.remotes { + replyPending[probeKey{p.Fingerprint, rem}] = struct{}{} + } } sequenceNo++ s.sendPings(activePaths, sequenceNo) @@ -237,35 +294,37 @@ func (s *PingingSelector) run() { s.handlePingReply(r, replyPending, sequenceNo) if len(replyPending) == 0 { pingTimeout.Stop() - s.reselectPath() + s.reselectPaths() } case <-pingTimeout.C: if len(replyPending) == 0 { continue // already handled above } - for pf := range replyPending { - stats.RecordLatency(s.remote, pf, s.Timeout) - delete(replyPending, pf) + for probe := range replyPending { + stats.RecordLatency(probe.addr, probe.pf, s.Timeout) + delete(replyPending, probe) } - s.reselectPath() + s.reselectPaths() } } } func (s *PingingSelector) sendPings(paths []*Path, sequenceNo uint16) { for _, p := range paths { - remote := s.remote.snetUDPAddr() - remote.Path = p.ForwardingPath.dataplanePath - remote.NextHop = net.UDPAddrFromAddrPort(p.ForwardingPath.underlay) - err := s.pinger.Send(s.pingerCtx, remote, sequenceNo, 16) - if err != nil { - panic(err) + for _, rem := range s.remotes { + remote := rem.snetUDPAddr() + remote.Path = p.ForwardingPath.dataplanePath + remote.NextHop = net.UDPAddrFromAddrPort(p.ForwardingPath.underlay) + err := s.pinger.Send(s.pingerCtx, remote, sequenceNo, 16) + if err != nil { + panic(err) + } } } } func (s *PingingSelector) handlePingReply(reply ping.Reply, - expectedReplies map[PathFingerprint]struct{}, + expectedReplies map[probeKey]struct{}, expectedSequenceNo uint16) { if reply.Error != nil { // handle NotifyPathDown. @@ -299,18 +358,18 @@ func (s *PingingSelector) handlePingReply(reply ping.Reply, IA: IA(reply.Source.IA), IP: reply.Source.Host.IP(), } - if src != s.remote || reply.Reply.SeqNumber != expectedSequenceNo { + if !slices.Contains(s.remotes, src) || reply.Reply.SeqNumber != expectedSequenceNo { return } pf, err := reversePathFingerprint(reply.Path) if err != nil { return } - if _, expected := expectedReplies[pf]; !expected { + if _, expected := expectedReplies[probeKey{pf, src}]; !expected { return } - stats.RecordLatency(s.remote, pf, reply.RTT()) - delete(expectedReplies, pf) + stats.RecordLatency(src, pf, reply.RTT()) + delete(expectedReplies, probeKey{pf, src}) } func (s *PingingSelector) Close() error { diff --git a/pkg/pan/udp_dial.go b/pkg/pan/udp_dial.go index 5ef55aad..508a0124 100644 --- a/pkg/pan/udp_dial.go +++ b/pkg/pan/udp_dial.go @@ -98,7 +98,8 @@ func (c *dialedConn) LocalAddr() net.Addr { } func (c *dialedConn) GetPath() *Path { - return c.selector.Path() + p, _ := c.selector.Path(c.remote) + return p } func (c *dialedConn) RemoteAddr() net.Addr { @@ -108,9 +109,15 @@ func (c *dialedConn) RemoteAddr() net.Addr { func (c *dialedConn) Write(b []byte) (int, error) { var path *Path if c.local.IA != c.remote.IA { - path = c.selector.Path() - if path == nil { - return 0, errNoPathTo(c.remote.IA) + + var err error + path, err = c.selector.Path(c.remote) + if err != nil { + + return 0, err + /*if path == nil { + return 0, errNoPathTo(c.remote.IA) + }*/ } } return c.baseUDPConn.writeMsg(c.local, c.remote, path, b) diff --git a/pkg/pan/udp_listen.go b/pkg/pan/udp_listen.go index 685f0109..ffedb19b 100644 --- a/pkg/pan/udp_listen.go +++ b/pkg/pan/udp_listen.go @@ -31,10 +31,13 @@ var errBadDstAddress error = errors.New("dst address not a UDPAddr") type ReplySelector interface { // Path selects the path for the next packet to remote. // Invoked for each packet sent with WriteTo. - Path(remote UDPAddr) *Path + Path(remote UDPAddr) (*Path, error) // Initialize the selector. // Invoked once during the creation of a ListenConn. - Initialize(local UDPAddr) + // local Isd-As is enough ?! i cant think of an application for the address here + Initialize(local IA) + // called when the scion-socket is bound to an address + LocalAddrChanged(newlocal UDPAddr) // Record a path used by the remote for a packet received. // Invoked whenever a packet is received. // The path is reversed, i.e. it's the path from here to remote. @@ -72,7 +75,7 @@ func ListenUDP(ctx context.Context, local netip.AddrPort, if err != nil { return nil, err } - selector.Initialize(slocal) + selector.Initialize(slocal.IA) if len(os.Getenv("SCION_GO_INTEGRATION")) > 0 { fmt.Printf("Listening addr=%s\n", slocal) @@ -120,7 +123,8 @@ func (c *listenConn) WriteTo(b []byte, dst net.Addr) (int, error) { } var path *Path if c.local.IA != sdst.IA { - path = c.selector.Path(sdst) + + path, _ = c.selector.Path(sdst) if path == nil { return 0, errNoPathTo(sdst.IA) } @@ -150,17 +154,21 @@ func NewDefaultReplySelector() *DefaultReplySelector { } } -func (s *DefaultReplySelector) Initialize(local UDPAddr) { +func (s *DefaultReplySelector) Initialize(local IA) { +} + +func (s *DefaultReplySelector) LocalAddrChanged(newlocal UDPAddr) { + } -func (s *DefaultReplySelector) Path(remote UDPAddr) *Path { +func (s *DefaultReplySelector) Path(remote UDPAddr) (*Path, error) { s.mtx.RLock() defer s.mtx.RUnlock() r, ok := s.remotes[remote] if !ok || len(r.paths) == 0 { - return nil + return nil, errors.New("path requested for remote from which no package was received earlier") } - return r.paths[0] + return r.paths[0], nil } func (s *DefaultReplySelector) Record(remote UDPAddr, path *Path) {