
Commit 998fb30

[client] Check the client status in the earlier phase (#4509)
This PR improves the NetBird client's status checking by detecting client state changes earlier and handling the connection lifecycle more robustly. The key improvements focus on:

• Enhanced status detection - Added a waitForReady option to StatusRequest for improved client status handling
• Better connection management - Improved context handling for the signal and management gRPC connections
• Increased connection timeouts - Raised the gRPC dial timeout from 3 to 10 seconds for better reliability
• Cleaner error handling - Enhanced error propagation and context cancellation in retry loops

Key Changes

Core Status Improvements:
- Added waitForReady optional field to the StatusRequest proto (daemon.proto:190)
- Enhanced status checking logic to detect client state changes earlier in the connection process
- Improved handling of client permanent exit scenarios from retry loops

Connection & Context Management:
- Fixed context cancellation in the management and signal client retry mechanisms
- Added proper context propagation for Login operations
- Enhanced gRPC connection handling with better timeout management

Error Handling & Cleanup:
- Moved feedback channels to upper layers for better separation of concerns
- Improved error handling patterns throughout the client server implementation
- Fixed synchronization issues and removed debug logging
1 parent e254b4c commit 998fb30
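
For context, here is a minimal sketch of how a CLI consumer could opt into the new flag, modeled on the up.go change below. The daemon address is a hypothetical placeholder, and the import paths are assumed from the repository layout:

package main

import (
	"context"
	"log"
	"time"

	"github.com/netbirdio/netbird/client/cmd"
	"github.com/netbirdio/netbird/client/proto"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// hypothetical daemon address; real CLIs take this from a flag
	conn, err := cmd.DialClientGRPCServer(ctx, "unix:///var/run/netbird.sock")
	if err != nil {
		log.Fatalf("failed to dial daemon: %v", err)
	}
	defer conn.Close()

	waitForReady := true
	client := proto.NewDaemonServiceClient(conn)
	// WaitForReady is a *bool (proto3 optional); leaving it nil keeps the old behavior
	if _, err := client.Status(ctx, &proto.StatusRequest{WaitForReady: &waitForReady}); err != nil {
		log.Fatalf("unable to get daemon status: %v", err)
	}
	log.Println("daemon reached a settled state")
}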

File tree

10 files changed: +128 −54 lines changed

client/cmd/root.go

Lines changed: 1 addition & 1 deletion
@@ -231,7 +231,7 @@ func FlagNameToEnvVar(cmdFlag string, prefix string) string {
 
 // DialClientGRPCServer returns client connection to the daemon server.
 func DialClientGRPCServer(ctx context.Context, addr string) (*grpc.ClientConn, error) {
-	ctx, cancel := context.WithTimeout(ctx, time.Second*3)
+	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
 	defer cancel()
 
 	return grpc.DialContext(
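
One property worth noting about this change: since the dial timeout is derived from the caller's ctx, context.WithTimeout can only tighten an existing deadline, never extend it. A standalone sketch of that semantics (illustrative, not from the diff):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// a parent with a 2-second deadline still wins over a 10-second child timeout
	parent, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	dialCtx, dialCancel := context.WithTimeout(parent, 10*time.Second)
	defer dialCancel()

	deadline, _ := dialCtx.Deadline()
	fmt.Println(time.Until(deadline)) // prints roughly 2s, not 10s
}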

client/cmd/up.go

Lines changed: 3 additions & 1 deletion
@@ -230,7 +230,9 @@ func runInDaemonMode(ctx context.Context, cmd *cobra.Command, pm *profilemanager
 
 	client := proto.NewDaemonServiceClient(conn)
 
-	status, err := client.Status(ctx, &proto.StatusRequest{})
+	status, err := client.Status(ctx, &proto.StatusRequest{
+		WaitForReady: func() *bool { b := true; return &b }(),
+	})
 	if err != nil {
 		return fmt.Errorf("unable to get daemon status: %v", err)
 	}
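
The inline func() *bool literal is the quickest way to take the address of a bool constant. A named helper (hypothetical, not part of this commit) expresses the same thing more readably; google.golang.org/protobuf/proto ships an equivalent proto.Bool, though it would need an import alias here because the generated daemon package is already imported as proto:

// boolPtr is a hypothetical convenience helper
func boolPtr(b bool) *bool { return &b }

// equivalent to the change above:
// status, err := client.Status(ctx, &proto.StatusRequest{WaitForReady: boolPtr(true)})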

client/embed/embed.go

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ func (c *Client) Start(startCtx context.Context) error {
 
 	// either startup error (permanent backoff err) or nil err (successful engine up)
 	// TODO: make after-startup backoff err available
-	run := make(chan struct{}, 1)
+	run := make(chan struct{})
 	clientErr := make(chan error, 1)
 	go func() {
 		if err := client.Run(run); err != nil {
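
The switch from a one-slot buffered channel to an unbuffered one changes the signaling semantics: with a buffer the sender can fire and forget before any receiver exists, while an unbuffered channel makes the ready handoff synchronous. A standalone sketch of the difference (an illustration of the channel semantics, not code from this commit):

package main

import "fmt"

func main() {
	buffered := make(chan struct{}, 1)
	buffered <- struct{}{} // does not block: the signal sits in the buffer
	fmt.Println("buffered send returned immediately")

	unbuffered := make(chan struct{})
	go func() { <-unbuffered }() // a receiver must be ready...
	unbuffered <- struct{}{}     // ...or this send would block
	fmt.Println("unbuffered send completed only after the receive")
}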

client/grpc/dialer.go

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ func Backoff(ctx context.Context) backoff.BackOff {
 	return backoff.WithContext(b, ctx)
 }
 
-func CreateConnection(addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
+func CreateConnection(ctx context.Context, addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
 	transportOption := grpc.WithTransportCredentials(insecure.NewCredentials())
 	if tlsEnabled {
 		certPool, err := x509.SystemCertPool()
@@ -72,7 +72,7 @@ func CreateConnection(addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
 		}))
 	}
 
-	connCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	connCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	defer cancel()
 
 	conn, err := grpc.DialContext(
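
The practical effect of threading ctx into CreateConnection is that caller cancellation now aborts an in-flight dial, instead of the 30-second timer running against a detached context.Background(). A minimal sketch of the behavior (illustrative only):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(time.Second)
		cancel() // e.g. the daemon shuts down or the retry loop stops
	}()

	// as in the patched CreateConnection: the timeout is derived from the caller's ctx
	connCtx, connCancel := context.WithTimeout(ctx, 30*time.Second)
	defer connCancel()

	<-connCtx.Done()           // unblocks after about 1s, not 30s
	fmt.Println(connCtx.Err()) // context.Canceled
}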

client/proto/daemon.pb.go

Lines changed: 17 additions & 5 deletions
Some generated files are not rendered by default.

client/proto/daemon.proto

Lines changed: 2 additions & 0 deletions
@@ -186,6 +186,8 @@ message UpResponse {}
 message StatusRequest{
   bool getFullPeerStatus = 1;
   bool shouldRunProbes = 2;
+  // the UI does not use this yet, but CLIs could use it to wait until the status is ready
+  optional bool waitForReady = 3;
 }
 
 message StatusResponse{
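
Because the field is declared proto3 optional, protoc-gen-go generates a pointer field with explicit presence tracking, which is what lets the daemon distinguish "not set" from "false". A short sketch of the generated-code behavior, assuming the standard generator naming conventions and the client/proto package imported as proto:

req := &proto.StatusRequest{}
fmt.Println(req.WaitForReady == nil) // true: the caller never set the flag
fmt.Println(req.GetWaitForReady())   // false: the getter is nil-safe

wait := true
req.WaitForReady = &wait
fmt.Println(req.GetWaitForReady()) // true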

client/server/server.go

Lines changed: 93 additions & 33 deletions
@@ -67,6 +67,7 @@ type Server struct {
 	proto.UnimplementedDaemonServiceServer
 	clientRunning     bool // protected by mutex
 	clientRunningChan chan struct{}
+	clientGiveUpChan  chan struct{}
 
 	connectClient *internal.ConnectClient
 
@@ -106,6 +107,10 @@ func (s *Server) Start() error {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
+	if s.clientRunning {
+		return nil
+	}
+
 	state := internal.CtxGetState(s.rootCtx)
 
 	if err := handlePanicLog(); err != nil {
@@ -175,12 +180,10 @@ func (s *Server) Start() error {
 		return nil
 	}
 
-	if s.clientRunning {
-		return nil
-	}
 	s.clientRunning = true
-	s.clientRunningChan = make(chan struct{}, 1)
-	go s.connectWithRetryRuns(ctx, config, s.statusRecorder, s.clientRunningChan)
+	s.clientRunningChan = make(chan struct{})
+	s.clientGiveUpChan = make(chan struct{})
+	go s.connectWithRetryRuns(ctx, config, s.statusRecorder, s.clientRunningChan, s.clientGiveUpChan)
 	return nil
 }
 
@@ -211,7 +214,7 @@ func (s *Server) setDefaultConfigIfNotExists(ctx context.Context) error {
 // connectWithRetryRuns runs the client connection with a backoff strategy where we retry the operation as additional
 // mechanism to keep the client connected even when the connection is lost.
 // we cancel retry if the client receive a stop or down command, or if disable auto connect is configured.
-func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profilemanager.Config, statusRecorder *peer.Status, runningChan chan struct{}) {
+func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profilemanager.Config, statusRecorder *peer.Status, runningChan chan struct{}, giveUpChan chan struct{}) {
 	defer func() {
 		s.mutex.Lock()
 		s.clientRunning = false
@@ -261,6 +264,10 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profil
 	if err := backoff.Retry(runOperation, backOff); err != nil {
 		log.Errorf("operation failed: %v", err)
 	}
+
+	if giveUpChan != nil {
+		close(giveUpChan)
+	}
 }
 
 // loginAttempt attempts to login using the provided information. it returns a status in case something fails
@@ -379,7 +386,7 @@ func (s *Server) Login(callerCtx context.Context, msg *proto.LoginRequest) (*pro
 	if s.actCancel != nil {
 		s.actCancel()
 	}
-	ctx, cancel := context.WithCancel(s.rootCtx)
+	ctx, cancel := context.WithCancel(callerCtx)
 
 	md, ok := metadata.FromIncomingContext(callerCtx)
 	if ok {
@@ -389,11 +396,11 @@ func (s *Server) Login(callerCtx context.Context, msg *proto.LoginRequest) (*pro
 	s.actCancel = cancel
 	s.mutex.Unlock()
 
-	if err := restoreResidualState(ctx, s.profileManager.GetStatePath()); err != nil {
+	if err := restoreResidualState(s.rootCtx, s.profileManager.GetStatePath()); err != nil {
 		log.Warnf(errRestoreResidualState, err)
 	}
 
-	state := internal.CtxGetState(ctx)
+	state := internal.CtxGetState(s.rootCtx)
 	defer func() {
 		status, err := state.Status()
 		if err != nil || (status != internal.StatusNeedsLogin && status != internal.StatusLoginFailed) {
@@ -606,6 +613,20 @@ func (s *Server) WaitSSOLogin(callerCtx context.Context, msg *proto.WaitSSOLogin
 // Up starts engine work in the daemon.
 func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpResponse, error) {
 	s.mutex.Lock()
+	if s.clientRunning {
+		state := internal.CtxGetState(s.rootCtx)
+		status, err := state.Status()
+		if err != nil {
+			s.mutex.Unlock()
+			return nil, err
+		}
+		if status == internal.StatusNeedsLogin {
+			s.actCancel()
+		}
+		s.mutex.Unlock()
+
+		return s.waitForUp(callerCtx)
+	}
 	defer s.mutex.Unlock()
 
 	if err := restoreResidualState(callerCtx, s.profileManager.GetStatePath()); err != nil {
@@ -621,16 +642,16 @@ func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpR
 	if err != nil {
 		return nil, err
 	}
+
 	if status != internal.StatusIdle {
 		return nil, fmt.Errorf("up already in progress: current status %s", status)
 	}
 
-	// it should be nil here, but .
+	// it should be nil here, but in case it isn't we cancel it.
 	if s.actCancel != nil {
 		s.actCancel()
 	}
 	ctx, cancel := context.WithCancel(s.rootCtx)
-
 	md, ok := metadata.FromIncomingContext(callerCtx)
 	if ok {
 		ctx = metadata.NewOutgoingContext(ctx, md)
@@ -673,26 +694,31 @@ func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpR
 	s.statusRecorder.UpdateManagementAddress(s.config.ManagementURL.String())
 	s.statusRecorder.UpdateRosenpass(s.config.RosenpassEnabled, s.config.RosenpassPermissive)
 
+	s.clientRunning = true
+	s.clientRunningChan = make(chan struct{})
+	s.clientGiveUpChan = make(chan struct{})
+	go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, s.clientRunningChan, s.clientGiveUpChan)
+
+	return s.waitForUp(callerCtx)
+}
+
+// todo: handle potential race conditions
+func (s *Server) waitForUp(callerCtx context.Context) (*proto.UpResponse, error) {
 	timeoutCtx, cancel := context.WithTimeout(callerCtx, 50*time.Second)
 	defer cancel()
 
-	if !s.clientRunning {
-		s.clientRunning = true
-		s.clientRunningChan = make(chan struct{}, 1)
-		go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, s.clientRunningChan)
-	}
-	for {
-		select {
-		case <-s.clientRunningChan:
-			s.isSessionActive.Store(true)
-			return &proto.UpResponse{}, nil
-		case <-callerCtx.Done():
-			log.Debug("context done, stopping the wait for engine to become ready")
-			return nil, callerCtx.Err()
-		case <-timeoutCtx.Done():
-			log.Debug("up is timed out, stopping the wait for engine to become ready")
-			return nil, timeoutCtx.Err()
-		}
+	select {
+	case <-s.clientGiveUpChan:
+		return nil, fmt.Errorf("client gave up to connect")
+	case <-s.clientRunningChan:
+		s.isSessionActive.Store(true)
+		return &proto.UpResponse{}, nil
+	case <-callerCtx.Done():
+		log.Debug("context done, stopping the wait for engine to become ready")
+		return nil, callerCtx.Err()
+	case <-timeoutCtx.Done():
+		log.Debug("up is timed out, stopping the wait for engine to become ready")
+		return nil, timeoutCtx.Err()
 	}
 }
 
@@ -966,12 +992,46 @@ func (s *Server) Status(
 	ctx context.Context,
 	msg *proto.StatusRequest,
 ) (*proto.StatusResponse, error) {
-	if ctx.Err() != nil {
-		return nil, ctx.Err()
-	}
-
 	s.mutex.Lock()
-	defer s.mutex.Unlock()
+	clientRunning := s.clientRunning
+	s.mutex.Unlock()
+
+	if msg.WaitForReady != nil && *msg.WaitForReady && clientRunning {
+		state := internal.CtxGetState(s.rootCtx)
+		status, err := state.Status()
+		if err != nil {
+			return nil, err
+		}
+
+		if status != internal.StatusIdle && status != internal.StatusConnected && status != internal.StatusConnecting {
+			s.actCancel()
+		}
+
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
+	loop:
+		for {
+			select {
+			case <-s.clientGiveUpChan:
+				ticker.Stop()
+				break loop
+			case <-s.clientRunningChan:
+				ticker.Stop()
+				break loop
+			case <-ticker.C:
+				status, err := state.Status()
+				if err != nil {
+					continue
+				}
+				if status != internal.StatusIdle && status != internal.StatusConnected && status != internal.StatusConnecting {
+					s.actCancel()
+				}
+				continue
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			}
+		}
+	}
 
 	status, err := internal.CtxGetState(s.rootCtx).Status()
 	if err != nil {
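
A note on the new clientGiveUpChan: it is closed rather than sent to, which matters because several RPC handlers (Up's waitForUp and the Status wait loop) may block on it at the same time. Closing a channel releases every waiter at once, and any later receive returns immediately. A standalone sketch of the pattern (illustrative, not code from this commit):

package main

import (
	"fmt"
	"sync"
)

func main() {
	giveUp := make(chan struct{})
	var wg sync.WaitGroup

	// several waiters block on the same signal, as Up and Status can
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-giveUp // a receive on a closed channel returns immediately
			fmt.Printf("waiter %d released\n", id)
		}(i)
	}

	close(giveUp) // broadcast: all waiters unblock at once
	wg.Wait()
}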

client/server/server_test.go

Lines changed: 7 additions & 9 deletions
@@ -105,7 +105,7 @@ func TestConnectWithRetryRuns(t *testing.T) {
 	t.Setenv(maxRetryTimeVar, "5s")
 	t.Setenv(retryMultiplierVar, "1")
 
-	s.connectWithRetryRuns(ctx, config, s.statusRecorder, nil)
+	s.connectWithRetryRuns(ctx, config, s.statusRecorder, nil, nil)
 	if counter < 3 {
 		t.Fatalf("expected counter > 2, got %d", counter)
 	}
@@ -134,8 +134,12 @@ func TestServer_Up(t *testing.T) {
 
 	profName := "default"
 
+	u, err := url.Parse("http://non-existent-url-for-testing.invalid:12345")
+	require.NoError(t, err)
+
 	ic := profilemanager.ConfigInput{
-		ConfigPath: filepath.Join(tempDir, profName+".json"),
+		ConfigPath:    filepath.Join(tempDir, profName+".json"),
+		ManagementURL: u.String(),
 	}
 
 	_, err = profilemanager.UpdateOrCreateConfig(ic)
@@ -153,16 +157,9 @@ func TestServer_Up(t *testing.T) {
 	}
 
 	s := New(ctx, "console", "", false, false)
-
 	err = s.Start()
 	require.NoError(t, err)
 
-	u, err := url.Parse("http://non-existent-url-for-testing.invalid:12345")
-	require.NoError(t, err)
-	s.config = &profilemanager.Config{
-		ManagementURL: u,
-	}
-
 	upCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
 	defer cancel()
 
@@ -171,6 +168,7 @@ func TestServer_Up(t *testing.T) {
 		Username: &currUser.Username,
 	}
 	_, err = s.Up(upCtx, upReq)
+	log.Errorf("error from Up: %v", err)
 
 	assert.Contains(t, err.Error(), "context deadline exceeded")
 }

shared/management/client/grpc.go

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
 
 	operation := func() error {
 		var err error
-		conn, err = nbgrpc.CreateConnection(addr, tlsEnabled)
+		conn, err = nbgrpc.CreateConnection(ctx, addr, tlsEnabled)
 		if err != nil {
 			log.Printf("createConnection error: %v", err)
 			return err

shared/signal/client/grpc.go

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
 
 	operation := func() error {
 		var err error
-		conn, err = nbgrpc.CreateConnection(addr, tlsEnabled)
+		conn, err = nbgrpc.CreateConnection(ctx, addr, tlsEnabled)
 		if err != nil {
 			log.Printf("createConnection error: %v", err)
 			return err
