Skip to content

Commit 6296fdc

Browse files
committed
bridge: Reset the cloud connection when the remote side closes the connection
1 parent 690a44f commit 6296fdc

File tree

10 files changed

+97
-46
lines changed

10 files changed

+97
-46
lines changed

bridge/device/cloud/manager.go

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ type Manager struct {
9191
readyToPublishResources map[string]struct{}
9292
readyToUnpublishResources map[string]struct{}
9393
creds ocfCloud.CoapSignUpResponse
94+
client *client.Conn
95+
signedIn bool
9496
}
9597

9698
logger log.Logger
97-
client *client.Conn
98-
signedIn bool
9999
resourcesPublished bool
100100
forceRefreshToken bool
101101
done chan struct{}
@@ -182,6 +182,12 @@ func (c *Manager) isInitialized() bool {
182182
return cfg.URL != ""
183183
}
184184

185+
func (c *Manager) isSignedIn() bool {
186+
c.private.mutex.Lock()
187+
defer c.private.mutex.Unlock()
188+
return c.private.signedIn
189+
}
190+
185191
func (c *Manager) handleTrigger(value reflect.Value, closed bool) {
186192
if closed {
187193
return
@@ -203,7 +209,7 @@ func (c *Manager) handleTrigger(value reflect.Value, closed bool) {
203209
c.resetPublishing()
204210
return
205211
}
206-
if !c.signedIn {
212+
if !c.isSignedIn() {
207213
// resources will be published after sign in
208214
c.resetPublishing()
209215
}
@@ -364,13 +370,7 @@ func (c *Manager) setCreds(creds ocfCloud.CoapSignUpResponse) {
364370
c.private.mutex.Lock()
365371
defer c.private.mutex.Unlock()
366372
c.private.creds = creds
367-
c.signedIn = false
368-
}
369-
370-
func (c *Manager) updateCreds(f func(creds *ocfCloud.CoapSignUpResponse)) {
371-
c.private.mutex.Lock()
372-
defer c.private.mutex.Unlock()
373-
f(&c.private.creds)
373+
c.private.signedIn = false
374374
}
375375

376376
func (c *Manager) getCreds() ocfCloud.CoapSignUpResponse {
@@ -473,18 +473,32 @@ func (c *Manager) serveCOAP(w mux.ResponseWriter, request *mux.Message) {
473473
}
474474
}
475475

476+
func (c *Manager) replaceClient(client *client.Conn) *client.Conn {
477+
c.private.mutex.Lock()
478+
defer c.private.mutex.Unlock()
479+
c.private.signedIn = false
480+
oldClient := c.private.client
481+
c.private.client = client
482+
return oldClient
483+
}
484+
476485
func (c *Manager) close() error {
477-
c.signedIn = false
478-
if c.client == nil {
486+
oldClient := c.replaceClient(nil)
487+
if oldClient == nil {
479488
return nil
480489
}
481-
client := c.client
482-
c.client = nil
483-
return client.Close()
490+
return oldClient.Close()
491+
}
492+
493+
func (c *Manager) getClient() *client.Conn {
494+
c.private.mutex.Lock()
495+
defer c.private.mutex.Unlock()
496+
return c.private.client
484497
}
485498

486499
func (c *Manager) dial(ctx context.Context) error {
487-
if c.client != nil && c.client.Context().Err() == nil {
500+
cc := c.getClient()
501+
if cc != nil && cc.Context().Err() == nil {
488502
return nil
489503
}
490504
_ = c.close()
@@ -535,7 +549,16 @@ func (c *Manager) dial(ctx context.Context) error {
535549
if err != nil {
536550
return fmt.Errorf("cannot dial to %v: %w", addr.String(), err)
537551
}
538-
c.client = conn
552+
conn.AddOnClose(func() {
553+
c.private.mutex.Lock()
554+
defer c.private.mutex.Unlock()
555+
if c.private.client == conn {
556+
c.logger.Infof("cloud connection: closed")
557+
c.private.client = nil
558+
c.private.signedIn = false
559+
}
560+
})
561+
c.replaceClient(conn)
539562
return nil
540563
}
541564

bridge/device/cloud/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestManagerDeviceBecomesUnauthorized(t *testing.T) {
139139
if cfg.AccessToken == "" {
140140
return
141141
}
142-
time.Sleep(tickInterval)
142+
time.Sleep(tickInterval * 2)
143143
}
144144
require.Fail(t, "cloud manager should be reset, but it is not")
145145
}

bridge/device/cloud/publishResources.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,15 @@ func (c *Manager) publishResources(ctx context.Context) error {
7272
Links: links,
7373
TimeToLive: 0,
7474
}
75-
req, err := newPostRequest(ctx, c.client, ocfCloud.ResourceDirectory, wkRd)
75+
client := c.getClient()
76+
if client == nil {
77+
return errCannotPublishResources(fmt.Errorf("no connection"))
78+
}
79+
req, err := newPostRequest(ctx, client, ocfCloud.ResourceDirectory, wkRd)
7680
if err != nil {
7781
return errCannotPublishResources(err)
7882
}
79-
resp, err := c.client.Do(req)
83+
resp, err := client.Do(req)
8084
if err != nil {
8185
return errCannotPublishResources(err)
8286
}
@@ -117,14 +121,18 @@ func (c *Manager) unpublishResources(ctx context.Context) error {
117121
}
118122
break
119123
}
124+
client := c.getClient()
125+
if client == nil {
126+
return errCannotUnpublishResources(fmt.Errorf("no connection"))
127+
}
120128
firstRun = false
121-
req, err := newDeleteRequest(ctx, c.client, ocfCloud.ResourceDirectory)
129+
req, err := newDeleteRequest(ctx, client, ocfCloud.ResourceDirectory)
122130
if err != nil {
123131
return errCannotUnpublishResources(err)
124132
}
125133
query := toQuery(c.deviceID.String(), readyResouces)
126134
req.AddQuery(query)
127-
resp, err := c.client.Do(req)
135+
resp, err := client.Do(req)
128136
if err != nil {
129137
return errCannotUnpublishResources(err)
130138
}

bridge/device/cloud/refreshToken.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ func (c *Manager) refreshToken(ctx context.Context) error {
3939
if creds.RefreshToken == "" {
4040
return nil
4141
}
42-
req, err := newPostRequest(ctx, c.client, ocfCloud.RefreshToken, ocfCloud.CoapRefreshTokenRequest{
42+
client := c.getClient()
43+
if client == nil {
44+
return errCannotRefreshToken(fmt.Errorf("no connection"))
45+
}
46+
req, err := newPostRequest(ctx, client, ocfCloud.RefreshToken, ocfCloud.CoapRefreshTokenRequest{
4347
DeviceID: c.deviceID.String(),
4448
UserID: creds.UserID,
4549
RefreshToken: creds.RefreshToken,
@@ -48,7 +52,7 @@ func (c *Manager) refreshToken(ctx context.Context) error {
4852
return errCannotRefreshToken(err)
4953
}
5054
c.setProvisioningStatus(cloud.ProvisioningStatus_REGISTERING)
51-
resp, err := c.client.Do(req)
55+
resp, err := client.Do(req)
5256
if err != nil {
5357
return errCannotRefreshToken(err)
5458
}
@@ -69,10 +73,10 @@ func (c *Manager) refreshToken(ctx context.Context) error {
6973
}
7074

7175
func (c *Manager) updateCredsByRefreshTokenResponse(resp ocfCloud.CoapRefreshTokenResponse) {
72-
c.updateCreds(func(creds *ocfCloud.CoapSignUpResponse) {
73-
creds.AccessToken = resp.AccessToken
74-
creds.RefreshToken = resp.RefreshToken
75-
creds.ValidUntil = validUntil(resp.ExpiresIn)
76-
})
77-
c.signedIn = false
76+
c.private.mutex.Lock()
77+
defer c.private.mutex.Unlock()
78+
c.private.creds.AccessToken = resp.AccessToken
79+
c.private.creds.RefreshToken = resp.RefreshToken
80+
c.private.creds.ValidUntil = validUntil(resp.ExpiresIn)
81+
c.private.signedIn = false
7882
}

bridge/device/cloud/signIn.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,23 +50,24 @@ func errCannotSignIn(err error) error {
5050
}
5151

5252
func (c *Manager) signIn(ctx context.Context) error {
53-
if c.client == nil {
53+
client := c.getClient()
54+
if client == nil {
5455
return errCannotSignIn(fmt.Errorf("no connection"))
5556
}
56-
if c.signedIn {
57+
if c.isSignedIn() {
5758
return nil
5859
}
5960
creds := c.getCreds()
6061
signInReq, err := MakeSignInRequest(c.deviceID.String(), creds.UserID, creds.AccessToken)
6162
if err != nil {
6263
return errCannotSignIn(err)
6364
}
64-
req, err := newPostRequest(ctx, c.client, ocfCloud.SignIn, signInReq)
65+
req, err := newPostRequest(ctx, client, ocfCloud.SignIn, signInReq)
6566
if err != nil {
6667
return errCannotSignIn(err)
6768
}
6869
c.setProvisioningStatus(cloud.ProvisioningStatus_REGISTERING)
69-
resp, err := c.client.Do(req)
70+
resp, err := client.Do(req)
7071
if err != nil {
7172
return errCannotSignIn(err)
7273
}
@@ -92,9 +93,9 @@ func (c *Manager) signIn(ctx context.Context) error {
9293
}
9394

9495
func (c *Manager) updateCredsBySignInResponse(resp ocfCloud.CoapSignInResponse) {
95-
c.updateCreds(func(creds *ocfCloud.CoapSignUpResponse) {
96-
creds.ExpiresIn = resp.ExpiresIn
97-
creds.ValidUntil = validUntil(resp.ExpiresIn)
98-
})
99-
c.signedIn = true
96+
c.private.mutex.Lock()
97+
defer c.private.mutex.Unlock()
98+
c.private.creds.ExpiresIn = resp.ExpiresIn
99+
c.private.creds.ValidUntil = validUntil(resp.ExpiresIn)
100+
c.private.signedIn = true
100101
}

bridge/device/cloud/signOff.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,21 @@ func errCannotSignOff(err error) error {
4949
}
5050

5151
func (c *Manager) signOff(ctx context.Context) error {
52-
if c.client == nil {
53-
return nil
52+
client := c.getClient()
53+
if client == nil {
54+
return errCannotSignOff(fmt.Errorf("no connection"))
5455
}
5556
// signIn / refresh token fails
5657
if ctx.Err() != nil {
5758
return errCannotSignOff(ctx.Err())
5859
}
5960

60-
req, err := newSignOffReq(ctx, c.client, c.deviceID.String(), c.getCreds().UserID)
61+
req, err := newSignOffReq(ctx, client, c.deviceID.String(), c.getCreds().UserID)
6162
if err != nil {
6263
return errCannotSignOff(err)
6364
}
6465
c.setProvisioningStatus(ProvisioningStatusDEREGISTERING)
65-
resp, err := c.client.Do(req)
66+
resp, err := client.Do(req)
6667
defer c.setProvisioningStatus(cloud.ProvisioningStatus_UNINITIALIZED)
6768
if err != nil {
6869
return errCannotSignOff(err)

bridge/device/cloud/signUp.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,16 @@ func (c *Manager) signUp(ctx context.Context) error {
6363
if err != nil {
6464
return errCannotSignUp(err)
6565
}
66-
req, err := newPostRequest(ctx, c.client, ocfCloud.SignUp, signUpRequest)
66+
client := c.getClient()
67+
if client == nil {
68+
return errCannotSignOff(fmt.Errorf("no connection"))
69+
}
70+
req, err := newPostRequest(ctx, client, ocfCloud.SignUp, signUpRequest)
6771
if err != nil {
6872
return errCannotSignUp(err)
6973
}
7074
c.setProvisioningStatus(cloud.ProvisioningStatus_REGISTERING)
71-
resp, err := c.client.Do(req)
75+
resp, err := client.Do(req)
7276
if err != nil {
7377
return errCannotSignUp(err)
7478
}

test/coap-gateway/service/refreshToken.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func refreshTokenPostHandler(req *mux.Message, client *Client) {
3333
client.sendErrorResponse(fmt.Errorf("cannot handle refresh token: %w", err), code, req.Token())
3434
if client.handler == nil || client.handler.CloseOnError() {
3535
// to send the error response
36-
time.Sleep(time.Millisecond * 10)
36+
time.Sleep(time.Millisecond * 100)
3737
if err := client.Close(); err != nil {
3838
fmt.Printf("refresh token error: %v\n", err)
3939
}

test/coap-gateway/service/signIn.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package service
2020

2121
import (
2222
"fmt"
23+
"time"
2324

2425
"github.com/plgd-dev/device/v2/pkg/codec/cbor"
2526
"github.com/plgd-dev/device/v2/pkg/ocf/cloud"
@@ -32,6 +33,8 @@ func signInPostHandler(req *mux.Message, client *Client, signIn cloud.CoapSignIn
3233
logErrorAndCloseClient := func(err error, code coapCodes.Code) {
3334
client.sendErrorResponse(fmt.Errorf("cannot handle sign in: %w", err), code, req.Token())
3435
if client.handler == nil || client.handler.CloseOnError() {
36+
// to send the error response
37+
time.Sleep(time.Millisecond * 100)
3538
if err := client.Close(); err != nil {
3639
fmt.Printf("sign in error: %v\n", err)
3740
}
@@ -59,6 +62,8 @@ func signOutPostHandler(req *mux.Message, client *Client, signOut cloud.CoapSign
5962
logErrorAndCloseClient := func(err error, code coapCodes.Code) {
6063
client.sendErrorResponse(fmt.Errorf("cannot handle sign out: %w", err), code, req.Token())
6164
if client.handler == nil || client.handler.CloseOnError() {
65+
// to send the error response
66+
time.Sleep(time.Millisecond * 100)
6267
if err := client.Close(); err != nil {
6368
fmt.Printf("sign out error: %v\n", err)
6469
}

test/coap-gateway/service/signUp.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package service
2020

2121
import (
2222
"fmt"
23+
"time"
2324

2425
"github.com/plgd-dev/device/v2/pkg/codec/cbor"
2526
"github.com/plgd-dev/device/v2/pkg/ocf/cloud"
@@ -32,6 +33,8 @@ func signUpPostHandler(r *mux.Message, client *Client) {
3233
logErrorAndCloseClient := func(err error, code coapCodes.Code) {
3334
client.sendErrorResponse(fmt.Errorf("cannot handle sign up: %w", err), code, r.Token())
3435
if client.handler == nil || client.handler.CloseOnError() {
36+
// to send the error response
37+
time.Sleep(time.Millisecond * 100)
3538
if err := client.Close(); err != nil {
3639
fmt.Printf("sign up error: %v\n", err)
3740
}
@@ -67,6 +70,8 @@ func signOffHandler(req *mux.Message, client *Client) {
6770
logErrorAndCloseClient := func(err error, code coapCodes.Code) {
6871
client.sendErrorResponse(fmt.Errorf("cannot handle sign off: %w", err), code, req.Token())
6972
if client.handler == nil || client.handler.CloseOnError() {
73+
// to send the error response
74+
time.Sleep(time.Millisecond * 100)
7075
if err := client.Close(); err != nil {
7176
fmt.Printf("sign off error: %v\n", err)
7277
}

0 commit comments

Comments
 (0)