Skip to content

Commit 49438f3

Browse files
committed
TUN-6813: Only proxy ICMP packets when warp-routing is enabled
1 parent eacc8c6 commit 49438f3

File tree

7 files changed

+145
-22
lines changed

7 files changed

+145
-22
lines changed

connection/connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type Orchestrator interface {
3939
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
4040
GetConfigJSON() ([]byte, error)
4141
GetOriginProxy() (OriginProxy, error)
42+
WarpRoutingEnabled() (enabled bool)
4243
}
4344

4445
type NamedTunnelProperties struct {

connection/connection_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func (mcr *mockOrchestrator) GetOriginProxy() (OriginProxy, error) {
5656
return mcr.originProxy, nil
5757
}
5858

59+
func (mcr *mockOrchestrator) WarpRoutingEnabled() (enabled bool) {
60+
return true
61+
}
62+
5963
type mockOriginProxy struct{}
6064

6165
func (moc *mockOriginProxy) ProxyHTTP(

connection/quic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func NewQUICConnection(
7575
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
7676
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
7777
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
78-
packetRouter := packet.NewRouter(packetRouterConfig, datagramMuxer, &returnPipe{muxer: datagramMuxer}, logger)
78+
packetRouter := packet.NewRouter(packetRouterConfig, datagramMuxer, &returnPipe{muxer: datagramMuxer}, logger, orchestrator.WarpRoutingEnabled)
7979

8080
return &QUICConnection{
8181
session: session,

orchestration/orchestrator.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ type Orchestrator struct {
2727
// Used by UpdateConfig to make sure one update at a time
2828
lock sync.RWMutex
2929
// Underlying value is proxy.Proxy, can be read without the lock, but still needs the lock to update
30-
proxy atomic.Value
31-
config *Config
32-
tags []tunnelpogs.Tag
33-
log *zerolog.Logger
30+
proxy atomic.Value
31+
// TODO: TUN-6815 Use atomic.Bool once we upgrade to go 1.19. 1 Means enabled and 0 means disabled
32+
warpRoutingEnabled uint32
33+
config *Config
34+
tags []tunnelpogs.Tag
35+
log *zerolog.Logger
3436

3537
// orchestrator must not handle any more updates after shutdownC is closed
3638
shutdownC <-chan struct{}
@@ -122,6 +124,11 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
122124
o.proxy.Store(newProxy)
123125
o.config.Ingress = &ingressRules
124126
o.config.WarpRouting = warpRouting
127+
if warpRouting.Enabled {
128+
atomic.StoreUint32(&o.warpRoutingEnabled, 1)
129+
} else {
130+
atomic.StoreUint32(&o.warpRoutingEnabled, 0)
131+
}
125132

126133
// If proxyShutdownC is nil, there is no previous running proxy
127134
if o.proxyShutdownC != nil {
@@ -190,6 +197,14 @@ func (o *Orchestrator) GetOriginProxy() (connection.OriginProxy, error) {
190197
return proxy, nil
191198
}
192199

200+
// TODO: TUN-6815 consider storing WarpRouting.Enabled as atomic.Bool once we upgrade to go 1.19
201+
func (o *Orchestrator) WarpRoutingEnabled() (enabled bool) {
202+
if atomic.LoadUint32(&o.warpRoutingEnabled) == 0 {
203+
return false
204+
}
205+
return true
206+
}
207+
193208
func (o *Orchestrator) waitToCloseLastProxy() {
194209
<-o.shutdownC
195210
o.lock.Lock()

orchestration/orchestrator_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func TestUpdateConfiguration(t *testing.T) {
5555
initOriginProxy, err := orchestrator.GetOriginProxy()
5656
require.NoError(t, err)
5757
require.IsType(t, &proxy.Proxy{}, initOriginProxy)
58+
require.False(t, orchestrator.WarpRoutingEnabled())
5859

5960
configJSONV2 := []byte(`
6061
{
@@ -122,6 +123,7 @@ func TestUpdateConfiguration(t *testing.T) {
122123
require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify)
123124
require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs)
124125
require.True(t, configV2.WarpRouting.Enabled)
126+
require.Equal(t, configV2.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled())
125127
require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second)
126128

127129
originProxyV2, err := orchestrator.GetOriginProxy()
@@ -166,6 +168,7 @@ func TestUpdateConfiguration(t *testing.T) {
166168
require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10"))
167169
require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String())
168170
require.False(t, configV10.WarpRouting.Enabled)
171+
require.Equal(t, configV10.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled())
169172

170173
originProxyV10, err := orchestrator.GetOriginProxy()
171174
require.NoError(t, err)

packet/router.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ type Upstream interface {
2323

2424
// Router routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets
2525
type Router struct {
26-
upstream Upstream
27-
returnPipe FunnelUniPipe
28-
globalConfig *GlobalRouterConfig
29-
logger *zerolog.Logger
26+
upstream Upstream
27+
returnPipe FunnelUniPipe
28+
globalConfig *GlobalRouterConfig
29+
logger *zerolog.Logger
30+
checkRouterEnabledFunc func() bool
3031
}
3132

3233
// GlobalRouterConfig is the configuration shared by all instance of Router.
@@ -37,12 +38,13 @@ type GlobalRouterConfig struct {
3738
Zone string
3839
}
3940

40-
func NewRouter(globalConfig *GlobalRouterConfig, upstream Upstream, returnPipe FunnelUniPipe, logger *zerolog.Logger) *Router {
41+
func NewRouter(globalConfig *GlobalRouterConfig, upstream Upstream, returnPipe FunnelUniPipe, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *Router {
4142
return &Router{
42-
upstream: upstream,
43-
returnPipe: returnPipe,
44-
globalConfig: globalConfig,
45-
logger: logger,
43+
upstream: upstream,
44+
returnPipe: returnPipe,
45+
globalConfig: globalConfig,
46+
logger: logger,
47+
checkRouterEnabledFunc: checkRouterEnabledFunc,
4648
}
4749
}
4850

@@ -54,10 +56,16 @@ func (r *Router) Serve(ctx context.Context) error {
5456
if err != nil {
5557
return err
5658
}
59+
5760
// Drop packets if ICMPRouter wasn't created
5861
if r.globalConfig == nil {
5962
continue
6063
}
64+
65+
if enabled := r.checkRouterEnabledFunc(); !enabled {
66+
continue
67+
}
68+
6169
icmpPacket, err := icmpDecoder.Decode(rawPacket)
6270
if err != nil {
6371
r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")

packet/router_test.go

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"context"
66
"fmt"
77
"net/netip"
8+
"sync/atomic"
89
"testing"
10+
"time"
911

1012
"github.com/google/gopacket/layers"
1113
"github.com/rs/zerolog"
@@ -16,7 +18,12 @@ import (
1618
)
1719

1820
var (
19-
noopLogger = zerolog.Nop()
21+
noopLogger = zerolog.Nop()
22+
packetConfig = &GlobalRouterConfig{
23+
ICMPRouter: &mockICMPRouter{},
24+
IPv4Src: netip.MustParseAddr("172.16.0.1"),
25+
IPv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"),
26+
}
2027
)
2128

2229
func TestRouterReturnTTLExceed(t *testing.T) {
@@ -26,12 +33,9 @@ func TestRouterReturnTTLExceed(t *testing.T) {
2633
returnPipe := &mockFunnelUniPipe{
2734
uniPipe: make(chan RawPacket),
2835
}
29-
packetConfig := &GlobalRouterConfig{
30-
ICMPRouter: &mockICMPRouter{},
31-
IPv4Src: netip.MustParseAddr("172.16.0.1"),
32-
IPv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"),
33-
}
34-
router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger)
36+
routerEnabled := &routerEnabledChecker{}
37+
routerEnabled.set(true)
38+
router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled)
3539
ctx, cancel := context.WithCancel(context.Background())
3640
routerStopped := make(chan struct{})
3741
go func() {
@@ -80,12 +84,71 @@ func TestRouterReturnTTLExceed(t *testing.T) {
8084
<-routerStopped
8185
}
8286

87+
func TestRouterCheckEnabled(t *testing.T) {
88+
upstream := &mockUpstream{
89+
source: make(chan RawPacket),
90+
}
91+
returnPipe := &mockFunnelUniPipe{
92+
uniPipe: make(chan RawPacket),
93+
}
94+
routerEnabled := &routerEnabledChecker{}
95+
router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled)
96+
ctx, cancel := context.WithCancel(context.Background())
97+
routerStopped := make(chan struct{})
98+
go func() {
99+
router.Serve(ctx)
100+
close(routerStopped)
101+
}()
102+
103+
pk := ICMP{
104+
IP: &IP{
105+
Src: netip.MustParseAddr("192.168.1.1"),
106+
Dst: netip.MustParseAddr("10.0.0.1"),
107+
Protocol: layers.IPProtocolICMPv4,
108+
TTL: 1,
109+
},
110+
Message: &icmp.Message{
111+
Type: ipv4.ICMPTypeEcho,
112+
Code: 0,
113+
Body: &icmp.Echo{
114+
ID: 12481,
115+
Seq: 8036,
116+
Data: []byte(t.Name()),
117+
},
118+
},
119+
}
120+
121+
// router is disabled
122+
require.NoError(t, upstream.send(&pk))
123+
select {
124+
case <-time.After(time.Millisecond * 10):
125+
case <-returnPipe.uniPipe:
126+
t.Error("Unexpected reply when router is disabled")
127+
}
128+
routerEnabled.set(true)
129+
// router is enabled, expects reply
130+
require.NoError(t, upstream.send(&pk))
131+
<-returnPipe.uniPipe
132+
133+
routerEnabled.set(false)
134+
// router is disabled
135+
require.NoError(t, upstream.send(&pk))
136+
select {
137+
case <-time.After(time.Millisecond * 10):
138+
case <-returnPipe.uniPipe:
139+
t.Error("Unexpected reply when router is disabled")
140+
}
141+
142+
cancel()
143+
<-routerStopped
144+
}
145+
83146
func assertTTLExceed(t *testing.T, originalPacket *ICMP, expectedSrc netip.Addr, upstream *mockUpstream, returnPipe *mockFunnelUniPipe) {
84147
encoder := NewEncoder()
85148
rawPacket, err := encoder.Encode(originalPacket)
86149
require.NoError(t, err)
87-
88150
upstream.source <- rawPacket
151+
89152
resp := <-returnPipe.uniPipe
90153
decoder := NewICMPDecoder()
91154
decoded, err := decoder.Decode(resp)
@@ -111,6 +174,16 @@ type mockUpstream struct {
111174
source chan RawPacket
112175
}
113176

177+
func (ms *mockUpstream) send(pk Packet) error {
178+
encoder := NewEncoder()
179+
rawPacket, err := encoder.Encode(pk)
180+
if err != nil {
181+
return err
182+
}
183+
ms.source <- rawPacket
184+
return nil
185+
}
186+
114187
func (ms *mockUpstream) ReceivePacket(ctx context.Context) (RawPacket, error) {
115188
select {
116189
case <-ctx.Done():
@@ -129,3 +202,22 @@ func (mir mockICMPRouter) Serve(ctx context.Context) error {
129202
func (mir mockICMPRouter) Request(pk *ICMP, responder FunnelUniPipe) error {
130203
return fmt.Errorf("Request not implemented by mockICMPRouter")
131204
}
205+
206+
type routerEnabledChecker struct {
207+
enabled uint32
208+
}
209+
210+
func (rec *routerEnabledChecker) isEnabled() bool {
211+
if atomic.LoadUint32(&rec.enabled) == 0 {
212+
return false
213+
}
214+
return true
215+
}
216+
217+
func (rec *routerEnabledChecker) set(enabled bool) {
218+
if enabled {
219+
atomic.StoreUint32(&rec.enabled, 1)
220+
} else {
221+
atomic.StoreUint32(&rec.enabled, 0)
222+
}
223+
}

0 commit comments

Comments
 (0)