5
5
6
6
import { watchFile , unwatchFile , Stats } from 'fs' ;
7
7
import { Disposable , DisposableMap , DisposableStore , toDisposable } from 'vs/base/common/lifecycle' ;
8
- import { ILogMessage , IUniversalWatchRequest , IWatchRequestWithCorrelation , IWatcher , isWatchRequestWithCorrelation } from 'vs/platform/files/common/watcher' ;
8
+ import { ILogMessage , IRecursiveWatcherWithSubscribe , IUniversalWatchRequest , IWatchRequestWithCorrelation , IWatcher , isWatchRequestWithCorrelation } from 'vs/platform/files/common/watcher' ;
9
9
import { Emitter , Event } from 'vs/base/common/event' ;
10
10
import { FileChangeType , IFileChange } from 'vs/platform/files/common/files' ;
11
11
import { URI } from 'vs/base/common/uri' ;
12
+ import { DeferredPromise } from 'vs/base/common/async' ;
12
13
13
14
export abstract class BaseWatcher extends Disposable implements IWatcher {
14
15
@@ -25,9 +26,12 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
25
26
private readonly allCorrelatedWatchRequests = new Map < number /* correlation ID */ , IWatchRequestWithCorrelation > ( ) ;
26
27
27
28
private readonly suspendedWatchRequests = this . _register ( new DisposableMap < number /* correlation ID */ > ( ) ) ;
29
+ private readonly suspendedWatchRequestsWithPolling = new Set < number /* correlation ID */ > ( ) ;
28
30
29
31
protected readonly suspendedWatchRequestPollingInterval : number = 5007 ; // node.js default
30
32
33
+ private joinWatch = new DeferredPromise < void > ( ) ;
34
+
31
35
constructor ( ) {
32
36
super ( ) ;
33
37
@@ -41,6 +45,11 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
41
45
// to experiment with this feature in a controlled way. Monitoring requests
42
46
// requires us to install polling watchers (via `fs.watchFile()`) and thus
43
47
// should be used sparingly.
48
+ //
49
+ // TODO@bpasero revisit this in the future to have a more general approach
50
+ // for suspend/resume and drop the `legacyMonitorRequest` in parcel.
51
+ // One issue is that we need to be able to uniquely identify a request and
52
+ // without correlation that is actually harder...
44
53
45
54
return ;
46
55
}
@@ -53,26 +62,36 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
53
62
}
54
63
55
64
async watch ( requests : IUniversalWatchRequest [ ] ) : Promise < void > {
56
- this . allCorrelatedWatchRequests . clear ( ) ;
57
- this . allNonCorrelatedWatchRequests . clear ( ) ;
58
-
59
- // Figure out correlated vs. non-correlated requests
60
- for ( const request of requests ) {
61
- if ( this . isCorrelated ( request ) ) {
62
- this . allCorrelatedWatchRequests . set ( request . correlationId , request ) ;
63
- } else {
64
- this . allNonCorrelatedWatchRequests . add ( request ) ;
65
- }
65
+ if ( ! this . joinWatch . isSettled ) {
66
+ this . joinWatch . complete ( ) ;
66
67
}
68
+ this . joinWatch = new DeferredPromise < void > ( ) ;
67
69
68
- // Remove all suspended correlated watch requests that are no longer watched
69
- for ( const [ correlationId ] of this . suspendedWatchRequests ) {
70
- if ( ! this . allCorrelatedWatchRequests . has ( correlationId ) ) {
71
- this . suspendedWatchRequests . deleteAndDispose ( correlationId ) ;
70
+ try {
71
+ this . allCorrelatedWatchRequests . clear ( ) ;
72
+ this . allNonCorrelatedWatchRequests . clear ( ) ;
73
+
74
+ // Figure out correlated vs. non-correlated requests
75
+ for ( const request of requests ) {
76
+ if ( this . isCorrelated ( request ) ) {
77
+ this . allCorrelatedWatchRequests . set ( request . correlationId , request ) ;
78
+ } else {
79
+ this . allNonCorrelatedWatchRequests . add ( request ) ;
80
+ }
72
81
}
73
- }
74
82
75
- return this . updateWatchers ( ) ;
83
+ // Remove all suspended correlated watch requests that are no longer watched
84
+ for ( const [ correlationId ] of this . suspendedWatchRequests ) {
85
+ if ( ! this . allCorrelatedWatchRequests . has ( correlationId ) ) {
86
+ this . suspendedWatchRequests . deleteAndDispose ( correlationId ) ;
87
+ this . suspendedWatchRequestsWithPolling . delete ( correlationId ) ;
88
+ }
89
+ }
90
+
91
+ return await this . updateWatchers ( ) ;
92
+ } finally {
93
+ this . joinWatch . complete ( ) ;
94
+ }
76
95
}
77
96
78
97
private updateWatchers ( ) : Promise < void > {
@@ -82,29 +101,78 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
82
101
] ) ;
83
102
}
84
103
85
- private suspendWatchRequest ( request : IWatchRequestWithCorrelation ) : void {
104
+ isSuspended ( request : IUniversalWatchRequest ) : 'polling' | boolean {
105
+ if ( typeof request . correlationId !== 'number' ) {
106
+ return false ;
107
+ }
108
+
109
+ return this . suspendedWatchRequestsWithPolling . has ( request . correlationId ) ? 'polling' : this . suspendedWatchRequests . has ( request . correlationId ) ;
110
+ }
111
+
112
+ private async suspendWatchRequest ( request : IWatchRequestWithCorrelation ) : Promise < void > {
86
113
if ( this . suspendedWatchRequests . has ( request . correlationId ) ) {
87
114
return ; // already suspended
88
115
}
89
116
90
117
const disposables = new DisposableStore ( ) ;
91
118
this . suspendedWatchRequests . set ( request . correlationId , disposables ) ;
92
119
120
+ // It is possible that a watch request fails right during watch()
121
+ // phase while other requests succeed. To increase the chance of
122
+ // reusing another watcher for suspend/resume tracking, we await
123
+ // all watch requests having processed.
124
+
125
+ await this . joinWatch . p ;
126
+
127
+ if ( disposables . isDisposed ) {
128
+ return ;
129
+ }
130
+
93
131
this . monitorSuspendedWatchRequest ( request , disposables ) ;
94
132
95
133
this . updateWatchers ( ) ;
96
134
}
97
135
98
136
private resumeWatchRequest ( request : IWatchRequestWithCorrelation ) : void {
99
137
this . suspendedWatchRequests . deleteAndDispose ( request . correlationId ) ;
138
+ this . suspendedWatchRequestsWithPolling . delete ( request . correlationId ) ;
100
139
101
140
this . updateWatchers ( ) ;
102
141
}
103
142
104
- private monitorSuspendedWatchRequest ( request : IWatchRequestWithCorrelation , disposables : DisposableStore ) {
105
- const resource = URI . file ( request . path ) ;
106
- const that = this ;
143
+ private monitorSuspendedWatchRequest ( request : IWatchRequestWithCorrelation , disposables : DisposableStore ) : void {
144
+ if ( this . doMonitorWithExistingWatcher ( request , disposables ) ) {
145
+ this . trace ( `reusing an existing recursive watcher to monitor ${ request . path } ` ) ;
146
+ this . suspendedWatchRequestsWithPolling . delete ( request . correlationId ) ;
147
+ } else {
148
+ this . doMonitorWithNodeJS ( request , disposables ) ;
149
+ this . suspendedWatchRequestsWithPolling . add ( request . correlationId ) ;
150
+ }
151
+ }
152
+
153
+ private doMonitorWithExistingWatcher ( request : IWatchRequestWithCorrelation , disposables : DisposableStore ) : boolean {
154
+ const subscription = this . recursiveWatcher ?. subscribe ( request . path , ( error , change ) => {
155
+ if ( disposables . isDisposed ) {
156
+ return ; // return early if already disposed
157
+ }
158
+
159
+ if ( error ) {
160
+ this . monitorSuspendedWatchRequest ( request , disposables ) ;
161
+ } else if ( change ?. type === FileChangeType . ADDED ) {
162
+ this . onMonitoredPathAdded ( request ) ;
163
+ }
164
+ } ) ;
165
+
166
+ if ( subscription ) {
167
+ disposables . add ( subscription ) ;
168
+
169
+ return true ;
170
+ }
171
+
172
+ return false ;
173
+ }
107
174
175
+ private doMonitorWithNodeJS ( request : IWatchRequestWithCorrelation , disposables : DisposableStore ) : void {
108
176
let pathNotFound = false ;
109
177
110
178
const watchFileCallback : ( curr : Stats , prev : Stats ) => void = ( curr , prev ) => {
@@ -119,15 +187,7 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
119
187
120
188
// Watch path created: resume watching request
121
189
if ( ! currentPathNotFound && ( previousPathNotFound || oldPathNotFound ) ) {
122
- this . trace ( `fs.watchFile() detected ${ request . path } exists again, resuming watcher (correlationId: ${ request . correlationId } )` ) ;
123
-
124
- // Emit as event
125
- const event : IFileChange = { resource, type : FileChangeType . ADDED , cId : request . correlationId } ;
126
- that . _onDidChangeFile . fire ( [ event ] ) ;
127
- this . traceEvent ( event , request ) ;
128
-
129
- // Resume watching
130
- this . resumeWatchRequest ( request ) ;
190
+ this . onMonitoredPathAdded ( request ) ;
131
191
}
132
192
} ;
133
193
@@ -149,12 +209,25 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
149
209
} ) ) ;
150
210
}
151
211
212
+ private onMonitoredPathAdded ( request : IWatchRequestWithCorrelation ) {
213
+ this . trace ( `detected ${ request . path } exists again, resuming watcher (correlationId: ${ request . correlationId } )` ) ;
214
+
215
+ // Emit as event
216
+ const event : IFileChange = { resource : URI . file ( request . path ) , type : FileChangeType . ADDED , cId : request . correlationId } ;
217
+ this . _onDidChangeFile . fire ( [ event ] ) ;
218
+ this . traceEvent ( event , request ) ;
219
+
220
+ // Resume watching
221
+ this . resumeWatchRequest ( request ) ;
222
+ }
223
+
152
224
private isPathNotFound ( stats : Stats ) : boolean {
153
225
return stats . ctimeMs === 0 && stats . ino === 0 ;
154
226
}
155
227
156
228
async stop ( ) : Promise < void > {
157
229
this . suspendedWatchRequests . clearAndDisposeAll ( ) ;
230
+ this . suspendedWatchRequestsWithPolling . clear ( ) ;
158
231
}
159
232
160
233
protected traceEvent ( event : IFileChange , request : IUniversalWatchRequest ) : void {
@@ -168,6 +241,8 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
168
241
169
242
protected abstract doWatch ( requests : IUniversalWatchRequest [ ] ) : Promise < void > ;
170
243
244
+ protected abstract readonly recursiveWatcher : IRecursiveWatcherWithSubscribe | undefined ;
245
+
171
246
protected abstract trace ( message : string ) : void ;
172
247
protected abstract warn ( message : string ) : void ;
173
248
0 commit comments