Skip to content

Commit d35c4c8

Browse files
committed
observerDevices: introduce failure threshold
1 parent 38ef680 commit d35c4c8

File tree

6 files changed

+173
-55
lines changed

6 files changed

+173
-55
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ env: clean certificates
3636
if [ "${TRAVIS_OS_NAME}" == "linux" ]; then \
3737
sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6'; \
3838
fi
39-
mkdir -p $(DEVSIM_NET_HOST_PATH)
39+
mkdir -p $(DEVSIM_NET_HOST_PATH)/creds
40+
4041
docker pull $(DEVSIM_IMAGE)
4142
docker run -d \
4243
--privileged \
4344
--network=host \
4445
--name devsim-net-host \
4546
-v $(DEVSIM_NET_HOST_PATH):/tmp \
47+
-v $(DEVSIM_NET_HOST_PATH)/creds:/cloud_server_creds \
4648
-v $(CERT_PATH):/pki_certs \
4749
$(DEVSIM_IMAGE) devsim-$(SIMULATOR_NAME_SUFFIX)
4850

client/client.go

Lines changed: 134 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type subscription = interface {
5151
type Config struct {
5252
DeviceCacheExpirationSeconds int64
5353
ObserverPollingIntervalSeconds uint64 // 0 means 3 seconds
54+
ObserverFailureThreshold uint8 // 0 means 3
5455

5556
KeepAliveConnectionTimeoutSeconds uint64 // 0 means keepalive is disabled
5657
MaxMessageSize uint32
@@ -140,37 +141,144 @@ func NewClientFromConfig(cfg *Config, app ApplicationCallback, logger core.Logge
140141
return coap.DialUDP(ctx, addr, opts...)
141142
}
142143

143-
opts := []core.OptionFunc{
144-
core.WithDialDTLS(dialDTLS),
145-
core.WithDialTLS(dialTLS),
146-
core.WithDialTCP(dialTCP),
147-
core.WithDialUDP(dialUDP),
148-
core.WithLogger(logger),
144+
opts := []ClientOptionFunc{
145+
WithDialDTLS(dialDTLS),
146+
WithDialTLS(dialTLS),
147+
WithDialTCP(dialTCP),
148+
WithDialUDP(dialUDP),
149+
WithLogger(logger),
150+
WithObserverConfig(ObserverConfig{
151+
PollingInterval: observerPollingInterval,
152+
FailureThreshold: cfg.ObserverFailureThreshold,
153+
}),
154+
WithCacheExpiration(cacheExpiration),
149155
}
150156

151157
deviceOwner, err := NewDeviceOwnerFromConfig(cfg, dialTLS, dialDTLS, app)
152158
if err != nil {
153159
return nil, err
154160
}
155-
return NewClient(app, deviceOwner, cacheExpiration, observerPollingInterval, opts...)
161+
return NewClient(app, deviceOwner, opts...)
162+
}
163+
164+
// ObserverConfig is a configuration of the devices observation.
// WithObserverConfig replaces a non-positive PollingInterval with 3 seconds
// and a zero FailureThreshold with 3.
165+
type ObserverConfig struct {
166+
// PollingInterval is a time between two consecutive observations.
// The devices observer also uses it as the timeout of a single poll.
167+
PollingInterval time.Duration
168+
// FailureThreshold is a number of consecutive observation failures after which the device is marked as offline.
169+
FailureThreshold uint8
170+
}
171+
172+
// ClientConfig collects the settings that ClientOptionFunc values build up for NewClient.
type ClientConfig struct {
173+
// CoreOptions are the options NewClient applies to a core.Config and passes on to core.NewClient.
CoreOptions []core.OptionFunc
174+
// CacheExpiration is a time after which the device entry in cache is invalidated.
// NewClient starts from a default of one hour.
175+
CacheExpiration time.Duration
176+
// Observer is a configuration of the devices observation.
177+
Observer ObserverConfig
178+
}
179+
180+
// ClientOptionFunc takes a ClientConfig and returns the updated copy.
// NewClient applies the given options one after another, in the order passed.
type ClientOptionFunc func(ClientConfig) ClientConfig
181+
182+
// WithObserverConfig sets the observer config.
183+
func WithObserverConfig(observerConfig ObserverConfig) ClientOptionFunc {
184+
return func(cfg ClientConfig) ClientConfig {
185+
if observerConfig.PollingInterval <= 0 {
186+
observerConfig.PollingInterval = 3 * time.Second
187+
}
188+
if observerConfig.FailureThreshold <= 0 {
189+
observerConfig.FailureThreshold = 3
190+
}
191+
cfg.Observer = observerConfig
192+
return cfg
193+
}
194+
}
195+
196+
func WithCacheExpiration(cacheExpiration time.Duration) ClientOptionFunc {
197+
return func(cfg ClientConfig) ClientConfig {
198+
cfg.CacheExpiration = cacheExpiration
199+
return cfg
200+
}
201+
}
202+
203+
func WithTLS(tlsConfig *core.TLSConfig) ClientOptionFunc {
204+
return func(cfg ClientConfig) ClientConfig {
205+
if tlsConfig != nil {
206+
cfg.CoreOptions = append(cfg.CoreOptions, core.WithTLS(tlsConfig))
207+
}
208+
return cfg
209+
}
210+
}
211+
212+
func WithLogger(logger core.Logger) ClientOptionFunc {
213+
return func(cfg ClientConfig) ClientConfig {
214+
if logger != nil {
215+
cfg.CoreOptions = append(cfg.CoreOptions, core.WithLogger(logger))
216+
}
217+
return cfg
218+
}
219+
}
220+
221+
func WithDialDTLS(dial core.DialDTLS) ClientOptionFunc {
222+
return func(cfg ClientConfig) ClientConfig {
223+
if dial != nil {
224+
cfg.CoreOptions = append(cfg.CoreOptions, core.WithDialDTLS(dial))
225+
}
226+
return cfg
227+
}
228+
}
229+
230+
func WithDialTLS(dial core.DialTLS) ClientOptionFunc {
231+
return func(cfg ClientConfig) ClientConfig {
232+
if dial != nil {
233+
cfg.CoreOptions = append(cfg.CoreOptions, core.WithDialTLS(dial))
234+
}
235+
return cfg
236+
}
237+
}
238+
239+
func WithDialTCP(dial core.DialTCP) ClientOptionFunc {
240+
return func(cfg ClientConfig) ClientConfig {
241+
if dial != nil {
242+
cfg.CoreOptions = append(cfg.CoreOptions, core.WithDialTCP(dial))
243+
}
244+
return cfg
245+
}
246+
}
247+
248+
func WithDialUDP(dial core.DialUDP) ClientOptionFunc {
249+
return func(cfg ClientConfig) ClientConfig {
250+
if dial != nil {
251+
cfg.CoreOptions = append(cfg.CoreOptions, core.WithDialUDP(dial))
252+
}
253+
return cfg
254+
}
156255
}
157256

158257
// NewClient constructs a new local client.
159258
func NewClient(
160259
app ApplicationCallback,
161260
deviceOwner DeviceOwner,
162-
cacheExpiration time.Duration,
163-
observerPollingInterval time.Duration,
164-
opt ...core.OptionFunc,
261+
opt ...ClientOptionFunc,
165262
) (*Client, error) {
166263
if app == nil {
167264
return nil, fmt.Errorf("missing application callback")
168265
}
169266
if deviceOwner == nil {
170267
return nil, fmt.Errorf("missing device owner callback")
171268
}
172-
var coreCfg core.Config
269+
clientCfg := ClientConfig{
270+
CacheExpiration: time.Hour,
271+
Observer: ObserverConfig{
272+
PollingInterval: time.Second * 3,
273+
FailureThreshold: 3,
274+
},
275+
}
173276
for _, o := range opt {
277+
clientCfg = o(clientCfg)
278+
}
279+
280+
var coreCfg core.Config
281+
for _, o := range clientCfg.CoreOptions {
174282
coreCfg = o(coreCfg)
175283
}
176284

@@ -181,27 +289,27 @@ func NewClient(
181289
GetCertificate: deviceOwner.GetIdentityCertificate,
182290
GetCertificateAuthorities: deviceOwner.GetIdentityCACerts,
183291
}
184-
opt = append(
292+
clientCfg.CoreOptions = append(
185293
[]core.OptionFunc{
186294
core.WithTLS(&tls),
187295
core.WithLogger(coreCfg.Logger),
188296
},
189-
opt...,
297+
clientCfg.CoreOptions...,
190298
)
191-
oc := core.NewClient(opt...)
299+
oc := core.NewClient(clientCfg.CoreOptions...)
192300
pollInterval := time.Second * 10
193-
if cacheExpiration/2 > pollInterval {
194-
pollInterval = cacheExpiration / 2
301+
if clientCfg.CacheExpiration/2 > pollInterval {
302+
pollInterval = clientCfg.CacheExpiration / 2
195303
}
196304
client := Client{
197-
client: oc,
198-
app: app,
199-
deviceCache: NewDeviceCache(cacheExpiration, pollInterval, coreCfg.Logger),
200-
observeResourceCache: coapSync.NewMap[string, *observationsHandler](),
201-
deviceOwner: deviceOwner,
202-
subscriptions: make(map[string]subscription),
203-
observerPollingInterval: observerPollingInterval,
204-
logger: coreCfg.Logger,
305+
client: oc,
306+
app: app,
307+
deviceCache: NewDeviceCache(clientCfg.CacheExpiration, pollInterval, coreCfg.Logger),
308+
observeResourceCache: coapSync.NewMap[string, *observationsHandler](),
309+
deviceOwner: deviceOwner,
310+
subscriptions: make(map[string]subscription),
311+
observerConfig: clientCfg.Observer,
312+
logger: coreCfg.Logger,
205313
}
206314
return &client, nil
207315
}
@@ -223,8 +331,8 @@ type Client struct {
223331

224332
deviceCache *DeviceCache
225333

226-
observeResourceCache *coapSync.Map[string, *observationsHandler]
227-
observerPollingInterval time.Duration
334+
observeResourceCache *coapSync.Map[string, *observationsHandler]
335+
observerConfig ObserverConfig
228336

229337
deviceOwner DeviceOwner
230338

client/observeDevices.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"errors"
2222
"fmt"
2323
"sync"
24-
"time"
2524

2625
"github.com/google/uuid"
2726
"github.com/plgd-dev/device/v2/client/core"
@@ -56,19 +55,19 @@ type devicesObserver struct {
5655
handler *devicesObservationHandler
5756
discoveryConfiguration core.DiscoveryConfiguration
5857

59-
cancel context.CancelFunc
60-
interval time.Duration
61-
wait func()
62-
onlineDeviceIDs map[string]struct{}
58+
cancel context.CancelFunc
59+
observerConfiguration ObserverConfig
60+
wait func()
61+
onlineDeviceIDs map[string]uint8
6362
}
6463

65-
func newDevicesObserver(c *Client, interval time.Duration, discoveryConfiguration core.DiscoveryConfiguration, handler *devicesObservationHandler) *devicesObserver {
64+
func newDevicesObserver(c *Client, observerConfiguration ObserverConfig, discoveryConfiguration core.DiscoveryConfiguration, handler *devicesObservationHandler) *devicesObserver {
6665
ctx, cancel := context.WithCancel(context.Background())
6766
var wg sync.WaitGroup
6867
obs := &devicesObserver{
6968
c: c,
7069
handler: handler,
71-
interval: interval,
70+
observerConfiguration: observerConfiguration,
7271
discoveryConfiguration: discoveryConfiguration,
7372

7473
cancel: cancel,
@@ -84,7 +83,7 @@ func newDevicesObserver(c *Client, interval time.Duration, discoveryConfiguratio
8483
}
8584

8685
func (o *devicesObserver) poll(ctx context.Context) bool {
87-
pollCtx, cancel := context.WithTimeout(ctx, o.interval)
86+
pollCtx, cancel := context.WithTimeout(ctx, o.observerConfiguration.PollingInterval)
8887
defer cancel()
8988
newDeviceIDs, err := o.observe(pollCtx)
9089
select {
@@ -101,18 +100,24 @@ func (o *devicesObserver) poll(ctx context.Context) bool {
101100
}
102101
}
103102

104-
func (o *devicesObserver) processDevices(devices *coapSync.Map[string, struct{}]) (added map[string]struct{}, removed []string, current map[string]struct{}) {
105-
current = make(map[string]struct{})
103+
func (o *devicesObserver) processDevices(devices *coapSync.Map[string, struct{}]) (added map[string]struct{}, removed []string, current map[string]uint8) {
104+
current = make(map[string]uint8)
106105
devices.Range(func(key string, value struct{}) bool {
107-
current[key] = struct{}{}
106+
current[key] = 0
108107
return true
109108
})
110109
added = make(map[string]struct{})
111110
removed = make([]string, 0, len(current))
112-
for deviceID := range o.onlineDeviceIDs {
111+
for deviceID, failures := range o.onlineDeviceIDs {
113112
_, ok := current[deviceID]
114113
if !ok {
115-
removed = append(removed, deviceID)
114+
// we start counting from 0 so we need to subtract 1
115+
maxFailures := o.observerConfiguration.FailureThreshold - 1
116+
if failures < maxFailures {
117+
current[deviceID] = failures + 1
118+
} else {
119+
removed = append(removed, deviceID)
120+
}
116121
}
117122
}
118123
for deviceID := range current {
@@ -178,7 +183,7 @@ func (o *devicesObserver) discover(ctx context.Context, handler core.DiscoverDev
178183
return core.DiscoverDevices(ctx, multicastConn, handler, coap.WithResourceType(device.ResourceType))
179184
}
180185

181-
func (o *devicesObserver) observe(ctx context.Context) (map[string]struct{}, error) {
186+
func (o *devicesObserver) observe(ctx context.Context) (map[string]uint8, error) {
182187
newDevices := listDeviceIds{
183188
err: func(err error) { o.c.logger.Debug(err.Error()) },
184189
devices: coapSync.NewMap[string, struct{}](),
@@ -315,7 +320,7 @@ func (c *Client) ObserveDevices(ctx context.Context, handler DevicesObservationH
315320
return "", err
316321
}
317322

318-
obs := newDevicesObserver(c, c.observerPollingInterval, cfg.discoveryConfiguration, &devicesObservationHandler{
323+
obs := newDevicesObserver(c, c.observerConfig, cfg.discoveryConfiguration, &devicesObservationHandler{
319324
handler: handler,
320325
removeSubscription: func() {
321326
c.stopObservingDevices(ID.String())

client/onboardDevice.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,6 @@ func setACLForCloud(ctx context.Context, p *core.ProvisioningClient, cloudID str
4949
return err
5050
}
5151

52-
var acls acl.Response
53-
err = p.GetResource(ctx, link, &acls)
54-
if err != nil {
55-
return err
56-
}
57-
58-
for _, acl := range acls.AccessControlList {
59-
if acl.Subject.Subject_Device != nil {
60-
if acl.Subject.Subject_Device.DeviceID == cloudID {
61-
return nil
62-
}
63-
}
64-
}
6552
confResources := acl.AllResources
6653
for _, href := range links.GetResourceHrefs(softwareupdate.ResourceType) {
6754
confResources = append(confResources, acl.Resource{

test/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var (
6161

6262
const (
6363
TestResourceSwitchesHref = "/switches"
64+
// DockerDevsimName is the name of the docker container that DockerStopDevsim
// and DockerStartDevsim operate on; the Makefile starts the simulator
// container with the same --name value.
DockerDevsimName = "devsim-net-host"
6465
)
6566

6667
func TestResourceSwitchesInstanceHref(id string) string {

test/test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"crypto/x509"
2323
"fmt"
2424
"os"
25+
"os/exec"
2526
"strings"
2627
"sync/atomic"
2728
"testing"
@@ -285,3 +286,17 @@ func CheckResourceLinks(t *testing.T, expected, actual schema.ResourceLinks) {
285286
}
286287
require.Empty(t, expLinks)
287288
}
289+
290+
func DockerStopDevsim(t *testing.T) {
291+
cmd := exec.Command("docker")
292+
cmd.Args = []string{"docker", "kill", DockerDevsimName}
293+
err := cmd.Run()
294+
require.NoError(t, err)
295+
}
296+
297+
func DockerStartDevsim(t *testing.T) {
298+
cmd := exec.Command("docker")
299+
cmd.Args = []string{"docker", "start", DockerDevsimName}
300+
err := cmd.Run()
301+
require.NoError(t, err)
302+
}

0 commit comments

Comments
 (0)