@@ -51,7 +51,7 @@ type QUICConnection struct {
5151 sessionManager datagramsession.Manager
5252 // datagramMuxer mux/demux datagrams from quic connection
5353 datagramMuxer quicpogs.BaseDatagramMuxer
54- packetRouter * packetRouter
54+ packetRouter * packet. Router
5555 controlStreamHandler ControlStreamHandler
5656 connOptions * tunnelpogs.ConnectionOptions
5757}
@@ -65,7 +65,7 @@ func NewQUICConnection(
6565 connOptions * tunnelpogs.ConnectionOptions ,
6666 controlStreamHandler ControlStreamHandler ,
6767 logger * zerolog.Logger ,
68- icmpProxy ingress. ICMPProxy ,
68+ icmpRouter packet. ICMPRouter ,
6969) (* QUICConnection , error ) {
7070 session , err := quic .DialAddr (edgeAddr .String (), tlsConfig , quicConfig )
7171 if err != nil {
@@ -75,15 +75,12 @@ func NewQUICConnection(
7575 sessionDemuxChan := make (chan * packet.Session , demuxChanCapacity )
7676 var (
7777 datagramMuxer quicpogs.BaseDatagramMuxer
78- pr * packetRouter
78+ pr * packet. Router
7979 )
80- if icmpProxy != nil {
81- pr = & packetRouter {
82- muxer : quicpogs .NewDatagramMuxerV2 (session , logger , sessionDemuxChan ),
83- icmpProxy : icmpProxy ,
84- logger : logger ,
85- }
86- datagramMuxer = pr .muxer
80+ if icmpRouter != nil {
81+ datagramMuxerV2 := quicpogs .NewDatagramMuxerV2 (session , logger , sessionDemuxChan )
82+ pr = packet .NewRouter (datagramMuxerV2 , & returnPipe {muxer : datagramMuxerV2 }, icmpRouter , logger )
83+ datagramMuxer = datagramMuxerV2
8784 } else {
8885 datagramMuxer = quicpogs .NewDatagramMuxer (session , logger , sessionDemuxChan )
8986 }
@@ -139,7 +136,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
139136 if q .packetRouter != nil {
140137 errGroup .Go (func () error {
141138 defer cancel ()
142- return q .packetRouter .serve (ctx )
139+ return q .packetRouter .Serve (ctx )
143140 })
144141 }
145142
@@ -348,50 +345,6 @@ func (q *QUICConnection) UpdateConfiguration(ctx context.Context, version int32,
348345 return q .orchestrator .UpdateConfig (version , config )
349346}
350347
351- type packetRouter struct {
352- muxer * quicpogs.DatagramMuxerV2
353- icmpProxy ingress.ICMPProxy
354- logger * zerolog.Logger
355- }
356-
357- func (pr * packetRouter ) serve (ctx context.Context ) error {
358- icmpDecoder := packet .NewICMPDecoder ()
359- for {
360- pk , err := pr .muxer .ReceivePacket (ctx )
361- if err != nil {
362- return err
363- }
364- icmpPacket , err := icmpDecoder .Decode (pk )
365- if err != nil {
366- pr .logger .Err (err ).Msg ("Failed to decode ICMP packet from quic datagram" )
367- continue
368- }
369-
370- flowPipe := muxerResponder {muxer : pr .muxer }
371- if err := pr .icmpProxy .Request (icmpPacket , & flowPipe ); err != nil {
372- pr .logger .Err (err ).
373- Str ("src" , icmpPacket .Src .String ()).
374- Str ("dst" , icmpPacket .Dst .String ()).
375- Interface ("type" , icmpPacket .Type ).
376- Msg ("Failed to send ICMP packet" )
377- continue
378- }
379- }
380- }
381-
382- // muxerResponder wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
383- type muxerResponder struct {
384- muxer * quicpogs.DatagramMuxerV2
385- }
386-
387- func (mr * muxerResponder ) SendPacket (dst netip.Addr , pk packet.RawPacket ) error {
388- return mr .muxer .SendPacket (pk )
389- }
390-
391- func (mr * muxerResponder ) Close () error {
392- return nil
393- }
394-
395348// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
396349// the client.
397350type streamReadWriteAcker struct {
@@ -538,3 +491,16 @@ func (np *nopCloserReadWriter) Close() error {
538491
539492 return nil
540493}
494+
495+ // returnPipe wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
496+ type returnPipe struct {
497+ muxer * quicpogs.DatagramMuxerV2
498+ }
499+
500+ func (rp * returnPipe ) SendPacket (dst netip.Addr , pk packet.RawPacket ) error {
501+ return rp .muxer .SendPacket (pk )
502+ }
503+
504+ func (rp * returnPipe ) Close () error {
505+ return nil
506+ }
0 commit comments