Skip to content

Commit df64d3a

Browse files
authored
Update fswatcher to use /events/batcher (dapr#75)
* Update fswatcher to use /events/batcher Signed-off-by: joshvanl <[email protected]> * Linting Signed-off-by: joshvanl <[email protected]> * Linting Signed-off-by: joshvanl <[email protected]> * Add sleep to wait for windows fsnotify to become ready Signed-off-by: joshvanl <[email protected]> * Increase time for event to be received to 1 second Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 0e1fd37 commit df64d3a

File tree

7 files changed

+414
-245
lines changed

7 files changed

+414
-245
lines changed

fswatcher/fswatcher.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package fswatcher
17+
18+
import (
19+
"context"
20+
"errors"
21+
"fmt"
22+
"sync/atomic"
23+
"time"
24+
25+
"github.com/fsnotify/fsnotify"
26+
27+
"github.com/dapr/kit/events/batcher"
28+
)
29+
30+
// Options are the options for the FSWatcher.
31+
type Options struct {
32+
// Targets is a list of directories to watch for changes.
33+
Targets []string
34+
35+
// Interval is the interval to wait before sending a notification after a file has changed.
36+
// Default to 500ms.
37+
Interval *time.Duration
38+
}
39+
40+
// FSWatcher watches for changes to a directory on the filesystem and sends a notification to eventCh every time a file in the folder is changed.
41+
// Although it's possible to watch for individual files, that's not recommended; watch for the file's parent folder instead.
42+
// That is because, like in Kubernetes which uses system links on mounted volumes, the file may be deleted and recreated with a different inode.
43+
// Note that changes are batched for 0.5 seconds before notifications are sent as events on a single file often come in batches.
44+
type FSWatcher struct {
45+
w *fsnotify.Watcher
46+
running atomic.Bool
47+
batcher *batcher.Batcher[string]
48+
}
49+
50+
func New(opts Options) (*FSWatcher, error) {
51+
w, err := fsnotify.NewWatcher()
52+
if err != nil {
53+
return nil, fmt.Errorf("failed to create watcher: %w", err)
54+
}
55+
56+
for _, target := range opts.Targets {
57+
if err = w.Add(target); err != nil {
58+
return nil, fmt.Errorf("failed to add target %s: %w", target, err)
59+
}
60+
}
61+
62+
interval := time.Millisecond * 500
63+
if opts.Interval != nil {
64+
interval = *opts.Interval
65+
}
66+
if interval < 0 {
67+
return nil, errors.New("interval must be positive")
68+
}
69+
70+
return &FSWatcher{
71+
w: w,
72+
// Often the case, writes to files are not atomic and involve multiple file system events.
73+
// We want to hold off on sending events until we are sure that the file has been written to completion. We do this by waiting for a period of time after the last event has been received for a file name.
74+
batcher: batcher.New[string](interval),
75+
}, nil
76+
}
77+
78+
func (f *FSWatcher) Run(ctx context.Context, eventCh chan<- struct{}) error {
79+
if !f.running.CompareAndSwap(false, true) {
80+
return errors.New("watcher already running")
81+
}
82+
83+
f.batcher.Subscribe(eventCh)
84+
85+
for {
86+
select {
87+
case <-ctx.Done():
88+
return f.w.Close()
89+
case err := <-f.w.Errors:
90+
return errors.Join(fmt.Errorf("watcher error: %w", err), f.w.Close())
91+
case event := <-f.w.Events:
92+
f.batcher.Batch(event.Name)
93+
}
94+
}
95+
}

fswatcher/fswatcher_test.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package fswatcher
17+
18+
import (
19+
"context"
20+
"os"
21+
"path/filepath"
22+
"runtime"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
clocktesting "k8s.io/utils/clock/testing"
29+
30+
"github.com/dapr/kit/events/batcher"
31+
"github.com/dapr/kit/ptr"
32+
)
33+
34+
func TestFSWatcher(t *testing.T) {
35+
runWatcher := func(t *testing.T, opts Options, bacher *batcher.Batcher[string]) <-chan struct{} {
36+
t.Helper()
37+
38+
f, err := New(opts)
39+
require.NoError(t, err)
40+
41+
if bacher != nil {
42+
f.WithBatcher(bacher)
43+
}
44+
45+
errCh := make(chan error)
46+
ctx, cancel := context.WithCancel(context.Background())
47+
eventsCh := make(chan struct{})
48+
go func() {
49+
errCh <- f.Run(ctx, eventsCh)
50+
}()
51+
52+
t.Cleanup(func() {
53+
cancel()
54+
select {
55+
case err := <-errCh:
56+
require.NoError(t, err)
57+
case <-time.After(time.Second):
58+
assert.Fail(t, "timeout waiting for watcher to stop")
59+
}
60+
})
61+
62+
assert.Eventually(t, f.running.Load, time.Second, time.Millisecond*10)
63+
return eventsCh
64+
}
65+
66+
t.Run("creating fswatcher with no directory should not error", func(t *testing.T) {
67+
runWatcher(t, Options{}, nil)
68+
})
69+
70+
t.Run("creating fswatcher with 0 interval should not error", func(t *testing.T) {
71+
_, err := New(Options{
72+
Interval: ptr.Of(time.Duration(0)),
73+
})
74+
require.NoError(t, err)
75+
})
76+
77+
t.Run("creating fswatcher with negative interval should error", func(t *testing.T) {
78+
_, err := New(Options{
79+
Interval: ptr.Of(time.Duration(-1)),
80+
})
81+
require.Error(t, err)
82+
})
83+
84+
t.Run("running Run twice should error", func(t *testing.T) {
85+
fs, err := New(Options{})
86+
require.NoError(t, err)
87+
ctx, cancel := context.WithCancel(context.Background())
88+
cancel()
89+
require.NoError(t, fs.Run(ctx, make(chan struct{})))
90+
require.Error(t, fs.Run(ctx, make(chan struct{})))
91+
})
92+
93+
t.Run("creating fswatcher with non-existent directory should error", func(t *testing.T) {
94+
dir := t.TempDir()
95+
require.NoError(t, os.RemoveAll(dir))
96+
_, err := New(Options{
97+
Targets: []string{dir},
98+
})
99+
require.Error(t, err)
100+
})
101+
102+
t.Run("should fire event when event occurs on target file", func(t *testing.T) {
103+
fp := filepath.Join(t.TempDir(), "test.txt")
104+
require.NoError(t, os.WriteFile(fp, []byte{}, 0o644))
105+
eventsCh := runWatcher(t, Options{
106+
Targets: []string{fp},
107+
Interval: ptr.Of(time.Duration(1)),
108+
}, nil)
109+
assert.Empty(t, eventsCh)
110+
111+
if runtime.GOOS == "windows" {
112+
// If running in windows, wait for notify to be ready.
113+
time.Sleep(time.Second)
114+
}
115+
require.NoError(t, os.WriteFile(fp, []byte{}, 0o644))
116+
117+
select {
118+
case <-eventsCh:
119+
case <-time.After(time.Second):
120+
assert.Fail(t, "timeout waiting for event")
121+
}
122+
})
123+
124+
t.Run("should fire 2 events when event occurs on 2 file target", func(t *testing.T) {
125+
fp1 := filepath.Join(t.TempDir(), "test.txt")
126+
fp2 := filepath.Join(t.TempDir(), "test.txt")
127+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
128+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
129+
eventsCh := runWatcher(t, Options{
130+
Targets: []string{fp1, fp2},
131+
Interval: ptr.Of(time.Duration(1)),
132+
}, nil)
133+
assert.Empty(t, eventsCh)
134+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
135+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
136+
for i := 0; i < 2; i++ {
137+
select {
138+
case <-eventsCh:
139+
case <-time.After(time.Second):
140+
assert.Fail(t, "timeout waiting for event")
141+
}
142+
}
143+
})
144+
145+
t.Run("should fire 2 events when event occurs on 2 files inside target directory", func(t *testing.T) {
146+
dir := t.TempDir()
147+
fp1 := filepath.Join(dir, "test1.txt")
148+
fp2 := filepath.Join(dir, "test2.txt")
149+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
150+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
151+
eventsCh := runWatcher(t, Options{
152+
Targets: []string{fp1, fp2},
153+
Interval: ptr.Of(time.Duration(1)),
154+
}, nil)
155+
if runtime.GOOS == "windows" {
156+
// If running in windows, wait for notify to be ready.
157+
time.Sleep(time.Second)
158+
}
159+
assert.Empty(t, eventsCh)
160+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
161+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
162+
for i := 0; i < 2; i++ {
163+
select {
164+
case <-eventsCh:
165+
case <-time.After(time.Second):
166+
assert.Fail(t, "timeout waiting for event")
167+
}
168+
}
169+
})
170+
171+
t.Run("should fire 2 events when event occurs on 2 target directories", func(t *testing.T) {
172+
dir1 := t.TempDir()
173+
dir2 := t.TempDir()
174+
fp1 := filepath.Join(dir1, "test1.txt")
175+
fp2 := filepath.Join(dir2, "test2.txt")
176+
eventsCh := runWatcher(t, Options{
177+
Targets: []string{dir1, dir2},
178+
Interval: ptr.Of(time.Duration(1)),
179+
}, nil)
180+
assert.Empty(t, eventsCh)
181+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
182+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
183+
for i := 0; i < 2; i++ {
184+
select {
185+
case <-eventsCh:
186+
case <-time.After(time.Second):
187+
assert.Fail(t, "timeout waiting for event")
188+
}
189+
}
190+
})
191+
192+
t.Run("should batch events of the same file for multiple events", func(t *testing.T) {
193+
clock := clocktesting.NewFakeClock(time.Time{})
194+
batcher := batcher.New[string](time.Millisecond * 500)
195+
batcher.WithClock(clock)
196+
dir1 := t.TempDir()
197+
dir2 := t.TempDir()
198+
fp1 := filepath.Join(dir1, "test1.txt")
199+
fp2 := filepath.Join(dir2, "test2.txt")
200+
eventsCh := runWatcher(t, Options{Targets: []string{dir1, dir2}}, batcher)
201+
assert.Empty(t, eventsCh)
202+
203+
if runtime.GOOS == "windows" {
204+
// If running in windows, wait for notify to be ready.
205+
time.Sleep(time.Second)
206+
}
207+
208+
for i := 0; i < 10; i++ {
209+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
210+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
211+
}
212+
213+
assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10)
214+
215+
select {
216+
case <-eventsCh:
217+
assert.Fail(t, "unexpected event")
218+
case <-time.After(time.Millisecond * 10):
219+
}
220+
221+
clock.Step(time.Millisecond * 250)
222+
223+
for i := 0; i < 10; i++ {
224+
require.NoError(t, os.WriteFile(fp1, []byte{}, 0o644))
225+
require.NoError(t, os.WriteFile(fp2, []byte{}, 0o644))
226+
}
227+
228+
select {
229+
case <-eventsCh:
230+
assert.Fail(t, "unexpected event")
231+
case <-time.After(time.Millisecond * 10):
232+
}
233+
234+
assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10)
235+
clock.Step(time.Millisecond * 500)
236+
237+
for i := 0; i < 2; i++ {
238+
select {
239+
case <-eventsCh:
240+
case <-time.After(time.Second):
241+
assert.Fail(t, "timeout waiting for event")
242+
}
243+
clock.Step(1)
244+
}
245+
})
246+
}

fswatcher/unit.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
//go:build unit
2+
// +build unit
3+
4+
/*
5+
Copyright 2023 The Dapr Authors
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package fswatcher
20+
21+
import (
22+
"github.com/dapr/kit/events/batcher"
23+
)
24+
25+
func (f *FSWatcher) WithBatcher(b *batcher.Batcher[string]) *FSWatcher {
26+
f.batcher = b
27+
return f
28+
}

0 commit comments

Comments
 (0)