@@ -124,24 +124,24 @@ UInt32 FSEventStreamEventFlagItemIsLastHardlink () {
124124 *****************************************************************************/
125125
126126/* Write an event to the pipe input fd */
127- static void writeEvent (int fd , UInt64 eventId , UInt64 eventFlags , char * path )
127+ static void writeEvent (int fd , UInt64 eventId , UInt64 eventFlags , char * path )
128128{
129129 UInt64 buf [3 ];
130+ /* XXX Is the path string in UTF-8? */
131+ size_t len = strlen (path );
132+
130133 buf [0 ] = eventId ;
131134 buf [1 ] = eventFlags ;
132- /* XXX Is the path string in UTF-8? */
133- buf [2 ] = (UInt64 )strlen (path );
135+ buf [2 ] = (UInt64 )len ;
134136 write (fd , buf , 3 * sizeof (UInt64 ));
135- write (fd , path , strlen ( path ) );
137+ write (fd , path , len );
136138}
137139
138- /* thread state */
139140struct watch
140141{
141142 FSEventStreamRef eventStream ;
142- CFRunLoopRef runLoop ;
143+ dispatch_queue_t queue ;
143144 int writefd ;
144- pthread_mutex_t mut ;
145145};
146146
147147/* Just writes the event to the pipe input fd */
@@ -163,26 +163,16 @@ static void watchCallback
163163 }
164164}
165165
166- /******************************************************************************
167- * Start a watch event loop
168- *****************************************************************************/
166+ #define MAX_WATCH_PATHS 4096
169167
170- /* Event loop run in a pthread */
171- static void * watchRunLoop (void * vw )
172- {
173- struct watch * w = (struct watch * ) vw ;
174- CFRunLoopRef rl = CFRunLoopGetCurrent ();
175- CFRetain (rl );
176- w -> runLoop = rl ;
177- FSEventStreamScheduleWithRunLoop (w -> eventStream , rl , kCFRunLoopDefaultMode );
178- FSEventStreamStart (w -> eventStream );
179- pthread_mutex_unlock (& w -> mut );
180- CFRunLoopRun ();
181- pthread_exit (NULL );
168+ static void free_cffolders (CFStringRef * cffolders , int n ) {
169+ int i ;
170+ for (i = 0 ; i < n ; i ++ ) {
171+ CFRelease (cffolders [i ]);
172+ }
173+ free (cffolders );
182174}
183175
184- #define MAX_WATCH_PATHS 4096
185-
186176int createWatch
187177 ( struct pathName * folders
188178 , int n /* number of entries in folders */
@@ -210,8 +200,18 @@ int createWatch
210200 since = kFSEventStreamEventIdSinceNow ;
211201 }
212202
203+ struct watch * w = malloc (sizeof (struct watch ));
204+ if (!w ) {
205+ goto cleanup_pipe ;
206+ }
207+
213208 /* Setup paths array */
214209 CFStringRef * cffolders = malloc (n * sizeof (CFStringRef ));
210+ if (!cffolders ) {
211+ goto cleanup_watch ;
212+ }
213+
214+ /* Create event stream using paths and context*/
215215 int i ;
216216 for (i = 0 ; i < n ; i ++ ) {
217217 cffolders [i ] = CFStringCreateWithBytes
@@ -221,74 +221,67 @@ int createWatch
221221 , kCFStringEncodingUTF8
222222 , false
223223 );
224+ if (!cffolders [i ]) {
225+ free_cffolders (cffolders , i );
226+ goto cleanup_watch ;
227+ }
224228 }
225229 CFArrayRef paths = CFArrayCreate (NULL , (const void * * )cffolders , n , NULL );
230+ if (!paths ) {
231+ free_cffolders (cffolders , n );
232+ goto cleanup_watch ;
233+ }
226234
227- /* Setup context */
228- struct watch * w = malloc (sizeof (struct watch ));
229235 FSEventStreamContext ctx ;
230236 ctx .version = 0 ;
231237 ctx .info = (void * )w ;
232238 ctx .retain = NULL ;
233239 ctx .release = NULL ;
234240 ctx .copyDescription = NULL ;
235241
236- /* Create watch using paths and context*/
237- FSEventStreamRef es = FSEventStreamCreate
242+ w -> eventStream = FSEventStreamCreate
238243 (NULL , & watchCallback , & ctx , paths , since , latency , createFlags );
244+ free_cffolders (cffolders , n );
245+ CFRelease (paths );
239246
240- /* Run the event loop in a pthread */
241- int retval ;
242- if (es != NULL ) {
243- /* Success */
244- w -> writefd = pfds [1 ];
245- w -> eventStream = es ;
246- w -> runLoop = NULL ;
247-
248- /* Lock to prevent race against watch destroy */
249- pthread_mutex_init (& w -> mut , NULL );
250- pthread_mutex_lock (& w -> mut );
251- pthread_t t ;
252- pthread_create (& t , NULL , & watchRunLoop , (void * )w );
253-
254- /* return the out fd and the watch struct */
255- * fd = pfds [0 ];
256- * wp = w ;
257- retval = 0 ;
258- } else {
259- /* Failure */
260- close (pfds [0 ]);
261- close (pfds [1 ]);
262- free (w );
263- retval = -1 ;
247+ if (w -> eventStream == NULL ) {
248+ goto cleanup_watch ;
264249 }
265250
266- /* Cleanup */
267- for ( i = 0 ; i < n ; i ++ ) {
268- CFRelease ( cffolders [ i ]) ;
251+ w -> queue = dispatch_queue_create ( "com.composewell.streamly" , NULL );
252+ if (! w -> queue ) {
253+ goto cleanup_es ;
269254 }
270- free (cffolders );
271- CFRelease (paths );
272- return retval ;
255+
256+ w -> writefd = pfds [1 ];
257+ * fd = pfds [0 ];
258+ * wp = w ;
259+ FSEventStreamSetDispatchQueue (w -> eventStream , w -> queue );
260+ FSEventStreamStart (w -> eventStream );
261+ return 0 ;
262+
263+ cleanup_es :
264+ FSEventStreamRelease (w -> eventStream );
265+ cleanup_watch :
266+ free (w );
267+ cleanup_pipe :
268+ close (pfds [0 ]);
269+ close (pfds [1 ]);
270+ return -1 ;
273271}
274272
275273/******************************************************************************
276274 * Stop a watch event loop
277275 *****************************************************************************/
278276
279277void destroyWatch (struct watch * w ) {
280- /* Stop the loop so the thread will exit */
281- pthread_mutex_lock ( & w -> mut );
278+ /* Stop and invalidate the event stream */
279+ FSEventStreamFlushSync ( w -> eventStream );
282280 FSEventStreamStop (w -> eventStream );
283281 FSEventStreamInvalidate (w -> eventStream );
284- CFRunLoopStop (w -> runLoop );
285- CFRelease (w -> runLoop );
282+ dispatch_release (w -> queue );
286283 FSEventStreamRelease (w -> eventStream );
287284 close (w -> writefd );
288- pthread_mutex_unlock (& w -> mut );
289-
290- /* Cleanup */
291- pthread_mutex_destroy (& w -> mut );
292285 free (w );
293286}
294287#endif
0 commit comments