Skip to content

Commit 666b490

Browse files
committed
feat: add watch code chassis
1 parent 6624dd9 commit 666b490

File tree

15 files changed

+357
-145
lines changed

15 files changed

+357
-145
lines changed

cmd/onex-nightwatch/app/options/options.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
clientset "github.com/superproj/onex/pkg/generated/clientset/versioned"
2323
"github.com/superproj/onex/pkg/log"
2424
genericoptions "github.com/superproj/onex/pkg/options"
25+
"github.com/superproj/onex/pkg/watch"
2526
)
2627

2728
const (
@@ -36,8 +37,8 @@ type Options struct {
3637
HealthOptions *genericoptions.HealthOptions `json:"health" mapstructure:"health"`
3738
MySQLOptions *genericoptions.MySQLOptions `json:"mysql" mapstructure:"mysql"`
3839
RedisOptions *genericoptions.RedisOptions `json:"redis" mapstructure:"redis"`
40+
WatchOptions *watch.Options `json:"nightwatch" mapstructure:"nightwatch"`
3941
UserWatcherMaxWorkers int64 `json:"user-watcher-max-workers" mapstructure:"user-watcher-max-workers"`
40-
DisableWatchers []string `json:"disable-watchers" mapstructure:"disable-watchers"`
4142
Metrics *genericoptions.MetricsOptions `json:"metrics" mapstructure:"metrics"`
4243
// Path to kubeconfig file with authorization and master location information.
4344
Kubeconfig string `json:"kubeconfig" mapstructure:"kubeconfig"`
@@ -52,7 +53,7 @@ func NewOptions() *Options {
5253
MySQLOptions: genericoptions.NewMySQLOptions(),
5354
RedisOptions: genericoptions.NewRedisOptions(),
5455
UserWatcherMaxWorkers: math.MaxInt64,
55-
DisableWatchers: []string{},
56+
WatchOptions: watch.NewOptions(),
5657
Metrics: genericoptions.NewMetricsOptions(),
5758
Log: log.NewOptions(),
5859
}
@@ -65,6 +66,7 @@ func (o *Options) Flags() (fss cliflag.NamedFlagSets) {
6566
o.HealthOptions.AddFlags(fss.FlagSet("health"))
6667
o.MySQLOptions.AddFlags(fss.FlagSet("mysql"))
6768
o.RedisOptions.AddFlags(fss.FlagSet("redis"))
69+
o.WatchOptions.AddFlags(fss.FlagSet("watch"))
6870
o.Metrics.AddFlags(fss.FlagSet("metrics"))
6971
o.Log.AddFlags(fss.FlagSet("log"))
7072

@@ -73,7 +75,6 @@ func (o *Options) Flags() (fss cliflag.NamedFlagSets) {
7375
fs := fss.FlagSet("misc")
7476
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
7577
fs.Int64Var(&o.UserWatcherMaxWorkers, "user-watcher-max-workers", o.UserWatcherMaxWorkers, "Specify the maximum concurrency event of user watcher.")
76-
fs.StringSliceVar(&o.DisableWatchers, "disable-watchers", o.DisableWatchers, "The list of watchers that should be disabled.")
7778
feature.DefaultMutableFeatureGate.AddFlag(fs)
7879

7980
return fss
@@ -100,6 +101,7 @@ func (o *Options) Validate() error {
100101
errs = append(errs, o.HealthOptions.Validate()...)
101102
errs = append(errs, o.MySQLOptions.Validate()...)
102103
errs = append(errs, o.RedisOptions.Validate()...)
104+
errs = append(errs, o.WatchOptions.Validate()...)
103105
errs = append(errs, o.Metrics.Validate()...)
104106
errs = append(errs, o.Log.Validate()...)
105107

@@ -110,8 +112,8 @@ func (o *Options) Validate() error {
110112
func (o *Options) ApplyTo(c *nightwatch.Config) error {
111113
c.MySQLOptions = o.MySQLOptions
112114
c.RedisOptions = o.RedisOptions
115+
c.WatchOptions = o.WatchOptions
113116
c.UserWatcherMaxWorkers = o.UserWatcherMaxWorkers
114-
c.DisableWatchers = o.DisableWatchers
115117
return nil
116118
}
117119

internal/nightwatch/nightwatch.go

Lines changed: 19 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -7,51 +7,34 @@
77
package nightwatch
88

99
import (
10-
"context"
11-
"errors"
12-
"time"
13-
14-
"github.com/go-redsync/redsync/v4"
15-
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
1610
"github.com/jinzhu/copier"
17-
"github.com/robfig/cron/v3"
1811
"k8s.io/apimachinery/pkg/util/wait"
1912

2013
"github.com/superproj/onex/internal/nightwatch/watcher"
2114
"github.com/superproj/onex/pkg/db"
2215
clientset "github.com/superproj/onex/pkg/generated/clientset/versioned"
2316
"github.com/superproj/onex/pkg/log"
17+
"github.com/superproj/onex/pkg/watch"
18+
onexlogger "github.com/superproj/onex/pkg/watch/logger/onex"
2419

2520
// trigger init functions in `internal/nightwatch/watcher/all`.
2621
_ "github.com/superproj/onex/internal/nightwatch/watcher/all"
2722
genericoptions "github.com/superproj/onex/pkg/options"
28-
stringsutil "github.com/superproj/onex/pkg/util/strings"
29-
)
30-
31-
var (
32-
lockName = "onex-nightwatch-lock2"
33-
jobStopTimeout = 3 * time.Minute
34-
extendExpiration = 5 * time.Second
35-
defaultExpiration = 10 * extendExpiration
3623
)
3724

3825
type nightWatch struct {
39-
runner *cron.Cron
40-
// distributed lock
41-
locker *redsync.Mutex
42-
config *watcher.Config
43-
disableWatchers []string
26+
*watch.Watch
4427
}
4528

4629
// Config is the configuration for the nightwatch server.
4730
type Config struct {
4831
MySQLOptions *genericoptions.MySQLOptions
4932
RedisOptions *genericoptions.RedisOptions
33+
WatchOptions *watch.Options
5034
// The maximum concurrency event of user watcher.
5135
UserWatcherMaxWorkers int64
5236
// The list of watchers that should be disabled.
53-
DisableWatchers []string
54-
Client clientset.Interface
37+
Client clientset.Interface
5538
}
5639

5740
// CompletedConfig same as Config, just to swap private object.
@@ -65,7 +48,7 @@ func (c *Config) Complete() *CompletedConfig {
6548
}
6649

6750
// CreateWatcherConfig used to create configuration used by all watcher.
68-
func (c *Config) CreateWatcherConfig() (*watcher.Config, error) {
51+
func (c *Config) CreateWatcherConfig() (*watcher.AggregateConfig, error) {
6952
var mysqlOptions db.MySQLOptions
7053
_ = copier.Copy(&mysqlOptions, c.MySQLOptions)
7154
storeClient, err := wireStoreClient(&mysqlOptions)
@@ -74,7 +57,7 @@ func (c *Config) CreateWatcherConfig() (*watcher.Config, error) {
7457
return nil, err
7558
}
7659

77-
return &watcher.Config{
60+
return &watcher.AggregateConfig{
7861
Store: storeClient,
7962
Client: c.Client,
8063
UserWatcherMaxWorkers: c.UserWatcherMaxWorkers,
@@ -83,118 +66,36 @@ func (c *Config) CreateWatcherConfig() (*watcher.Config, error) {
8366

8467
// New creates an asynchronous task instance.
8568
func (c *Config) New() (*nightWatch, error) {
86-
// Create a pool with go-redis which is the pool redisync will
87-
// use while communicating with Redis. This can also be any pool that
88-
// implements the `redis.Pool` interface.
8969
client, err := c.RedisOptions.NewClient()
9070
if err != nil {
9171
log.Errorw(err, "Failed to create Redis client")
9272
return nil, err
9373
}
9474

95-
logger := newCronLogger()
96-
runner := cron.New(
97-
cron.WithSeconds(),
98-
cron.WithLogger(logger),
99-
cron.WithChain(cron.SkipIfStillRunning(logger), cron.Recover(logger)),
100-
)
101-
102-
pool := goredis.NewPool(client)
103-
lockOpts := []redsync.Option{
104-
redsync.WithRetryDelay(50 * time.Microsecond),
105-
redsync.WithTries(3),
106-
redsync.WithExpiry(defaultExpiration),
107-
}
108-
// Create an instance of redisync and obtain a new mutex by using the same name
109-
// for all instances wanting the same lock.
110-
locker := redsync.New(pool).NewMutex(lockName, lockOpts...)
111-
11275
cfg, err := c.CreateWatcherConfig()
11376
if err != nil {
11477
return nil, err
11578
}
11679

117-
nw := &nightWatch{runner: runner, locker: locker, config: cfg, disableWatchers: c.DisableWatchers}
118-
if err := nw.addWatchers(); err != nil {
119-
return nil, err
80+
initialize := watcher.NewWatcherInitializer(cfg.Store, cfg.Client, cfg.UserWatcherMaxWorkers)
81+
opts := []watch.Option{
82+
watch.WithInitialize(initialize),
83+
watch.WithLogger(onexlogger.NewLogger()),
84+
watch.WithLockName("onex-nightwatch-lock"),
12085
}
12186

122-
return nw, nil
123-
}
124-
125-
// addWatchers used to initialize all registered watchers and add the watchers as a Cron job.
126-
func (nw *nightWatch) addWatchers() error {
127-
for n, w := range watcher.ListWatchers() {
128-
if stringsutil.StringIn(n, nw.disableWatchers) {
129-
continue
130-
}
131-
132-
if err := w.Initialize(context.Background(), nw.config); err != nil {
133-
log.Errorw(err, "Failed to construct watcher", "watcher", n)
134-
return err
135-
}
136-
137-
spec := watcher.Every3Seconds
138-
if obj, ok := w.(watcher.ISpec); ok {
139-
spec = obj.Spec()
140-
}
141-
142-
if _, err := nw.runner.AddJob(spec, w); err != nil {
143-
log.Errorw(err, "Failed to add job to the cron", "watcher", n)
144-
return err
145-
}
87+
nw, err := watch.NewWatch(c.WatchOptions, client, opts...)
88+
if err != nil {
89+
return nil, err
14690
}
14791

148-
return nil
92+
return &nightWatch{nw}, nil
14993
}
15094

15195
// Run keep retrying to acquire lock and then start the Cron job.
15296
func (nw *nightWatch) Run(stopCh <-chan struct{}) {
153-
ctx := wait.ContextForChannel(stopCh)
154-
155-
ticker := time.NewTicker(defaultExpiration + (5 * time.Second))
156-
for {
157-
// Obtain a lock for our given mutex. After this is successful, no one else
158-
// can obtain the same lock (the same mutex name) until we unlock it.
159-
err := nw.locker.LockContext(ctx)
160-
if err == nil {
161-
log.Debugw("Successfully acquired lock", "lockName", lockName)
162-
break
163-
}
164-
log.Debugw("Failed to acquire lock.", "lockName", lockName, "err", err)
165-
<-ticker.C
166-
}
167-
168-
go func() {
169-
ticker := time.NewTicker(extendExpiration)
170-
for {
171-
<-ticker.C
172-
if ok, err := nw.locker.ExtendContext(ctx); !ok || err != nil {
173-
log.Debugw("Failed to extend mutex", "err", err, "status", ok)
174-
}
175-
}
176-
}()
177-
178-
nw.runner.Start()
179-
log.Infof("Successfully started nightwatch server")
180-
97+
nw.Start(wait.ContextForChannel(stopCh))
98+
// graceful shutdown
18199
<-stopCh
182-
183-
nw.stop()
184-
}
185-
186-
// stop used to blocking waits for the job to complete and releases the lock.
187-
func (nw *nightWatch) stop() {
188-
ctx := nw.runner.Stop()
189-
select {
190-
case <-ctx.Done():
191-
case <-time.After(jobStopTimeout):
192-
log.Errorw(errors.New("context was not done immediately"), "timeout", jobStopTimeout.String())
193-
}
194-
195-
if ok, err := nw.locker.Unlock(); !ok || err != nil {
196-
log.Debugw("Failed to unlock", "err", err, "status", ok)
197-
}
198-
199-
log.Infof("Successfully stopped nightwatch server")
100+
nw.Stop()
200101
}

internal/nightwatch/watcher/clean/watcher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ import (
1313
"github.com/superproj/onex/internal/nightwatch/watcher"
1414
"github.com/superproj/onex/internal/pkg/client/store"
1515
"github.com/superproj/onex/pkg/log"
16+
"github.com/superproj/onex/pkg/watch"
1617
)
1718

18-
var _ watcher.Watcher = (*cleanWatcher)(nil)
19+
var _ watch.Watcher = (*cleanWatcher)(nil)
1920

2021
// watcher implement.
2122
type cleanWatcher struct {
@@ -35,12 +36,11 @@ func (w *cleanWatcher) Run() {
3536
}
3637
}
3738

38-
// Initialize initializes the watcher for later execution.
39-
func (w *cleanWatcher) Initialize(ctx context.Context, config *watcher.Config) error {
39+
// SetAggregateConfig initializes the watcher for later execution.
40+
func (w *cleanWatcher) SetAggregateConfig(config *watcher.AggregateConfig) {
4041
w.store = config.Store
41-
return nil
4242
}
4343

4444
func init() {
45-
watcher.Register("clean", &cleanWatcher{})
45+
watch.Register("clean", &cleanWatcher{})
4646
}

internal/nightwatch/watcher/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212
clientset "github.com/superproj/onex/pkg/generated/clientset/versioned"
1313
)
1414

15-
// Config aggregates the configurations of all watchers and serves as a configuration aggregator.
16-
type Config struct {
15+
// AggregateConfig aggregates the configurations of all watchers and serves as a configuration aggregator.
16+
type AggregateConfig struct {
1717
// The purpose of nightwatch is to handle asynchronous tasks on the onex platform
1818
// in a unified manner, so a store aggregation type is needed here.
1919
Store store.Interface
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package watcher
2+
3+
import (
4+
"github.com/superproj/onex/internal/pkg/client/store"
5+
clientset "github.com/superproj/onex/pkg/generated/clientset/versioned"
6+
"github.com/superproj/onex/pkg/watch"
7+
)
8+
9+
// WatcherInitializer is used for initialization of the onex specific watcher plugins.
10+
type WatcherInitializer struct {
11+
// The purpose of nightwatch is to handle asynchronous tasks on the onex platform
12+
// in a unified manner, so a store aggregation type is needed here.
13+
store store.Interface
14+
15+
// Client is the client for onex-apiserver.
16+
client clientset.Interface
17+
18+
// Then maximum concurrency event of user watcher.
19+
userWatcherMaxWorkers int64
20+
}
21+
22+
var _ watch.WatcherInitializer = &WatcherInitializer{}
23+
24+
func NewWatcherInitializer(store store.Interface, client clientset.Interface, maxWorkers int64) *WatcherInitializer {
25+
return &WatcherInitializer{
26+
store: store,
27+
client: client,
28+
userWatcherMaxWorkers: maxWorkers,
29+
}
30+
}
31+
32+
func (w *WatcherInitializer) Initialize(wc watch.Watcher) {
33+
// We can set a specific configuration as needed, as shown in the example below.
34+
// However, for convenience, I directly assign all configurations to each watcher,
35+
// allowing the watcher to choose which ones to use.
36+
if wants, ok := wc.(WantsStore); ok {
37+
wants.SetStore(w.store)
38+
}
39+
40+
if wants, ok := wc.(WantsAggregateConfig); ok {
41+
wants.SetAggregateConfig(&AggregateConfig{
42+
Store: w.store,
43+
Client: w.client,
44+
UserWatcherMaxWorkers: w.userWatcherMaxWorkers,
45+
})
46+
}
47+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package watcher
2+
3+
import (
4+
"github.com/superproj/onex/internal/pkg/client/store"
5+
"github.com/superproj/onex/pkg/watch"
6+
)
7+
8+
// WantsAggregateConfig defines a function which sets AggregateConfig for watcher plugins that need it.
9+
type WantsAggregateConfig interface {
10+
watch.Watcher
11+
SetAggregateConfig(config *AggregateConfig)
12+
}
13+
14+
// WantsStore defines a function which sets store for watcher plugins that need it.
15+
type WantsStore interface {
16+
watch.Watcher
17+
SetStore(store store.Interface)
18+
}

0 commit comments

Comments
 (0)