Skip to content

Commit bae9703

Browse files
committed
implemented contextualized waiting for peers
introduced new functions with `WithContext` suffix to not break API compatibility.
1 parent 5ab2f23 commit bae9703

File tree

4 files changed

+130
-1
lines changed

4 files changed

+130
-1
lines changed

api/smp.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,66 @@ func (mp *MPPeerSock) WaitForPeerConnect(sel pathselection.CustomPathSelection)
207207
return remote, err
208208
}
209209

210+
//
211+
// This method waits until a remote MPPeerSock calls connect to this
212+
// socket's local address
213+
// A pathselection may be passed, which lets the socket dialing back to its remote
214+
// (e.g. for server-side path selection)
215+
// Since the MPPeerSock waits for only one incoming connection to determine a new peer
216+
// it starts waiting for other connections (if no selection passed) and fires the
217+
// OnConnectionsChange event for each new incoming connection
218+
//
219+
func (mp *MPPeerSock) WaitForPeerConnectWithContext(ctx context.Context, sel pathselection.CustomPathSelection) (*snet.UDPAddr, error) {
220+
log.Debugf("Waiting for incoming connection")
221+
remote, err := mp.UnderlaySocket.WaitForDialInWithContext(ctx)
222+
if err != nil {
223+
return nil, err
224+
}
225+
log.Debugf("Accepted connection from %s", remote.String())
226+
mp.Peer = remote
227+
mp.selection = sel
228+
// Start selection process -> will update DB
229+
mp.StartPathSelection(sel, sel == nil)
230+
log.Debugf("Done path selection")
231+
// wait until first signal on channel
232+
// selectedPathSet := <-mp.OnPathsetChange
233+
// time.Sleep(1 * time.Second)
234+
// dial all paths selected by user algorithm
235+
if sel != nil {
236+
err = mp.DialAll(mp.SelectedPathSet, &socket.ConnectOptions{
237+
SendAddrPacket: false,
238+
})
239+
mp.collectMetrics()
240+
} else {
241+
mp.collectMetrics()
242+
go func() {
243+
conns := mp.UnderlaySocket.GetConnections()
244+
mp.PacketScheduler.SetConnections(conns)
245+
mp.PathQualityDB.SetConnections(conns)
246+
mp.connectionSetChange(conns)
247+
for {
248+
log.Debugf("Waiting for new connections...")
249+
conn, err := mp.UnderlaySocket.WaitForIncomingConnWithContext(ctx)
250+
if conn == nil && err == nil {
251+
log.Debugf("Socket does not implement WaitForIncomingConn, stopping here...")
252+
return
253+
}
254+
if err != nil {
255+
log.Errorf("Failed to wait for incoming connection %s", err.Error())
256+
return
257+
}
258+
259+
conns := mp.UnderlaySocket.GetConnections()
260+
mp.PacketScheduler.SetConnections(conns)
261+
mp.PathQualityDB.SetConnections(conns)
262+
mp.connectionSetChange(conns)
263+
}
264+
}()
265+
}
266+
267+
return remote, err
268+
}
269+
210270
func (mp *MPPeerSock) collectMetrics() {
211271
mp.metricsTicker = time.NewTicker(mp.MetricsInterval)
212272
go func() {

socket/quicsocket.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,70 @@ func (s *QUICSocket) WaitForIncomingConn() (packets.UDPConn, error) {
137137
}
138138
}
139139

140+
func (s *QUICSocket) WaitForIncomingConnWithContext(ctx context.Context) (packets.UDPConn, error) {
141+
if s.options == nil || !s.options.MultiportMode {
142+
log.Debugf("Waiting for new connection")
143+
stream, err := s.listenConns[0].AcceptStreamWithContext(ctx)
144+
if err != nil {
145+
log.Fatalf("QUIC Accept err %s", err.Error())
146+
}
147+
148+
log.Debugf("Accepted new Stream on listen socket")
149+
150+
bts := make([]byte, packets.PACKET_SIZE)
151+
_, err = stream.Read(bts)
152+
153+
if s.listenConns[0].GetInternalConn() == nil {
154+
s.listenConns[0].SetStream(stream)
155+
select {
156+
case s.listenConns[0].Ready <- true:
157+
default:
158+
}
159+
160+
return s.listenConns[0], nil
161+
} else {
162+
newConn := &packets.QUICReliableConn{}
163+
id := RandStringBytes(32)
164+
newConn.SetId(id)
165+
newConn.SetLocal(*s.localAddr)
166+
newConn.SetRemote(s.listenConns[0].GetRemote())
167+
newConn.SetStream(stream)
168+
s.listenConns = append(s.listenConns, newConn)
169+
170+
_, err = stream.Read(bts)
171+
if err != nil {
172+
return nil, err
173+
}
174+
return newConn, nil
175+
}
176+
} else {
177+
addr := s.localAddr.Copy()
178+
addr.Host.Port = s.localAddr.Host.Port + len(s.listenConns)
179+
conn := &packets.QUICReliableConn{}
180+
err := conn.Listen(*addr)
181+
if err != nil {
182+
return nil, err
183+
}
184+
185+
stream, err := conn.AcceptStreamWithContext(ctx)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
id := RandStringBytes(32)
191+
conn.SetId(id)
192+
193+
conn.SetStream(stream)
194+
s.listenConns = append(s.listenConns, conn)
195+
bts := make([]byte, packets.PACKET_SIZE)
196+
_, err = stream.Read(bts)
197+
if err != nil {
198+
return nil, err
199+
}
200+
return conn, nil
201+
}
202+
}
203+
140204
func (s *QUICSocket) WaitForDialIn() (*snet.UDPAddr, error) {
141205
bts := make([]byte, packets.PACKET_SIZE)
142206
log.Debugf("Wait for Dial In")
@@ -210,7 +274,7 @@ func (s *QUICSocket) WaitForDialInWithContext(ctx context.Context) (*snet.UDPAdd
210274
log.Debugf("Waiting for %d more connections", p.NumPaths-1)
211275

212276
for i := 1; i < p.NumPaths; i++ {
213-
_, err := s.WaitForIncomingConn()
277+
_, err := s.WaitForIncomingConnWithContext(ctx)
214278
if err != nil {
215279
return nil, err
216280
}

socket/scionsocket.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ func (s *SCIONSocket) WaitForIncomingConn() (packets.UDPConn, error) {
140140
return nil, nil
141141
}
142142

143+
func (s *SCIONSocket) WaitForIncomingConnWithContext(ctx context.Context) (packets.UDPConn, error) {
144+
return nil, nil
145+
}
146+
143147
func (s *SCIONSocket) DialAll(remote snet.UDPAddr, path []pathselection.PathQuality, options DialOptions) ([]packets.UDPConn, error) {
144148
// There is always one listening connection
145149
conns := make([]packets.UDPConn, 1)

socket/socket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type UnderlaySocket interface {
2929
WaitForDialIn() (*snet.UDPAddr, error)
3030
WaitForDialInWithContext(ctx context.Context) (*snet.UDPAddr, error)
3131
WaitForIncomingConn() (packets.UDPConn, error)
32+
WaitForIncomingConnWithContext(ctx context.Context) (packets.UDPConn, error)
3233
Dial(remote snet.UDPAddr, path snet.Path, options DialOptions, i int) (packets.UDPConn, error)
3334
DialAll(remote snet.UDPAddr, path []pathselection.PathQuality, options DialOptions) ([]packets.UDPConn, error)
3435
CloseAll() []error

0 commit comments

Comments
 (0)