Skip to content

Commit b639b66

Browse files
committed
TUN-6744: On posix platforms, assign unique echo ID per (src, dst, echo ID)
This also refactor FunnelTracker to provide a GetOrRegister method to prevent race condition
1 parent e454994 commit b639b66

File tree

7 files changed

+265
-135
lines changed

7 files changed

+265
-135
lines changed

ingress/icmp_darwin.go

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ type icmpProxy struct {
3737
// then from the beginning to lastAssignment.
3838
// ICMP echo are short lived. By the time an ID is revisited, it should have been released.
3939
type echoIDTracker struct {
40-
lock sync.RWMutex
41-
// maps the source IP to an echo ID obtained from assignment
42-
srcIPMapping map[netip.Addr]uint16
40+
lock sync.Mutex
41+
// maps the source IP, destination IP and original echo ID to a unique echo ID obtained from assignment
42+
mapping map[flow3Tuple]uint16
4343
// assignment tracks if an ID is assigned using index as the ID
4444
// The size of the array is math.MaxUint16 because echo ID is 2 bytes
4545
assignment [math.MaxUint16]bool
@@ -49,20 +49,18 @@ type echoIDTracker struct {
4949

5050
func newEchoIDTracker() *echoIDTracker {
5151
return &echoIDTracker{
52-
srcIPMapping: make(map[netip.Addr]uint16),
52+
mapping: make(map[flow3Tuple]uint16),
5353
}
5454
}
5555

56-
func (eit *echoIDTracker) get(srcIP netip.Addr) (uint16, bool) {
57-
eit.lock.RLock()
58-
defer eit.lock.RUnlock()
59-
id, ok := eit.srcIPMapping[srcIP]
60-
return id, ok
61-
}
62-
63-
func (eit *echoIDTracker) assign(srcIP netip.Addr) (uint16, bool) {
56+
// Get assignment or assign a new ID.
57+
func (eit *echoIDTracker) getOrAssign(key flow3Tuple) (id uint16, success bool) {
6458
eit.lock.Lock()
6559
defer eit.lock.Unlock()
60+
id, exists := eit.mapping[key]
61+
if exists {
62+
return id, true
63+
}
6664

6765
if eit.nextAssignment == math.MaxUint16 {
6866
eit.nextAssignment = 0
@@ -71,35 +69,35 @@ func (eit *echoIDTracker) assign(srcIP netip.Addr) (uint16, bool) {
7169
for i, assigned := range eit.assignment[eit.nextAssignment:] {
7270
if !assigned {
7371
echoID := uint16(i) + eit.nextAssignment
74-
eit.set(srcIP, echoID)
72+
eit.set(key, echoID)
7573
return echoID, true
7674
}
7775
}
7876
for i, assigned := range eit.assignment[0:eit.nextAssignment] {
7977
if !assigned {
8078
echoID := uint16(i)
81-
eit.set(srcIP, echoID)
79+
eit.set(key, echoID)
8280
return echoID, true
8381
}
8482
}
8583
return 0, false
8684
}
8785

8886
// Caller should hold the lock
89-
func (eit *echoIDTracker) set(srcIP netip.Addr, echoID uint16) {
90-
eit.assignment[echoID] = true
91-
eit.srcIPMapping[srcIP] = echoID
92-
eit.nextAssignment = echoID + 1
87+
func (eit *echoIDTracker) set(key flow3Tuple, assignedEchoID uint16) {
88+
eit.assignment[assignedEchoID] = true
89+
eit.mapping[key] = assignedEchoID
90+
eit.nextAssignment = assignedEchoID + 1
9391
}
9492

95-
func (eit *echoIDTracker) release(srcIP netip.Addr, id uint16) bool {
93+
func (eit *echoIDTracker) release(key flow3Tuple, assigned uint16) bool {
9694
eit.lock.Lock()
9795
defer eit.lock.Unlock()
9896

99-
currentID, exists := eit.srcIPMapping[srcIP]
100-
if exists && id == currentID {
101-
delete(eit.srcIPMapping, srcIP)
102-
eit.assignment[id] = false
97+
currentEchoID, exists := eit.mapping[key]
98+
if exists && assigned == currentEchoID {
99+
delete(eit.mapping, key)
100+
eit.assignment[assigned] = false
103101
return true
104102
}
105103
return false
@@ -134,33 +132,46 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
134132
if pk == nil {
135133
return errPacketNil
136134
}
135+
originalEcho, err := getICMPEcho(pk.Message)
136+
if err != nil {
137+
return err
138+
}
139+
echoIDTrackerKey := flow3Tuple{
140+
srcIP: pk.Src,
141+
dstIP: pk.Dst,
142+
originalEchoID: originalEcho.ID,
143+
}
137144
// TODO: TUN-6744 assign unique flow per (src, echo ID)
138-
echoID, exists := ip.echoIDTracker.get(pk.Src)
139-
if !exists {
145+
assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey)
146+
if !success {
147+
return fmt.Errorf("failed to assign unique echo ID")
148+
}
149+
newFunnelFunc := func() (packet.Funnel, error) {
140150
originalEcho, err := getICMPEcho(pk.Message)
141151
if err != nil {
142-
return err
152+
return nil, err
143153
}
144-
echoID, exists = ip.echoIDTracker.assign(pk.Src)
145-
if !exists {
146-
return fmt.Errorf("failed to assign unique echo ID")
147-
}
148-
funnelID := echoFunnelID(echoID)
149154
originSender := originSender{
150-
conn: ip.conn,
151-
echoIDTracker: ip.echoIDTracker,
152-
srcIP: pk.Src,
153-
echoID: echoID,
154-
}
155-
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(echoID), originalEcho.ID, ip.encoder)
156-
if replaced := ip.srcFunnelTracker.Register(funnelID, icmpFlow); replaced {
157-
ip.logger.Info().Str("src", pk.Src.String()).Msg("Replaced funnel")
155+
conn: ip.conn,
156+
echoIDTracker: ip.echoIDTracker,
157+
echoIDTrackerKey: echoIDTrackerKey,
158+
assignedEchoID: assignedEchoID,
158159
}
159-
return icmpFlow.sendToDst(pk.Dst, pk.Message)
160+
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
161+
return icmpFlow, nil
160162
}
161-
funnel, exists := ip.srcFunnelTracker.Get(echoFunnelID(echoID))
162-
if !exists {
163-
return packet.ErrFunnelNotFound
163+
funnelID := echoFunnelID(assignedEchoID)
164+
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
165+
if err != nil {
166+
return err
167+
}
168+
if isNew {
169+
ip.logger.Debug().
170+
Str("src", pk.Src.String()).
171+
Str("dst", pk.Dst.String()).
172+
Int("originalEchoID", originalEcho.ID).
173+
Int("assignedEchoID", int(assignedEchoID)).
174+
Msg("New flow")
164175
}
165176
icmpFlow, err := toICMPEchoFlow(funnel)
166177
if err != nil {
@@ -199,7 +210,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
199210
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
200211
continue
201212
}
202-
if ip.sendReply(reply); err != nil {
213+
if err := ip.sendReply(reply); err != nil {
203214
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
204215
continue
205216
}
@@ -227,7 +238,8 @@ func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []b
227238
}
228239

229240
func (ip *icmpProxy) sendReply(reply *echoReply) error {
230-
funnel, ok := ip.srcFunnelTracker.Get(echoFunnelID(reply.echo.ID))
241+
funnelID := echoFunnelID(reply.echo.ID)
242+
funnel, ok := ip.srcFunnelTracker.Get(funnelID)
231243
if !ok {
232244
return packet.ErrFunnelNotFound
233245
}
@@ -240,10 +252,10 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error {
240252

241253
// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface
242254
type originSender struct {
243-
conn *icmp.PacketConn
244-
echoIDTracker *echoIDTracker
245-
srcIP netip.Addr
246-
echoID uint16
255+
conn *icmp.PacketConn
256+
echoIDTracker *echoIDTracker
257+
echoIDTrackerKey flow3Tuple
258+
assignedEchoID uint16
247259
}
248260

249261
func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
@@ -254,6 +266,6 @@ func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
254266
}
255267

256268
func (os *originSender) Close() error {
257-
os.echoIDTracker.release(os.srcIP, os.echoID)
269+
os.echoIDTracker.release(os.echoIDTrackerKey, os.assignedEchoID)
258270
return nil
259271
}

ingress/icmp_darwin_test.go

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,80 +12,110 @@ import (
1212

1313
func TestSingleEchoIDTracker(t *testing.T) {
1414
tracker := newEchoIDTracker()
15-
srcIP := netip.MustParseAddr("127.0.0.1")
16-
echoID, ok := tracker.get(srcIP)
17-
require.False(t, ok)
18-
require.Equal(t, uint16(0), echoID)
15+
key := flow3Tuple{
16+
srcIP: netip.MustParseAddr("172.16.0.1"),
17+
dstIP: netip.MustParseAddr("172.16.0.2"),
18+
originalEchoID: 5182,
19+
}
1920

2021
// not assigned yet, so nothing to release
21-
require.False(t, tracker.release(srcIP, echoID))
22+
require.False(t, tracker.release(key, 0))
2223

23-
echoID, ok = tracker.assign(srcIP)
24+
echoID, ok := tracker.getOrAssign(key)
2425
require.True(t, ok)
2526
require.Equal(t, uint16(0), echoID)
2627

27-
echoID, ok = tracker.get(srcIP)
28+
// Second time should return the same echo ID
29+
echoID, ok = tracker.getOrAssign(key)
2830
require.True(t, ok)
2931
require.Equal(t, uint16(0), echoID)
3032

3133
// releasing a different ID returns false
32-
require.False(t, tracker.release(srcIP, 1999))
33-
require.True(t, tracker.release(srcIP, echoID))
34+
require.False(t, tracker.release(key, 1999))
35+
require.True(t, tracker.release(key, echoID))
3436
// releasing the second time returns false
35-
require.False(t, tracker.release(srcIP, echoID))
36-
37-
echoID, ok = tracker.get(srcIP)
38-
require.False(t, ok)
39-
require.Equal(t, uint16(0), echoID)
37+
require.False(t, tracker.release(key, echoID))
4038

4139
// Move to the next IP
42-
echoID, ok = tracker.assign(srcIP)
40+
echoID, ok = tracker.getOrAssign(key)
4341
require.True(t, ok)
4442
require.Equal(t, uint16(1), echoID)
4543
}
4644

4745
func TestFullEchoIDTracker(t *testing.T) {
46+
var (
47+
dstIP = netip.MustParseAddr("192.168.0.1")
48+
originalEchoID = 41820
49+
)
4850
tracker := newEchoIDTracker()
49-
firstIP := netip.MustParseAddr("172.16.0.1")
50-
srcIP := firstIP
51+
firstSrcIP := netip.MustParseAddr("172.16.0.1")
52+
srcIP := firstSrcIP
5153

5254
for i := uint16(0); i < math.MaxUint16; i++ {
53-
echoID, ok := tracker.assign(srcIP)
55+
key := flow3Tuple{
56+
srcIP: srcIP,
57+
dstIP: dstIP,
58+
originalEchoID: originalEchoID,
59+
}
60+
echoID, ok := tracker.getOrAssign(key)
5461
require.True(t, ok)
5562
require.Equal(t, i, echoID)
5663

57-
echoID, ok = tracker.get(srcIP)
64+
echoID, ok = tracker.get(key)
5865
require.True(t, ok)
5966
require.Equal(t, i, echoID)
67+
6068
srcIP = srcIP.Next()
6169
}
6270

71+
key := flow3Tuple{
72+
srcIP: srcIP.Next(),
73+
dstIP: dstIP,
74+
originalEchoID: originalEchoID,
75+
}
6376
// All echo IDs are assigned
64-
echoID, ok := tracker.assign(srcIP.Next())
77+
echoID, ok := tracker.getOrAssign(key)
6578
require.False(t, ok)
6679
require.Equal(t, uint16(0), echoID)
6780

68-
srcIP = firstIP
81+
srcIP = firstSrcIP
6982
for i := uint16(0); i < math.MaxUint16; i++ {
70-
ok := tracker.release(srcIP, i)
83+
key := flow3Tuple{
84+
srcIP: srcIP,
85+
dstIP: dstIP,
86+
originalEchoID: originalEchoID,
87+
}
88+
ok := tracker.release(key, i)
7189
require.True(t, ok)
7290

73-
echoID, ok = tracker.get(srcIP)
91+
echoID, ok = tracker.get(key)
7492
require.False(t, ok)
7593
require.Equal(t, uint16(0), echoID)
7694
srcIP = srcIP.Next()
7795
}
7896

7997
// The IDs are assignable again
80-
srcIP = firstIP
98+
srcIP = firstSrcIP
8199
for i := uint16(0); i < math.MaxUint16; i++ {
82-
echoID, ok := tracker.assign(srcIP)
100+
key := flow3Tuple{
101+
srcIP: srcIP,
102+
dstIP: dstIP,
103+
originalEchoID: originalEchoID,
104+
}
105+
echoID, ok := tracker.getOrAssign(key)
83106
require.True(t, ok)
84107
require.Equal(t, i, echoID)
85108

86-
echoID, ok = tracker.get(srcIP)
109+
echoID, ok = tracker.get(key)
87110
require.True(t, ok)
88111
require.Equal(t, i, echoID)
89112
srcIP = srcIP.Next()
90113
}
91114
}
115+
116+
func (eit *echoIDTracker) get(key flow3Tuple) (id uint16, exist bool) {
117+
eit.lock.Lock()
118+
defer eit.lock.Unlock()
119+
id, exists := eit.mapping[key]
120+
return id, exists
121+
}

0 commit comments

Comments
 (0)