Skip to content

Commit 8c9f4cc

Browse files
committed
gopls/internal/filewatcher: refactor filewatcher to pass in handler func
Instead of communicating between the client and the file watcher through two channels (event channel and error channel), the client can pass in a handler call back function that will be called upon events or errors. For golang/go#74292 Change-Id: Ibf772ad358e00cb0ffb729495860ea7b5f8c740c Reviewed-on: https://go-review.googlesource.com/c/tools/+/686838 LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Alan Donovan <[email protected]>
1 parent 63f81c9 commit 8c9f4cc

File tree

3 files changed

+139
-139
lines changed

3 files changed

+139
-139
lines changed

gopls/internal/cmd/mcp.go

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"io"
1212
"log"
1313
"os"
14+
"sync"
1415
"time"
1516

1617
"golang.org/x/tools/gopls/internal/filewatcher"
@@ -68,35 +69,65 @@ func (m *headlessMCP) Run(ctx context.Context, args ...string) error {
6869
}
6970
defer cli.terminate(ctx)
7071

71-
w, eventsChan, errorChan, err := filewatcher.New(1*time.Second, nil)
72-
if err != nil {
73-
return err
74-
}
75-
defer w.Close()
72+
var (
73+
queueMu sync.Mutex
74+
queue []protocol.FileEvent
75+
nonempty = make(chan struct{}) // receivable when len(queue) > 0
76+
stop = make(chan struct{}) // closed when Run returns
77+
)
78+
defer close(stop)
7679

77-
// Start listening for events.
80+
// This goroutine forwards file change events to the LSP server.
7881
go func() {
7982
for {
8083
select {
81-
case events, ok := <-eventsChan:
82-
if !ok {
83-
return
84-
}
85-
if err := cli.server.DidChangeWatchedFiles(ctx, &protocol.DidChangeWatchedFilesParams{
86-
Changes: events,
87-
}); err != nil {
88-
log.Printf("failed to notify changed files: %v", err)
89-
}
90-
case err, ok := <-errorChan:
91-
if !ok {
92-
return
93-
}
94-
log.Printf("error found: %v", err)
84+
case <-stop:
9585
return
86+
case <-nonempty:
87+
queueMu.Lock()
88+
q := queue
89+
queue = nil
90+
queueMu.Unlock()
91+
92+
if len(q) > 0 {
93+
if err := cli.server.DidChangeWatchedFiles(ctx, &protocol.DidChangeWatchedFilesParams{
94+
Changes: q,
95+
}); err != nil {
96+
log.Printf("failed to notify changed files: %v", err)
97+
}
98+
}
99+
96100
}
97101
}
98102
}()
99103

