77 "time"
88
99 "github.com/google/uuid"
10- "github.com/lucas-clemente/quic-go"
1110 "github.com/rs/zerolog"
12- "golang.org/x/sync/errgroup"
11+
12+ quicpogs "github.com/cloudflare/cloudflared/quic"
1313)
1414
1515const (
@@ -36,26 +36,25 @@ type Manager interface {
3636type manager struct {
3737 registrationChan chan * registerSessionEvent
3838 unregistrationChan chan * unregisterSessionEvent
39- datagramChan chan * newDatagram
40- closedChan chan struct {}
41- transport transport
39+ sendFunc transportSender
40+ receiveChan <- chan * quicpogs. SessionDatagram
41+ closedChan <- chan struct {}
4242 sessions map [uuid.UUID ]* Session
4343 log * zerolog.Logger
4444 // timeout waiting for an API to finish. This can be overriden in test
4545 timeout time.Duration
4646}
4747
48- func NewManager (transport transport , log * zerolog.Logger ) * manager {
48+ func NewManager (log * zerolog.Logger , sendF transportSender , receiveChan <- chan * quicpogs. SessionDatagram ) * manager {
4949 return & manager {
5050 registrationChan : make (chan * registerSessionEvent ),
5151 unregistrationChan : make (chan * unregisterSessionEvent ),
52- // datagramChan is buffered, so it can read more datagrams from transport while the event loop is processing other events
53- datagramChan : make (chan * newDatagram , requestChanCapacity ),
54- closedChan : make (chan struct {}),
55- transport : transport ,
56- sessions : make (map [uuid.UUID ]* Session ),
57- log : log ,
58- timeout : defaultReqTimeout ,
52+ sendFunc : sendF ,
53+ receiveChan : receiveChan ,
54+ closedChan : make (chan struct {}),
55+ sessions : make (map [uuid.UUID ]* Session ),
56+ log : log ,
57+ timeout : defaultReqTimeout ,
5958 }
6059}
6160
@@ -65,49 +64,21 @@ func (m *manager) UpdateLogger(log *zerolog.Logger) {
6564}
6665
6766func (m * manager ) Serve (ctx context.Context ) error {
68- errGroup , ctx := errgroup .WithContext (ctx )
69- errGroup .Go (func () error {
70- for {
71- sessionID , payload , err := m .transport .ReceiveFrom ()
72- if err != nil {
73- if aerr , ok := err .(* quic.ApplicationError ); ok && uint64 (aerr .ErrorCode ) == uint64 (quic .NoError ) {
74- return nil
75- } else {
76- return err
77- }
78- }
79- datagram := & newDatagram {
80- sessionID : sessionID ,
81- payload : payload ,
82- }
83- select {
84- case <- ctx .Done ():
85- return ctx .Err ()
86- // Only the event loop routine can update/lookup the sessions map to avoid concurrent access
87- // Send the datagram to the event loop. It will find the session to send to
88- case m .datagramChan <- datagram :
89- }
90- }
91- })
92- errGroup .Go (func () error {
93- for {
94- select {
95- case <- ctx .Done ():
96- return nil
97- case datagram := <- m .datagramChan :
98- m .sendToSession (datagram )
99- case registration := <- m .registrationChan :
100- m .registerSession (ctx , registration )
101- // TODO: TUN-5422: Unregister inactive session upon timeout
102- case unregistration := <- m .unregistrationChan :
103- m .unregisterSession (unregistration )
104- }
67+ for {
68+ select {
69+ case <- ctx .Done ():
70+ m .shutdownSessions (ctx .Err ())
71+ return ctx .Err ()
72+ // receiveChan is buffered, so the transport can read more datagrams from transport while the event loop is
73+ // processing other events
74+ case datagram := <- m .receiveChan :
75+ m .sendToSession (datagram )
76+ case registration := <- m .registrationChan :
77+ m .registerSession (ctx , registration )
78+ case unregistration := <- m .unregistrationChan :
79+ m .unregisterSession (unregistration )
10580 }
106- })
107- err := errGroup .Wait ()
108- close (m .closedChan )
109- m .shutdownSessions (err )
110- return err
81+ }
11182}
11283
11384func (m * manager ) shutdownSessions (err error ) {
@@ -149,16 +120,17 @@ func (m *manager) registerSession(ctx context.Context, registration *registerSes
149120}
150121
151122func (m * manager ) newSession (id uuid.UUID , dstConn io.ReadWriteCloser ) * Session {
123+ logger := m .log .With ().Str ("sessionID" , id .String ()).Logger ()
152124 return & Session {
153- ID : id ,
154- transport : m .transport ,
155- dstConn : dstConn ,
125+ ID : id ,
126+ sendFunc : m .sendFunc ,
127+ dstConn : dstConn ,
156128 // activeAtChan has low capacity. It can be full when there are many concurrent read/write. markActive() will
157129 // drop instead of blocking because last active time only needs to be an approximation
158130 activeAtChan : make (chan time.Time , 2 ),
159131 // capacity is 2 because close() and dstToTransport routine in Serve() can write to this channel
160132 closeChan : make (chan error , 2 ),
161- log : m . log ,
133+ log : & logger ,
162134 }
163135}
164136
@@ -191,16 +163,13 @@ func (m *manager) unregisterSession(unregistration *unregisterSessionEvent) {
191163 }
192164}
193165
194- func (m * manager ) sendToSession (datagram * newDatagram ) {
195- session , ok := m .sessions [datagram .sessionID ]
166+ func (m * manager ) sendToSession (datagram * quicpogs. SessionDatagram ) {
167+ session , ok := m .sessions [datagram .ID ]
196168 if ! ok {
197- m .log .Error ().Str ("sessionID" , datagram .sessionID .String ()).Msg ("session not found" )
169+ m .log .Error ().Str ("sessionID" , datagram .ID .String ()).Msg ("session not found" )
198170 return
199171 }
200172 // session writes to destination over a connected UDP socket, which should not be blocking, so this call doesn't
201173 // need to run in another go routine
202- _ , err := session .transportToDst (datagram .payload )
203- if err != nil {
204- m .log .Err (err ).Str ("sessionID" , datagram .sessionID .String ()).Msg ("Failed to write payload to session" )
205- }
174+ session .transportToDst (datagram .Payload )
206175}
0 commit comments