Skip to content

Commit 0c6f671

Browse files
authored
Refactor healthcheck sender and receiver to use configurable options (#4433)
1 parent cf7f6c3 commit 0c6f671

File tree

10 files changed

+190
-180
lines changed

10 files changed

+190
-180
lines changed

.github/workflows/golang-test-linux.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ jobs:
217217
- arch: "386"
218218
raceFlag: ""
219219
- arch: "amd64"
220-
raceFlag: ""
220+
raceFlag: "-race"
221221
runs-on: ubuntu-22.04
222222
steps:
223223
- name: Install Go

shared/relay/client/manager.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,10 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
7878
tokenStore: tokenStore,
7979
mtu: mtu,
8080
serverPicker: &ServerPicker{
81-
TokenStore: tokenStore,
82-
PeerID: peerID,
83-
MTU: mtu,
81+
TokenStore: tokenStore,
82+
PeerID: peerID,
83+
MTU: mtu,
84+
ConnectionTimeout: defaultConnectionTimeout,
8485
},
8586
relayClients: make(map[string]*RelayTrack),
8687
onDisconnectedListeners: make(map[string]*list.List),

shared/relay/client/picker.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@ import (
1313
)
1414

1515
const (
16-
maxConcurrentServers = 7
17-
)
18-
19-
var (
20-
connectionTimeout = 30 * time.Second
16+
maxConcurrentServers = 7
17+
defaultConnectionTimeout = 30 * time.Second
2118
)
2219

2320
type connResult struct {
@@ -27,14 +24,15 @@ type connResult struct {
2724
}
2825

2926
type ServerPicker struct {
30-
TokenStore *auth.TokenStore
31-
ServerURLs atomic.Value
32-
PeerID string
33-
MTU uint16
27+
TokenStore *auth.TokenStore
28+
ServerURLs atomic.Value
29+
PeerID string
30+
MTU uint16
31+
ConnectionTimeout time.Duration
3432
}
3533

3634
func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
37-
ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout)
35+
ctx, cancel := context.WithTimeout(parentCtx, sp.ConnectionTimeout)
3836
defer cancel()
3937

4038
totalServers := len(sp.ServerURLs.Load().([]string))

shared/relay/client/picker_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ import (
88
)
99

1010
func TestServerPicker_UnavailableServers(t *testing.T) {
11-
connectionTimeout = 5 * time.Second
12-
11+
timeout := 5 * time.Second
1312
sp := ServerPicker{
14-
TokenStore: nil,
15-
PeerID: "test",
13+
TokenStore: nil,
14+
PeerID: "test",
15+
ConnectionTimeout: timeout,
1616
}
1717
sp.ServerURLs.Store([]string{"rel://dummy1", "rel://dummy2"})
1818

19-
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout+1)
19+
ctx, cancel := context.WithTimeout(context.Background(), timeout+1)
2020
defer cancel()
2121

2222
go func() {

shared/relay/healthcheck/env.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package healthcheck
2+
3+
import (
4+
"os"
5+
"strconv"
6+
7+
log "github.com/sirupsen/logrus"
8+
)
9+
10+
const (
11+
defaultAttemptThresholdEnv = "NB_RELAY_HC_ATTEMPT_THRESHOLD"
12+
)
13+
14+
func getAttemptThresholdFromEnv() int {
15+
if attemptThreshold := os.Getenv(defaultAttemptThresholdEnv); attemptThreshold != "" {
16+
threshold, err := strconv.ParseInt(attemptThreshold, 10, 64)
17+
if err != nil {
18+
log.Errorf("Failed to parse attempt threshold from environment variable \"%s\" should be an integer. Using default value", attemptThreshold)
19+
return defaultAttemptThreshold
20+
}
21+
return int(threshold)
22+
}
23+
return defaultAttemptThreshold
24+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package healthcheck
2+
3+
import (
4+
"os"
5+
"testing"
6+
)
7+
8+
//nolint:tenv
9+
func TestGetAttemptThresholdFromEnv(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
envValue string
13+
expected int
14+
}{
15+
{"Default attempt threshold when env is not set", "", defaultAttemptThreshold},
16+
{"Custom attempt threshold when env is set to a valid integer", "3", 3},
17+
{"Default attempt threshold when env is set to an invalid value", "invalid", defaultAttemptThreshold},
18+
}
19+
20+
for _, tt := range tests {
21+
t.Run(tt.name, func(t *testing.T) {
22+
if tt.envValue == "" {
23+
os.Unsetenv(defaultAttemptThresholdEnv)
24+
} else {
25+
os.Setenv(defaultAttemptThresholdEnv, tt.envValue)
26+
}
27+
28+
result := getAttemptThresholdFromEnv()
29+
if result != tt.expected {
30+
t.Fatalf("Expected %d, got %d", tt.expected, result)
31+
}
32+
33+
os.Unsetenv(defaultAttemptThresholdEnv)
34+
})
35+
}
36+
}

shared/relay/healthcheck/receiver.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,15 @@ import (
77
log "github.com/sirupsen/logrus"
88
)
99

10-
var (
11-
heartbeatTimeout = healthCheckInterval + 10*time.Second
10+
const (
11+
defaultHeartbeatTimeout = defaultHealthCheckInterval + 10*time.Second
1212
)
1313

14+
type ReceiverOptions struct {
15+
HeartbeatTimeout time.Duration
16+
AttemptThreshold int
17+
}
18+
1419
// Receiver is a healthcheck receiver
1520
// It will listen for heartbeat and check if the heartbeat is not received in a certain time
1621
// If the heartbeat is not received in a certain time, it will send a timeout signal and stop to work
@@ -27,6 +32,23 @@ type Receiver struct {
2732

2833
// NewReceiver creates a new healthcheck receiver and start the timer in the background
2934
func NewReceiver(log *log.Entry) *Receiver {
35+
opts := ReceiverOptions{
36+
HeartbeatTimeout: defaultHeartbeatTimeout,
37+
AttemptThreshold: getAttemptThresholdFromEnv(),
38+
}
39+
return NewReceiverWithOpts(log, opts)
40+
}
41+
42+
func NewReceiverWithOpts(log *log.Entry, opts ReceiverOptions) *Receiver {
43+
heartbeatTimeout := opts.HeartbeatTimeout
44+
if heartbeatTimeout <= 0 {
45+
heartbeatTimeout = defaultHeartbeatTimeout
46+
}
47+
attemptThreshold := opts.AttemptThreshold
48+
if attemptThreshold <= 0 {
49+
attemptThreshold = defaultAttemptThreshold
50+
}
51+
3052
ctx, ctxCancel := context.WithCancel(context.Background())
3153

3254
r := &Receiver{
@@ -35,10 +57,10 @@ func NewReceiver(log *log.Entry) *Receiver {
3557
ctx: ctx,
3658
ctxCancel: ctxCancel,
3759
heartbeat: make(chan struct{}, 1),
38-
attemptThreshold: getAttemptThresholdFromEnv(),
60+
attemptThreshold: attemptThreshold,
3961
}
4062

41-
go r.waitForHealthcheck()
63+
go r.waitForHealthcheck(heartbeatTimeout)
4264
return r
4365
}
4466

@@ -55,7 +77,7 @@ func (r *Receiver) Stop() {
5577
r.ctxCancel()
5678
}
5779

58-
func (r *Receiver) waitForHealthcheck() {
80+
func (r *Receiver) waitForHealthcheck(heartbeatTimeout time.Duration) {
5981
ticker := time.NewTicker(heartbeatTimeout)
6082
defer ticker.Stop()
6183
defer r.ctxCancel()

shared/relay/healthcheck/receiver_test.go

Lines changed: 23 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,18 @@ package healthcheck
22

33
import (
44
"context"
5-
"fmt"
6-
"os"
7-
"sync"
85
"testing"
96
"time"
107

118
log "github.com/sirupsen/logrus"
129
)
1310

14-
// Mutex to protect global variable access in tests
15-
var testMutex sync.Mutex
16-
1711
func TestNewReceiver(t *testing.T) {
18-
testMutex.Lock()
19-
originalTimeout := heartbeatTimeout
20-
heartbeatTimeout = 5 * time.Second
21-
testMutex.Unlock()
22-
23-
defer func() {
24-
testMutex.Lock()
25-
heartbeatTimeout = originalTimeout
26-
testMutex.Unlock()
27-
}()
28-
29-
r := NewReceiver(log.WithContext(context.Background()))
12+
13+
opts := ReceiverOptions{
14+
HeartbeatTimeout: 5 * time.Second,
15+
}
16+
r := NewReceiverWithOpts(log.WithContext(context.Background()), opts)
3017
defer r.Stop()
3118

3219
select {
@@ -38,18 +25,10 @@ func TestNewReceiver(t *testing.T) {
3825
}
3926

4027
func TestNewReceiverNotReceive(t *testing.T) {
41-
testMutex.Lock()
42-
originalTimeout := heartbeatTimeout
43-
heartbeatTimeout = 1 * time.Second
44-
testMutex.Unlock()
45-
46-
defer func() {
47-
testMutex.Lock()
48-
heartbeatTimeout = originalTimeout
49-
testMutex.Unlock()
50-
}()
51-
52-
r := NewReceiver(log.WithContext(context.Background()))
28+
opts := ReceiverOptions{
29+
HeartbeatTimeout: 1 * time.Second,
30+
}
31+
r := NewReceiverWithOpts(log.WithContext(context.Background()), opts)
5332
defer r.Stop()
5433

5534
select {
@@ -61,18 +40,10 @@ func TestNewReceiverNotReceive(t *testing.T) {
6140
}
6241

6342
func TestNewReceiverAck(t *testing.T) {
64-
testMutex.Lock()
65-
originalTimeout := heartbeatTimeout
66-
heartbeatTimeout = 2 * time.Second
67-
testMutex.Unlock()
68-
69-
defer func() {
70-
testMutex.Lock()
71-
heartbeatTimeout = originalTimeout
72-
testMutex.Unlock()
73-
}()
74-
75-
r := NewReceiver(log.WithContext(context.Background()))
43+
opts := ReceiverOptions{
44+
HeartbeatTimeout: 2 * time.Second,
45+
}
46+
r := NewReceiverWithOpts(log.WithContext(context.Background()), opts)
7647
defer r.Stop()
7748

7849
r.Heartbeat()
@@ -97,30 +68,19 @@ func TestReceiverHealthCheckAttemptThreshold(t *testing.T) {
9768

9869
for _, tc := range testsCases {
9970
t.Run(tc.name, func(t *testing.T) {
100-
testMutex.Lock()
101-
originalInterval := healthCheckInterval
102-
originalTimeout := heartbeatTimeout
103-
healthCheckInterval = 1 * time.Second
104-
heartbeatTimeout = healthCheckInterval + 500*time.Millisecond
105-
testMutex.Unlock()
106-
107-
defer func() {
108-
testMutex.Lock()
109-
healthCheckInterval = originalInterval
110-
heartbeatTimeout = originalTimeout
111-
testMutex.Unlock()
112-
}()
113-
//nolint:tenv
114-
os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold))
115-
defer os.Unsetenv(defaultAttemptThresholdEnv)
116-
117-
receiver := NewReceiver(log.WithField("test_name", tc.name))
118-
119-
testTimeout := heartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval
71+
healthCheckInterval := 1 * time.Second
72+
73+
opts := ReceiverOptions{
74+
HeartbeatTimeout: healthCheckInterval + 500*time.Millisecond,
75+
AttemptThreshold: tc.threshold,
76+
}
77+
78+
receiver := NewReceiverWithOpts(log.WithField("test_name", tc.name), opts)
79+
80+
testTimeout := opts.HeartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval
12081

12182
if tc.resetCounterOnce {
12283
receiver.Heartbeat()
123-
t.Logf("reset counter once")
12484
}
12585

12686
select {
@@ -134,7 +94,6 @@ func TestReceiverHealthCheckAttemptThreshold(t *testing.T) {
13494
}
13595
t.Fatalf("should have timed out before %s", testTimeout)
13696
}
137-
13897
})
13998
}
14099
}

0 commit comments

Comments
 (0)