Skip to content

Commit d90f474

Browse files
committed
Add stream resolver failover and runtime packet duplication
1 parent 9874bd9 commit d90f474

File tree

11 files changed

+604
-181
lines changed

11 files changed

+604
-181
lines changed

client_config.toml.simple

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ RECHECK_INACTIVE_INTERVAL_SECONDS = 1800.0
7272
RECHECK_SERVER_INTERVAL_SECONDS = 3.0
7373
RECHECK_BATCH_SIZE = 5
7474

75+
PACKET_DUPLICATION_COUNT = 1
76+
SETUP_PACKET_DUPLICATION_COUNT = 2
77+
STREAM_RESOLVER_FAILOVER_RESEND_THRESHOLD = 2
78+
STREAM_RESOLVER_FAILOVER_COOLDOWN = 1.0
79+
7580
# ------------------------------------------------------------------------------
7681
# 6) Stream & Control Queue Limits
7782
# ------------------------------------------------------------------------------

internal/client/balancer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,14 @@ func (b *Balancer) GetUniqueConnections(requiredCount int) []Connection {
222222
}
223223
}
224224

225+
func (b *Balancer) GetAllValidConnections() []Connection {
226+
snap := b.snapshot.Load()
227+
if snap == nil || len(snap.valid) == 0 {
228+
return nil
229+
}
230+
return snapshotConnections(snap.connections, snap.valid)
231+
}
232+
225233
func rebuildValidIndices(connections []*Connection) []int {
226234
valid := make([]int, 0, len(connections))
227235
for idx, conn := range connections {

internal/client/client.go

Lines changed: 69 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,28 @@ type Client struct {
7070
mtuAddedServerLogFormat string
7171
mtuOutputMu sync.Mutex
7272

73-
exchangeQueryFn func(Connection, []byte, time.Duration) ([]byte, error)
74-
sendOneWayPacketFn func(Connection, []byte, time.Time) error
75-
fragmentLimits sync.Map
76-
stream0Runtime *stream0Runtime
77-
streamsMu sync.RWMutex
78-
streams map[uint16]*clientStream
79-
mtuTestRetries int
80-
mtuTestTimeout time.Duration
81-
streamTXWindow int
82-
streamTXQueueLimit int
83-
streamTXMaxRetries int
84-
streamTXTTL time.Duration
85-
resolverHealthMu sync.Mutex
86-
resolverHealth map[string]*resolverHealthState
87-
resolverRecheck map[string]resolverRecheckState
88-
runtimeDisabled map[string]resolverDisabledState
89-
healthRuntimeRun bool
90-
recheckConnectionFn func(*Connection) bool
73+
exchangeQueryFn func(Connection, []byte, time.Duration) ([]byte, error)
74+
sendOneWayPacketFn func(Connection, []byte, time.Time) error
75+
fragmentLimits sync.Map
76+
stream0Runtime *stream0Runtime
77+
streamsMu sync.RWMutex
78+
streams map[uint16]*clientStream
79+
mtuTestRetries int
80+
mtuTestTimeout time.Duration
81+
packetDuplicationCount int
82+
setupPacketDuplicationCount int
83+
streamResolverFailoverResendThreshold int
84+
streamResolverFailoverCooldown time.Duration
85+
streamTXWindow int
86+
streamTXQueueLimit int
87+
streamTXMaxRetries int
88+
streamTXTTL time.Duration
89+
resolverHealthMu sync.Mutex
90+
resolverHealth map[string]*resolverHealthState
91+
resolverRecheck map[string]resolverRecheckState
92+
runtimeDisabled map[string]resolverDisabledState
93+
healthRuntimeRun bool
94+
recheckConnectionFn func(*Connection) bool
9195

9296
reconnectSignal chan struct{}
9397
reconnectPending atomic.Bool
@@ -113,27 +117,30 @@ type Connection struct {
113117
}
114118

115119
type clientStream struct {
116-
mu sync.Mutex
117-
ID uint16
118-
Conn net.Conn
119-
NextSequence uint16
120-
LocalFinSent bool
121-
RemoteFinRecv bool
122-
ResetSent bool
123-
Closed bool
124-
LastActivityAt time.Time
125-
InboundDataSeq uint16
126-
InboundDataSet bool
127-
RemoteFinSeq uint16
128-
RemoteFinSet bool
129-
TXQueue []clientStreamTXPacket
130-
TXInFlight []clientStreamTXPacket
131-
TXWake chan struct{}
132-
StopCh chan struct{}
133-
stopOnce sync.Once
134-
retryBase time.Duration
135-
srtt time.Duration
136-
rttVar time.Duration
120+
mu sync.Mutex
121+
ID uint16
122+
Conn net.Conn
123+
NextSequence uint16
124+
LocalFinSent bool
125+
RemoteFinRecv bool
126+
ResetSent bool
127+
Closed bool
128+
LastActivityAt time.Time
129+
InboundDataSeq uint16
130+
InboundDataSet bool
131+
RemoteFinSeq uint16
132+
RemoteFinSet bool
133+
PreferredServerKey string
134+
ResolverResendStreak int
135+
LastResolverFailover time.Time
136+
TXQueue []clientStreamTXPacket
137+
TXInFlight []clientStreamTXPacket
138+
TXWake chan struct{}
139+
StopCh chan struct{}
140+
stopOnce sync.Once
141+
retryBase time.Duration
142+
srtt time.Duration
143+
rttVar time.Duration
137144
}
138145

139146
type clientStreamTXPacket struct {
@@ -185,13 +192,17 @@ func New(cfg config.ClientConfig, log *logger.Logger, codec *security.Codec) *Cl
185192
localDNSCacheFlushTick: time.Duration(
186193
cfg.LocalDNSCacheFlushSec * float64(time.Second),
187194
),
188-
localDNSFragTTL: time.Duration(cfg.LocalDNSFragmentTimeoutSec * float64(time.Second)),
189-
streams: make(map[uint16]*clientStream, 16),
190-
mtuTestRetries: cfg.MTUTestRetries,
191-
mtuTestTimeout: time.Duration(cfg.MTUTestTimeout * float64(time.Second)),
192-
mtuCryptoOverhead: mtuCryptoOverhead(cfg.DataEncryptionMethod),
193-
mtuSaveToFile: cfg.SaveMTUServersToFile,
194-
mtuServersFileName: strings.TrimSpace(cfg.MTUServersFileName),
195+
localDNSFragTTL: time.Duration(cfg.LocalDNSFragmentTimeoutSec * float64(time.Second)),
196+
streams: make(map[uint16]*clientStream, 16),
197+
mtuTestRetries: cfg.MTUTestRetries,
198+
mtuTestTimeout: time.Duration(cfg.MTUTestTimeout * float64(time.Second)),
199+
packetDuplicationCount: cfg.PacketDuplicationCount,
200+
setupPacketDuplicationCount: cfg.SetupPacketDuplicationCount,
201+
streamResolverFailoverResendThreshold: cfg.StreamResolverFailoverResendThreshold,
202+
streamResolverFailoverCooldown: time.Duration(cfg.StreamResolverFailoverCooldownSec * float64(time.Second)),
203+
mtuCryptoOverhead: mtuCryptoOverhead(cfg.DataEncryptionMethod),
204+
mtuSaveToFile: cfg.SaveMTUServersToFile,
205+
mtuServersFileName: strings.TrimSpace(cfg.MTUServersFileName),
195206
mtuServersFileFormat: strings.TrimSpace(
196207
cfg.MTUServersFileFormat,
197208
),
@@ -229,6 +240,18 @@ func New(cfg config.ClientConfig, log *logger.Logger, codec *security.Codec) *Cl
229240
if c.mtuTestTimeout <= 0 {
230241
c.mtuTestTimeout = time.Second
231242
}
243+
if c.packetDuplicationCount < 1 {
244+
c.packetDuplicationCount = 1
245+
}
246+
if c.setupPacketDuplicationCount < c.packetDuplicationCount {
247+
c.setupPacketDuplicationCount = c.packetDuplicationCount
248+
}
249+
if c.streamResolverFailoverResendThreshold < 1 {
250+
c.streamResolverFailoverResendThreshold = 1
251+
}
252+
if c.streamResolverFailoverCooldown <= 0 {
253+
c.streamResolverFailoverCooldown = time.Second
254+
}
232255

233256
c.ResetRuntimeState(true)
234257
c.uploadCompression = uint8(cfg.UploadCompressionType)

internal/client/client_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,6 +1904,93 @@ func TestStream0RuntimeProcessDequeueHandlesServerDrop(t *testing.T) {
19041904
}
19051905
}
19061906

1907+
func TestSelectTargetConnectionsForPacketPrefersStickyStreamResolver(t *testing.T) {
1908+
c := New(config.ClientConfig{
1909+
PacketDuplicationCount: 2,
1910+
}, nil, nil)
1911+
c.connections = []Connection{
1912+
{Key: "a", Domain: "a.example.com", Resolver: "1.1.1.1", ResolverPort: 53, ResolverLabel: "1.1.1.1:53", IsValid: true},
1913+
{Key: "b", Domain: "b.example.com", Resolver: "2.2.2.2", ResolverPort: 53, ResolverLabel: "2.2.2.2:53", IsValid: true},
1914+
{Key: "c", Domain: "c.example.com", Resolver: "3.3.3.3", ResolverPort: 53, ResolverLabel: "3.3.3.3:53", IsValid: true},
1915+
}
1916+
c.connectionsByKey = map[string]int{"a": 0, "b": 1, "c": 2}
1917+
c.rebuildBalancer()
1918+
1919+
stream := c.createStream(11, nil)
1920+
defer c.deleteStream(stream.ID)
1921+
1922+
stream.mu.Lock()
1923+
stream.PreferredServerKey = "b"
1924+
stream.LastResolverFailover = time.Now()
1925+
stream.mu.Unlock()
1926+
1927+
selected, err := c.selectTargetConnectionsForPacket(Enums.PACKET_STREAM_DATA, stream.ID)
1928+
if err != nil {
1929+
t.Fatalf("selectTargetConnectionsForPacket returned error: %v", err)
1930+
}
1931+
if len(selected) != 2 {
1932+
t.Fatalf("unexpected selected count: got=%d want=2", len(selected))
1933+
}
1934+
if selected[0].Key != "b" {
1935+
t.Fatalf("expected preferred resolver first, got=%q", selected[0].Key)
1936+
}
1937+
}
1938+
1939+
func TestSelectTargetConnectionsForPacketFailsOverOnResendStreak(t *testing.T) {
1940+
c := New(config.ClientConfig{
1941+
PacketDuplicationCount: 1,
1942+
StreamResolverFailoverResendThreshold: 1,
1943+
StreamResolverFailoverCooldownSec: 0.1,
1944+
}, nil, nil)
1945+
c.connections = []Connection{
1946+
{Key: "a", Domain: "a.example.com", Resolver: "1.1.1.1", ResolverPort: 53, ResolverLabel: "1.1.1.1:53", IsValid: true},
1947+
{Key: "b", Domain: "b.example.com", Resolver: "2.2.2.2", ResolverPort: 53, ResolverLabel: "2.2.2.2:53", IsValid: true},
1948+
}
1949+
c.connectionsByKey = map[string]int{"a": 0, "b": 1}
1950+
c.rebuildBalancer()
1951+
1952+
stream := c.createStream(12, nil)
1953+
defer c.deleteStream(stream.ID)
1954+
1955+
stream.mu.Lock()
1956+
stream.PreferredServerKey = "a"
1957+
stream.LastResolverFailover = time.Now().Add(-2 * time.Second)
1958+
stream.mu.Unlock()
1959+
1960+
selected, err := c.selectTargetConnectionsForPacket(Enums.PACKET_STREAM_RESEND, stream.ID)
1961+
if err != nil {
1962+
t.Fatalf("selectTargetConnectionsForPacket returned error: %v", err)
1963+
}
1964+
if len(selected) != 1 {
1965+
t.Fatalf("unexpected selected count: got=%d want=1", len(selected))
1966+
}
1967+
if selected[0].Key != "b" {
1968+
t.Fatalf("expected resend failover to switch preferred resolver, got=%q", selected[0].Key)
1969+
}
1970+
}
1971+
1972+
func TestSelectTargetConnectionsForPacketUsesSetupDuplicationCount(t *testing.T) {
1973+
c := New(config.ClientConfig{
1974+
PacketDuplicationCount: 1,
1975+
SetupPacketDuplicationCount: 3,
1976+
}, nil, nil)
1977+
c.connections = []Connection{
1978+
{Key: "a", Domain: "a.example.com", Resolver: "1.1.1.1", ResolverPort: 53, ResolverLabel: "1.1.1.1:53", IsValid: true},
1979+
{Key: "b", Domain: "b.example.com", Resolver: "2.2.2.2", ResolverPort: 53, ResolverLabel: "2.2.2.2:53", IsValid: true},
1980+
{Key: "c", Domain: "c.example.com", Resolver: "3.3.3.3", ResolverPort: 53, ResolverLabel: "3.3.3.3:53", IsValid: true},
1981+
}
1982+
c.connectionsByKey = map[string]int{"a": 0, "b": 1, "c": 2}
1983+
c.rebuildBalancer()
1984+
1985+
selected, err := c.selectTargetConnectionsForPacket(Enums.PACKET_STREAM_SYN, 99)
1986+
if err != nil {
1987+
t.Fatalf("selectTargetConnectionsForPacket returned error: %v", err)
1988+
}
1989+
if len(selected) != 3 {
1990+
t.Fatalf("unexpected selected count: got=%d want=3", len(selected))
1991+
}
1992+
}
1993+
19071994
func extractTestTunnelLabels(qName string, baseDomain string) string {
19081995
suffix := "." + baseDomain
19091996
if len(qName) <= len(suffix) || qName[len(qName)-len(suffix):] != suffix {

internal/client/runtime_helpers.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,41 @@ func tryConnections[T any](connections []Connection, fallbackErr error, fn func(
4545
}
4646
return zero, lastErr
4747
}
48+
49+
func tryConnectionsParallel[T any](connections []Connection, fallbackErr error, fn func(Connection) (T, error)) (T, error) {
50+
var zero T
51+
switch len(connections) {
52+
case 0:
53+
return zero, fallbackErr
54+
case 1:
55+
value, err := fn(connections[0])
56+
if err != nil {
57+
return zero, err
58+
}
59+
return value, nil
60+
}
61+
62+
type connectionResult struct {
63+
value T
64+
err error
65+
}
66+
67+
results := make(chan connectionResult, len(connections))
68+
for _, connection := range connections {
69+
connection := connection
70+
go func() {
71+
value, err := fn(connection)
72+
results <- connectionResult{value: value, err: err}
73+
}()
74+
}
75+
76+
lastErr := fallbackErr
77+
for range connections {
78+
result := <-results
79+
if result.err == nil {
80+
return result.value, nil
81+
}
82+
lastErr = result.err
83+
}
84+
return zero, lastErr
85+
}

internal/client/stream0_runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ func (r *stream0Runtime) processDequeue(packet arq.QueuedPacket) {
389389
}
390390
case Enums.PACKET_STREAM_DATA_ACK, Enums.PACKET_STREAM_FIN_ACK, Enums.PACKET_STREAM_RST_ACK:
391391
r.noteServerDataActivity()
392+
r.client.noteStreamProgress(response.StreamID)
392393
if stream, ok := r.client.getStream(response.StreamID); ok {
393394
ackClientStreamTX(stream, response.SequenceNum, now)
394395
notifyStreamWake(stream)

internal/client/stream_handshake.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ func (c *Client) exchangeStreamControlPacket(packetType uint8, streamID uint16,
9797
return VpnProto.Packet{}, ErrStreamHandshakeFailed
9898
}
9999
timeout = normalizeTimeout(timeout, defaultRuntimeTimeout)
100-
connections, err := c.runtimeConnections(nil)
100+
connections, err := c.selectTargetConnectionsForPacket(packetType, streamID)
101101
if err != nil {
102102
return VpnProto.Packet{}, err
103103
}
104-
return tryConnections(connections, ErrStreamHandshakeFailed, func(connection Connection) (VpnProto.Packet, error) {
104+
return tryConnectionsParallel(connections, ErrStreamHandshakeFailed, func(connection Connection) (VpnProto.Packet, error) {
105105
return c.sendStreamControlPacketWithConnection(connection, packetType, streamID, sequenceNum, payload, timeout)
106106
})
107107
}

0 commit comments

Comments
 (0)