104+
w, err := filewatcher.New(500*time.Millisecond, nil, func(events []protocol.FileEvent, err error) {
105+
if err != nil {
106+
log.Printf("watch error: %v", err)
107+
return
108+
}
109+
110+
if len(events) == 0 {
111+
return
112+
}
113+
114+
// Since there is no promise [protocol.Server.DidChangeWatchedFiles]
115+
// will return immediately, we should buffer the captured events and
116+
// sent them whenever available in a separate go routine.
117+
queueMu.Lock()
118+
queue = append(queue, events...)
119+
queueMu.Unlock()
120+
121+
select {
122+
case nonempty <- struct{}{}:
123+
default:
124+
}
125+
})
126+
if err != nil {
127+
return err
128+
}
129+
defer w.Close()
130+
100131
// TODO(hxjiang): replace this with LSP initial param workspace root.
101132
dir, err := os.Getwd()
102133
if err != nil {

gopls/internal/filewatcher/filewatcher.go

Lines changed: 60 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -17,54 +17,55 @@ import (
1717
"golang.org/x/tools/gopls/internal/protocol"
1818
)
1919

20-
// FileWatcher collects events from a [fsnotify.Watcher] and converts them into
21-
// batched LSP [protocol.FileEvent]s. Events are debounced and sent to the
22-
// event channel after a configurable period of no new relevant activity.
23-
type FileWatcher struct {
20+
// Watcher collects events from a [fsnotify.Watcher] and converts them into
21+
// batched LSP [protocol.FileEvent]s.
22+
type Watcher struct {
2423
logger *slog.Logger
2524

26-
closed chan struct{}
25+
stop chan struct{} // closed by Close to terminate run loop
2726

28-
wg sync.WaitGroup
27+
wg sync.WaitGroup // counts number of active run goroutines (max 1)
2928

30-
mu sync.Mutex
29+
watcher *fsnotify.Watcher
30+
31+
mu sync.Mutex // guards all fields below
3132

3233
// watchedDirs keeps track of which directories are being watched by the
3334
// watcher, explicitly added via [fsnotify.Watcher.Add].
3435
watchedDirs map[string]bool
35-
watcher *fsnotify.Watcher
3636

37-
// events is the current batch of unsent [protocol.FileEvent]s, which will
38-
// be sent when the timer expires.
37+
// events is the current batch of unsent file events, which will be sent
38+
// when the timer expires.
3939
events []protocol.FileEvent
4040
}
4141

42-
// New creates a new FileWatcher and starts its event-handling loop. The
43-
// [FileWatcher.Close] should be called to cleanup.
44-
func New(delay time.Duration, logger *slog.Logger) (*FileWatcher, <-chan []protocol.FileEvent, <-chan error, error) {
42+
// New creates a new file watcher and starts its event-handling loop. The
43+
// [Watcher.Close] method must be called to clean up resources.
44+
//
45+
// The provided handler is called sequentially with either a batch of file
46+
// events or an error. Events and errors may be interleaved. The watcher blocks
47+
// until the handler returns, so the handler should be fast and non-blocking.
48+
func New(delay time.Duration, logger *slog.Logger, handler func([]protocol.FileEvent, error)) (*Watcher, error) {
4549
watcher, err := fsnotify.NewWatcher()
4650
if err != nil {
47-
return nil, nil, nil, err
51+
return nil, err
4852
}
49-
w := &FileWatcher{
53+
w := &Watcher{
5054
logger: logger,
5155
watcher: watcher,
5256
watchedDirs: make(map[string]bool),
53-
closed: make(chan struct{}),
57+
stop: make(chan struct{}),
5458
}
5559

56-
eventsChan := make(chan []protocol.FileEvent)
57-
errorChan := make(chan error)
58-
5960
w.wg.Add(1)
60-
go w.run(eventsChan, errorChan, delay)
61+
go w.run(delay, handler)
6162

62-
return w, eventsChan, errorChan, nil
63+
return w, nil
6364
}
6465

6566
// run is the main event-handling loop for the watcher. It should be run in a
6667
// separate goroutine.
67-
func (w *FileWatcher) run(events chan<- []protocol.FileEvent, errs chan<- error, delay time.Duration) {
68+
func (w *Watcher) run(delay time.Duration, handler func([]protocol.FileEvent, error)) {
6869
defer w.wg.Done()
6970

7071
// timer is used to debounce events.
@@ -73,20 +74,11 @@ func (w *FileWatcher) run(events chan<- []protocol.FileEvent, errs chan<- error,
7374

7475
for {
7576
select {
76-
case <-w.closed:
77-
// File watcher should not send the remaining events to the receiver
78-
// because the client may not listening to the channel, could
79-
// result in blocking forever.
80-
//
81-
// Once close signal received, ErrorChan and EventsChan will be
82-
// closed. Exit the go routine to ensure no more value will be sent
83-
// through those channels.
84-
close(errs)
85-
close(events)
77+
case <-w.stop:
8678
return
8779

8880
case <-timer.C:
89-
w.sendEvents(events)
81+
w.sendEvents(handler)
9082
timer.Reset(delay)
9183

9284
case err, ok := <-w.watcher.Errors:
@@ -96,18 +88,20 @@ func (w *FileWatcher) run(events chan<- []protocol.FileEvent, errs chan<- error,
9688
if !ok {
9789
continue
9890
}
99-
errs <- err
91+
if err != nil {
92+
handler(nil, err)
93+
}
10094

10195
case event, ok := <-w.watcher.Events:
10296
if !ok {
10397
continue
10498
}
105-
// FileWatcher should not handle the fsnotify.Event concurrently,
99+
// file watcher should not handle the fsnotify.Event concurrently,
106100
// the original order should be preserved. E.g. if a file get
107101
// deleted and recreated, running concurrently may result it in
108102
// reverse order.
109103
//
110-
// Only reset the timer if an relevant event happened.
104+
// Only reset the timer if a relevant event happened.
111105
// https://github.com/fsnotify/fsnotify?tab=readme-ov-file#why-do-i-get-many-chmod-events
112106
if e := w.handleEvent(event); e != nil {
113107
w.addEvent(*e)
@@ -134,7 +128,7 @@ func skipDir(dirName string) bool {
134128

135129
// WatchDir walks through the directory and all its subdirectories, adding
136130
// them to the watcher.
137-
func (w *FileWatcher) WatchDir(path string) error {
131+
func (w *Watcher) WatchDir(path string) error {
138132
return filepath.WalkDir(filepath.Clean(path), func(path string, d fs.DirEntry, err error) error {
139133
if d.IsDir() {
140134
if skipDir(d.Name()) {
@@ -150,29 +144,27 @@ func (w *FileWatcher) WatchDir(path string) error {
150144
}
151145

152146
// handleEvent converts a single [fsnotify.Event] to the corresponding
153-
// [protocol.FileEvent].
147+
// [protocol.FileEvent] and updates the watcher state.
154148
// Returns nil if the input event is not relevant.
155-
func (w *FileWatcher) handleEvent(event fsnotify.Event) *protocol.FileEvent {
149+
func (w *Watcher) handleEvent(event fsnotify.Event) *protocol.FileEvent {
156150
// fsnotify does not guarantee clean filepaths.
157151
path := filepath.Clean(event.Name)
158152

159153
var isDir bool
160154
if info, err := os.Stat(path); err == nil {
161155
isDir = info.IsDir()
156+
} else if os.IsNotExist(err) {
157+
// Upon deletion, the file/dir has been removed. fsnotify
158+
// does not provide information regarding the deleted item.
159+
// Use the watchedDirs to determine whether it's a dir.
160+
isDir = w.isDir(path)
162161
} else {
163-
if os.IsNotExist(err) {
164-
// Upon deletion, the file/dir has been removed. fsnotify
165-
// does not provide information regarding the deleted item.
166-
// Use the watchedDirs to determine whether it's a dir.
167-
isDir = w.isDir(path)
168-
} else {
169-
// If statting failed, something is wrong with the file system.
170-
// Log and move on.
171-
if w.logger != nil {
172-
w.logger.Error("failed to stat path, skipping event as its type (file/dir) is unknown", "path", path, "err", err)
173-
}
174-
return nil
162+
// If statting failed, something is wrong with the file system.
163+
// Log and move on.
164+
if w.logger != nil {
165+
w.logger.Error("failed to stat path, skipping event as its type (file/dir) is unknown", "path", path, "err", err)
175166
}
167+
return nil
176168
}
177169

178170
if isDir {
@@ -210,9 +202,9 @@ func (w *FileWatcher) handleEvent(event fsnotify.Event) *protocol.FileEvent {
210202
return nil
211203
}
212204
} else {
213-
// Only watch *.{go,mod,sum,work}
205+
// Only watch files of interest.
214206
switch strings.TrimPrefix(filepath.Ext(path), ".") {
215-
case "go", "mod", "sum", "work":
207+
case "go", "mod", "sum", "work", "s":
216208
default:
217209
return nil
218210
}
@@ -241,7 +233,7 @@ func (w *FileWatcher) handleEvent(event fsnotify.Event) *protocol.FileEvent {
241233
}
242234
}
243235

244-
func (w *FileWatcher) watchDir(path string) error {
236+
func (w *Watcher) watchDir(path string) error {
245237
w.mu.Lock()
246238
defer w.mu.Unlock()
247239

@@ -255,30 +247,29 @@ func (w *FileWatcher) watchDir(path string) error {
255247
return nil
256248
}
257249

258-
func (w *FileWatcher) unwatchDir(path string) {
250+
func (w *Watcher) unwatchDir(path string) {
259251
w.mu.Lock()
260252
defer w.mu.Unlock()
261253

262-
// Upon removal, we only need to remove the entries from the map
263-
// [fileWatcher.watchedDirPath].
254+
// Upon removal, we only need to remove the entries from the map.
264255
// The [fsnotify.Watcher] remove the watch for us.
265256
// fsnotify/fsnotify#268
266257
delete(w.watchedDirs, path)
267258
}
268259

269-
func (w *FileWatcher) isDir(path string) bool {
260+
func (w *Watcher) isDir(path string) bool {
270261
w.mu.Lock()
271262
defer w.mu.Unlock()
272263

273264
_, isDir := w.watchedDirs[path]
274265
return isDir
275266
}
276267

277-
func (w *FileWatcher) addEvent(event protocol.FileEvent) {
268+
func (w *Watcher) addEvent(event protocol.FileEvent) {
278269
w.mu.Lock()
279270
defer w.mu.Unlock()
280271

281-
// Some architectures emit duplicate change events in close
272+
// Some systems emit duplicate change events in close
282273
// succession upon file modification. While the current
283274
// deduplication is naive and only handles immediate duplicates,
284275
// a more robust solution is needed.
@@ -292,27 +283,24 @@ func (w *FileWatcher) addEvent(event protocol.FileEvent) {
292283
}
293284
}
294285

295-
func (w *FileWatcher) sendEvents(eventsChan chan<- []protocol.FileEvent) {
296-
w.mu.Lock() // Guard the w.events read and write. Not w.EventChan.
297-
defer w.mu.Unlock()
286+
func (w *Watcher) sendEvents(handler func([]protocol.FileEvent, error)) {
287+
w.mu.Lock()
288+
events := w.events
289+
w.events = nil
290+
w.mu.Unlock()
298291

299-
if len(w.events) != 0 {
300-
eventsChan <- w.events
301-
w.events = make([]protocol.FileEvent, 0)
292+
if len(events) != 0 {
293+
handler(events, nil)
302294
}
303295
}
304296

305297
// Close shuts down the watcher, waits for the internal goroutine to terminate,
306298
// and returns any final error.
307-
func (w *FileWatcher) Close() error {
308-
w.mu.Lock()
309-
299+
func (w *Watcher) Close() error {
310300
err := w.watcher.Close()
311301
// Wait for the go routine to finish. So all the channels will be closed and
312302
// all go routine will be terminated.
313-
close(w.closed)
314-
315-
w.mu.Unlock()
303+
close(w.stop)
316304

317305
w.wg.Wait()
318306

0 commit comments

Comments
 (0)