@@ -22,6 +22,7 @@ import (
2222 "context"
2323 "crypto/tls"
2424 "crypto/x509"
25+ "errors"
2526 "fmt"
2627 "reflect"
2728 "sync"
@@ -41,6 +42,7 @@ import (
4142 "github.com/plgd-dev/device/v2/schema/cloud"
4243 "github.com/plgd-dev/device/v2/schema/device"
4344 plgdResources "github.com/plgd-dev/device/v2/schema/resources"
45+ "github.com/plgd-dev/go-coap/v3/message"
4446 "github.com/plgd-dev/go-coap/v3/message/codes"
4547 "github.com/plgd-dev/go-coap/v3/message/pool"
4648 "github.com/plgd-dev/go-coap/v3/mux"
@@ -50,8 +52,6 @@ import (
5052 "github.com/plgd-dev/go-coap/v3/tcp/client"
5153)
5254
53- const tickInterval = time .Second * 10
54-
5555type (
5656 GetLinksFilteredBy func (endpoints schema.Endpoints , deviceIDfilter uuid.UUID , resourceTypesFitler []string , policyBitMaskFitler schema.BitMask ) (links schema.ResourceLinks )
5757 GetCertificates func (deviceID string ) []tls.Certificate
@@ -82,22 +82,25 @@ type Manager struct {
8282 caPool CAPoolGetter
8383 getCertificates GetCertificates
8484 removeCloudCAs RemoveCloudCAs
85+ tickInterval time.Duration
8586
8687 private struct {
8788 mutex sync.Mutex
8889 cfg Configuration
8990 previousCloudIDs []string
9091 readyToPublishResources map [string ]struct {}
9192 readyToUnpublishResources map [string ]struct {}
93+ creds ocfCloud.CoapSignUpResponse
9294 }
9395
9496 logger log.Logger
95- creds ocfCloud.CoapSignUpResponse
9697 client * client.Conn
9798 signedIn bool
9899 resourcesPublished bool
100+ forceRefreshToken bool
99101 done chan struct {}
100102 stopped atomic.Bool
103+ reconnect atomic.Bool
101104 trigger chan bool
102105 loop * eventloop.Loop
103106}
@@ -114,7 +117,8 @@ func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler
114117 removeCloudCAs : func (... string ) {
115118 // do nothing
116119 },
117- logger : log .NewNilLogger (),
120+ logger : log .NewNilLogger (),
121+ tickInterval : time .Second * 10 ,
118122 }
119123 for _ , opt := range opts {
120124 opt (& o )
@@ -133,6 +137,7 @@ func New(cfg Config, deviceID uuid.UUID, save func(), handler net.RequestHandler
133137 removeCloudCAs : o .removeCloudCAs ,
134138 logger : o .logger ,
135139 loop : loop ,
140+ tickInterval : o .tickInterval ,
136141 }
137142 c .private .cfg .ProvisioningStatus = cloud .ProvisioningStatus_UNINITIALIZED
138143 c .importConfig (cfg )
@@ -186,6 +191,13 @@ func (c *Manager) handleTrigger(value reflect.Value, closed bool) {
186191 if wantToReset {
187192 c .resetCredentials (ctx , true )
188193 }
194+ if c .reconnect .CompareAndSwap (true , false ) {
195+ err := c .close ()
196+ if err != nil && ! errors .Is (err , context .Canceled ) {
197+ c .logger .Errorf ("cannot close connection for reconnect: %w" , err )
198+ }
199+ return
200+ }
189201 if ! c .isInitialized () {
190202 // resources will be published after sign in
191203 c .resetPublishing ()
@@ -220,7 +232,7 @@ func (c *Manager) Init() {
220232 if c .private .cfg .URL != "" {
221233 c .triggerRunner (false )
222234 }
223- t := time .NewTicker (tickInterval )
235+ t := time .NewTicker (c . tickInterval )
224236 handlers := []eventloop.Handler {
225237 eventloop .NewReadHandler (reflect .ValueOf (c .trigger ), c .handleTrigger ),
226238 eventloop .NewReadHandler (reflect .ValueOf (t .C ), c .handleTimer ),
@@ -242,14 +254,16 @@ func (c *Manager) resetCredentials(ctx context.Context, signOff bool) {
242254 c .logger .Debugf ("%w" , err )
243255 }
244256 }
245- c .creds = ocfCloud.CoapSignUpResponse {}
246- c .signedIn = false
257+ c .setCreds (ocfCloud.CoapSignUpResponse {})
247258 c .resourcesPublished = false
259+ c .forceRefreshToken = false
260+ c .reconnect .Store (false )
248261 if err := c .close (); err != nil {
249262 c .logger .Warnf ("cannot close connection: %w" , err )
250263 }
251264 c .save ()
252265 c .removePreviousCloudIDs ()
266+ c .logger .Infof ("reset credentials" )
253267}
254268
255269func (c * Manager ) cleanup () {
@@ -347,29 +361,40 @@ func validUntil(expiresIn int64) time.Time {
347361}
348362
349363func (c * Manager ) setCreds (creds ocfCloud.CoapSignUpResponse ) {
350- c .creds = creds
364+ c .private .mutex .Lock ()
365+ defer c .private .mutex .Unlock ()
366+ c .private .creds = creds
351367 c .signedIn = false
352368}
353369
370+ func (c * Manager ) updateCreds (f func (creds * ocfCloud.CoapSignUpResponse )) {
371+ c .private .mutex .Lock ()
372+ defer c .private .mutex .Unlock ()
373+ f (& c .private .creds )
374+ }
375+
354376func (c * Manager ) getCreds () ocfCloud.CoapSignUpResponse {
355- return c .creds
377+ c .private .mutex .Lock ()
378+ defer c .private .mutex .Unlock ()
379+ return c .private .creds
356380}
357381
358382func (c * Manager ) isCredsExpiring () bool {
359- if c .creds .ValidUntil .IsZero () {
383+ creds := c .getCreds ()
384+ if creds .ValidUntil .IsZero () {
360385 return false
361386 }
362- diff := time .Until (c . creds .ValidUntil )
363- if diff < tickInterval * 2 {
387+ diff := time .Until (creds .ValidUntil )
388+ if diff < c . tickInterval * 2 {
364389 // refresh token before it expires
365390 return true
366391 }
367392 // refresh token when it is 1/3 before it expires
368- return time .Now ().After (c . creds .ValidUntil .Add (- diff / 3 ))
393+ return time .Now ().After (creds .ValidUntil .Add (- diff / 3 ))
369394}
370395
371- func getResourceTypesFilter (request * mux. Message ) []string {
372- queries , _ := request . Options () .Queries ()
396+ func getResourceTypesFilter (messageOptions message. Options ) []string {
397+ queries , _ := messageOptions .Queries ()
373398 resourceTypesFitler := []string {}
374399 for _ , q := range queries {
375400 if len (q ) > 3 && q [:3 ] == "rt=" {
@@ -379,37 +404,64 @@ func getResourceTypesFilter(request *mux.Message) []string {
379404 return resourceTypesFitler
380405}
381406
382- func ( c * Manager ) serveCOAP ( w mux. ResponseWriter , request * mux.Message ) {
383- request .Message . AddQuery ( "di=" + c . deviceID . String ())
384- r := net. Request {
385- Message : request . Message ,
386- Endpoints : nil ,
387- Conn : w . Conn (),
407+ func inFilterSupportedCodes ( request * mux.Message ) bool {
408+ switch request .Code () {
409+ case codes . POST , codes . PUT , codes . DELETE , codes . GET :
410+ return true
411+ default :
412+ return false
388413 }
389- var resp * pool.Message
414+ }
415+
416+ func (c * Manager ) handleDeviceResource (r * net.Request ) (* pool.Message , error ) {
417+ links := c .getLinks (schema.Endpoints {}, c .deviceID , nil , resources .PublishToCloud )
418+ for _ , link := range links {
419+ if link .HasType (device .ResourceType ) {
420+ _ = r .SetPath (link .Href )
421+ break
422+ }
423+ }
424+ return c .handler (r )
425+ }
426+
427+ func (c * Manager ) handleDiscoveryResource (r * net.Request ) (* pool.Message , error ) {
428+ links := c .getLinks (schema.Endpoints {}, c .deviceID , getResourceTypesFilter (r .Message .Options ()), resources .PublishToCloud )
429+ links = patchDeviceLink (links )
430+ links = discovery .PatchLinks (links , c .deviceID .String ())
431+ return resources .CreateResponseContent (r .Context (), links , codes .Content )
432+ }
433+
434+ func (c * Manager ) getHandler (r * net.Request ) func (r * net.Request ) (* pool.Message , error ) {
435+ h := c .handler
390436 p , err := r .Path ()
391437 if err == nil {
392438 switch p {
393439 case device .ResourceURI :
394- links := c .getLinks (schema.Endpoints {}, c .deviceID , nil , resources .PublishToCloud )
395- for _ , link := range links {
396- if link .HasType (device .ResourceType ) {
397- _ = r .SetPath (link .Href )
398- break
399- }
400- }
401- resp , err = c .handler (& r )
440+ h = c .handleDeviceResource
402441 case plgdResources .ResourceURI :
403- links := c .getLinks (schema.Endpoints {}, c .deviceID , getResourceTypesFilter (request ), resources .PublishToCloud )
404- links = patchDeviceLink (links )
405- links = discovery .PatchLinks (links , c .deviceID .String ())
406- resp , err = resources .CreateResponseContent (request .Context (), links , codes .Content )
407- default :
408- resp , err = c .handler (& r )
442+ h = c .handleDiscoveryResource
409443 }
410- } else {
411- resp , err = c .handler (& r )
412444 }
445+ return h
446+ }
447+
448+ func (c * Manager ) serveCOAP (w mux.ResponseWriter , request * mux.Message ) {
449+ if ! inFilterSupportedCodes (request ) {
450+ // ignore unsupported request
451+ if w .Conn ().Context ().Err () == nil {
452+ // log only if connection is still open
453+ c .logger .Debugf ("unsupported request: %v\n " , request )
454+ }
455+ return
456+ }
457+ request .Message .AddQuery ("di=" + c .deviceID .String ())
458+ r := net.Request {
459+ Message : request .Message ,
460+ Endpoints : nil ,
461+ Conn : w .Conn (),
462+ }
463+ h := c .getHandler (& r )
464+ resp , err := h (& r )
413465 if err != nil {
414466 resp = net .CreateResponseError (request .Context (), err , request .Token ())
415467 }
@@ -502,8 +554,9 @@ func patchDeviceLink(links schema.ResourceLinks) schema.ResourceLinks {
502554
503555func (c * Manager ) connect (ctx context.Context ) error {
504556 funcs := make ([]func (ctx context.Context ) error , 0 , 5 )
505- if c .isCredsExpiring () {
557+ if c .isCredsExpiring () || c . forceRefreshToken {
506558 funcs = append (funcs , c .refreshToken )
559+ c .forceRefreshToken = false
507560 }
508561 funcs = append (funcs , []func (ctx context.Context ) error {
509562 c .signUp ,
@@ -517,7 +570,7 @@ func (c *Manager) connect(ctx context.Context) error {
517570 }
518571 for _ , f := range funcs {
519572 r := func (ctx context.Context ) error {
520- fctx , cancel := context .WithTimeout (ctx , time . Second * 10 )
573+ fctx , cancel := context .WithTimeout (ctx , c . tickInterval )
521574 defer cancel ()
522575 return f (fctx )
523576 }
@@ -584,6 +637,11 @@ func (c *Manager) popReadyToPublishResources() map[string]struct{} {
584637 return res
585638}
586639
640+ func (c * Manager ) Reconnect () {
641+ c .reconnect .Store (true )
642+ c .triggerRunner (false )
643+ }
644+
587645func (c * Manager ) popReadyToUnpublishResources (count int ) []string {
588646 c .private .mutex .Lock ()
589647 defer c .private .mutex .Unlock ()
0 commit comments