Commit 30c5a1a

fsmonitor--daemon: create token-based changed path cache
Teach fsmonitor--daemon to build a list of changed paths and associate
them with a token-id. This will be used by the platform-specific
backends to accumulate changed paths in response to filesystem events.

The platform-specific file system listener thread receives file system
events containing one or more changed pathnames (with whatever bucketing
or grouping is convenient for the file system). These paths are
accumulated (without locking) by the file system layer into a
`fsmonitor_batch`.

When the file system layer has drained the kernel event queue, it
"publishes" them to our token queue and makes them visible to concurrent
client worker threads. The token layer is free to combine and/or de-dup
paths within these batches for efficient presentation to clients.

Signed-off-by: Jeff Hostetler <[email protected]>
1 parent 89f8b1f commit 30c5a1a
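
To make the intended flow concrete, the sketch below shows how a
platform backend might drive the new API. The callback shape and the
names `example_handle_kernel_events`, `paths`, and `nr_paths` are
invented for illustration; only `fsmonitor_batch__new()`,
`fsmonitor_batch__add_path()`, and `fsmonitor_publish()` come from this
commit, and the empty cookie list reflects that cookies are presumably
wired up by a later patch in this series.

/*
 * Illustrative only: a hypothetical backend event handler using the
 * batch API introduced by this commit.
 */
static void example_handle_kernel_events(struct fsmonitor_daemon_state *state,
					 const char **paths, size_t nr_paths)
{
	struct string_list cookie_names = STRING_LIST_INIT_DUP;
	struct fsmonitor_batch *batch = fsmonitor_batch__new();
	size_t k;

	/* The batch is still private, so no locking is needed here. */
	for (k = 0; k < nr_paths; k++)
		fsmonitor_batch__add_path(batch, paths[k]);

	/*
	 * Publishing links the batch into the current token (under the
	 * daemon lock) and transfers ownership; do not free it here.
	 */
	fsmonitor_publish(state, batch, &cookie_names);

	string_list_clear(&cookie_names, 0);
}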

File tree

2 files changed: +273 −2 lines changed

builtin/fsmonitor--daemon.c

Lines changed: 233 additions & 2 deletions
@@ -168,17 +168,27 @@ struct fsmonitor_token_data {
 	uint64_t client_ref_count;
 };
 
+struct fsmonitor_batch {
+	struct fsmonitor_batch *next;
+	uint64_t batch_seq_nr;
+	const char **interned_paths;
+	size_t nr, alloc;
+	time_t pinned_time;
+};
+
 static struct fsmonitor_token_data *fsmonitor_new_token_data(void)
 {
 	static int test_env_value = -1;
 	static uint64_t flush_count = 0;
 	struct fsmonitor_token_data *token;
+	struct fsmonitor_batch *batch;
 
 	CALLOC_ARRAY(token, 1);
+	batch = fsmonitor_batch__new();
 
 	strbuf_init(&token->token_id, 0);
-	token->batch_head = NULL;
-	token->batch_tail = NULL;
+	token->batch_head = batch;
+	token->batch_tail = batch;
 	token->client_ref_count = 0;
 
 	if (test_env_value < 0)
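
A note on the new struct: `interned_paths` holds pointers returned by
git's `strintern()` pool (see `fsmonitor_batch__add_path()` below), so
a batch never owns its strings, only the array. A minimal sketch of the
property this relies on, assuming (as the pooling implies) that
interning equal strings yields the same long-lived pointer:

#include "cache.h" /* strintern(); assert() comes in via git-compat-util.h */

/*
 * Illustrative only: interned strings are pooled, so identical paths
 * recorded in different batches share one allocation and must never
 * be free()d by a batch.
 */
static void example_interning(void)
{
	const char *a = strintern("dir/file.c");
	const char *b = strintern("dir/file.c");

	assert(a == b); /* same pooled copy */
}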
@@ -204,9 +214,148 @@ static struct fsmonitor_token_data *fsmonitor_new_token_data(void)
 		strbuf_addf(&token->token_id, "test_%08x", test_env_value++);
 	}
 
+	/*
+	 * We created a new <token_id> and are starting a new series
+	 * of tokens with a zero <seq_nr>.
+	 *
+	 * Since clients cannot guess our new (non test) <token_id>
+	 * they will always receive a trivial response (because of the
+	 * mismatch on the <token_id>).  The trivial response will
+	 * tell them our new <token_id> so that subsequent requests
+	 * will be relative to our new series.  (And when sending that
+	 * response, we pin the current head of the batch list.)
+	 *
+	 * Even if the client correctly guesses the <token_id>, their
+	 * request of "builtin:<token_id>:0" asks for all changes MORE
+	 * RECENT than batch/bin 0.
+	 *
+	 * This implies that it is a waste to accumulate paths in the
+	 * initial batch/bin (because they will never be transmitted).
+	 *
+	 * So the daemon could be running for days and watching the
+	 * file system, but doesn't need to actually accumulate any
+	 * paths UNTIL we need to set a reference point for a later
+	 * relative request.
+	 *
+	 * However, it is very useful for testing to always have a
+	 * reference point set.  Pin batch 0 to force early file system
+	 * events to accumulate.
+	 */
+	if (test_env_value)
+		batch->pinned_time = time(NULL);
+
 	return token;
 }
 
+struct fsmonitor_batch *fsmonitor_batch__new(void)
+{
+	struct fsmonitor_batch *batch;
+
+	CALLOC_ARRAY(batch, 1);
+
+	return batch;
+}
+
+struct fsmonitor_batch *fsmonitor_batch__pop(struct fsmonitor_batch *batch)
+{
+	struct fsmonitor_batch *next;
+
+	if (!batch)
+		return NULL;
+
+	next = batch->next;
+
+	/*
+	 * The actual strings within the array are interned, so we don't
+	 * own them; we only own the array itself.
+	 */
+	free(batch->interned_paths);
+	free(batch);
+
+	return next;
+}
+
+void fsmonitor_batch__add_path(struct fsmonitor_batch *batch,
+			       const char *path)
+{
+	const char *interned_path = strintern(path);
+
+	trace_printf_key(&trace_fsmonitor, "event: %s", interned_path);
+
+	ALLOC_GROW(batch->interned_paths, batch->nr + 1, batch->alloc);
+	batch->interned_paths[batch->nr++] = interned_path;
+}
+
+static void fsmonitor_batch__combine(struct fsmonitor_batch *batch_dest,
+				     const struct fsmonitor_batch *batch_src)
+{
+	size_t k;
+
+	ALLOC_GROW(batch_dest->interned_paths,
+		   batch_dest->nr + batch_src->nr + 1,
+		   batch_dest->alloc);
+
+	for (k = 0; k < batch_src->nr; k++)
+		batch_dest->interned_paths[batch_dest->nr++] =
+			batch_src->interned_paths[k];
+}
+
+static void fsmonitor_free_token_data(struct fsmonitor_token_data *token)
+{
+	struct fsmonitor_batch *p;
+
+	if (!token)
+		return;
+
+	assert(token->client_ref_count == 0);
+
+	strbuf_release(&token->token_id);
+
+	for (p = token->batch_head; p; p = fsmonitor_batch__pop(p))
+		;
+
+	free(token);
+}
+
+/*
+ * Flush all of our cached data about the filesystem.  Call this if we
+ * lose sync with the filesystem and miss some notification events.
+ *
+ * [1] If we are missing events, then we no longer have a complete
+ *     history of the directory (relative to our current start token).
+ *     We should create a new token and start fresh (as if we just
+ *     booted up).
+ *
+ * If there are no concurrent threads reading the current token data
+ * series, we can free it now.  Otherwise, let the last reader free
+ * it.
+ *
+ * Either way, the old token data series is no longer associated with
+ * our state data.
+ */
+static void with_lock__do_force_resync(struct fsmonitor_daemon_state *state)
+{
+	/* assert current thread holding state->main_lock */
+
+	struct fsmonitor_token_data *free_me = NULL;
+	struct fsmonitor_token_data *new_one = NULL;
+
+	new_one = fsmonitor_new_token_data();
+
+	if (state->current_token_data->client_ref_count == 0)
+		free_me = state->current_token_data;
+	state->current_token_data = new_one;
+
+	fsmonitor_free_token_data(free_me);
+}
+
+void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
+{
+	pthread_mutex_lock(&state->main_lock);
+	with_lock__do_force_resync(state);
+	pthread_mutex_unlock(&state->main_lock);
+}
+
 static ipc_server_application_cb handle_client;
 
 static int handle_client(void *data,
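
The long comment in `fsmonitor_new_token_data()` fixes the meaning of
<seq_nr>: a request of "builtin:<token_id>:N" asks for all changes more
recent than batch N, and the list is kept newest-first with the highest
`batch_seq_nr` at the head (see `fsmonitor_publish()` below). The helper
here is not part of this commit; it is a sketch of the walk a responder
could do under those rules, assuming it holds a ref/lock that keeps the
list stable.

/*
 * Illustrative only: gather the paths that answer a client request
 * made relative to `requested_seq_nr`.
 */
static void example_collect_changes(const struct fsmonitor_batch *head,
				    uint64_t requested_seq_nr,
				    struct string_list *answer)
{
	const struct fsmonitor_batch *p;
	size_t k;

	/* Stop at the first batch that is not newer than the request. */
	for (p = head; p && p->batch_seq_nr > requested_seq_nr; p = p->next)
		for (k = 0; k < p->nr; k++)
			string_list_append(answer, p->interned_paths[k]);
}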
@@ -316,6 +465,81 @@ enum fsmonitor_path_type fsmonitor_classify_path_absolute(
 	return fsmonitor_classify_path_gitdir_relative(rel);
 }
 
+/*
+ * We try to combine small batches at the front of the batch-list to avoid
+ * having a long list.  This hopefully makes it a little easier when we want
+ * to truncate and maintain the list.  However, we don't want the paths array
+ * to just keep growing and growing with realloc, so we insert an arbitrary
+ * limit.
+ */
+#define MY_COMBINE_LIMIT (1024)
+
+void fsmonitor_publish(struct fsmonitor_daemon_state *state,
+		       struct fsmonitor_batch *batch,
+		       const struct string_list *cookie_names)
+{
+	if (!batch && !cookie_names->nr)
+		return;
+
+	pthread_mutex_lock(&state->main_lock);
+
+	if (batch) {
+		struct fsmonitor_batch *head;
+
+		head = state->current_token_data->batch_head;
+		if (!head) {
+			BUG("token does not have batch");
+		} else if (head->pinned_time) {
+			/*
+			 * We cannot alter the current batch list
+			 * because:
+			 *
+			 * [a] it is being transmitted to at least one
+			 * client and the handle_client() thread has a
+			 * ref-count, but not a lock on the batch list
+			 * starting with this item.
+			 *
+			 * [b] it has been transmitted in the past to
+			 * at least one client such that future
+			 * requests are relative to this head batch.
+			 *
+			 * So, we can only prepend a new batch onto
+			 * the front of the list.
+			 */
+			batch->batch_seq_nr = head->batch_seq_nr + 1;
+			batch->next = head;
+			state->current_token_data->batch_head = batch;
+		} else if (!head->batch_seq_nr) {
+			/*
+			 * Batch 0 is unpinned.  See the note in
+			 * `fsmonitor_new_token_data()` about why we
+			 * don't need to accumulate these paths.
+			 */
+			fsmonitor_batch__pop(batch);
+		} else if (head->nr + batch->nr > MY_COMBINE_LIMIT) {
+			/*
+			 * The head batch in the list has never been
+			 * transmitted to a client, but folding the
+			 * contents of the new batch onto it would
+			 * exceed our arbitrary limit, so just prepend
+			 * the new batch onto the list.
+			 */
+			batch->batch_seq_nr = head->batch_seq_nr + 1;
+			batch->next = head;
+			state->current_token_data->batch_head = batch;
+		} else {
+			/*
+			 * We are free to append the paths in the given
+			 * batch onto the end of the current head batch.
+			 */
+			fsmonitor_batch__combine(head, batch);
+			fsmonitor_batch__pop(batch);
+		}
+	}
+
+	pthread_mutex_unlock(&state->main_lock);
+}
+
 static void *fsm_listen__thread_proc(void *_state)
 {
 	struct fsmonitor_daemon_state *state = _state;
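
The branch structure of `fsmonitor_publish()` boils down to one
immutability rule plus two optimizations: a pinned head may already be
visible to clients, so it can only be prepended to; an unpinned batch 0
can never be requested, so its paths can be dropped; and everything else
is about keeping the list short without letting one array grow without
bound. The hypothetical helper below (not in this commit) restates those
outcomes:

/*
 * Illustrative only: the decision fsmonitor_publish() makes for a
 * non-NULL batch when the head batch exists.
 */
enum example_publish_action {
	EXAMPLE_PREPEND,	/* link the new batch in front of head */
	EXAMPLE_DISCARD,	/* drop the paths (unpinned batch 0) */
	EXAMPLE_COMBINE		/* fold the paths into the head batch */
};

static enum example_publish_action
example_classify_publish(const struct fsmonitor_batch *head,
			 const struct fsmonitor_batch *batch)
{
	if (head->pinned_time)
		return EXAMPLE_PREPEND;	/* head may be visible to clients */
	if (!head->batch_seq_nr)
		return EXAMPLE_DISCARD;	/* no request can reach batch 0 */
	if (head->nr + batch->nr > 1024 /* MY_COMBINE_LIMIT */)
		return EXAMPLE_PREPEND;	/* bound realloc growth */
	return EXAMPLE_COMBINE;
}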
@@ -330,6 +554,13 @@ static void *fsm_listen__thread_proc(void *_state)
 
 	fsm_listen__loop(state);
 
+	pthread_mutex_lock(&state->main_lock);
+	if (state->current_token_data &&
+	    state->current_token_data->client_ref_count == 0)
+		fsmonitor_free_token_data(state->current_token_data);
+	state->current_token_data = NULL;
+	pthread_mutex_unlock(&state->main_lock);
+
 	trace2_thread_exit();
 	return NULL;
 }

fsmonitor--daemon.h

Lines changed: 40 additions & 0 deletions
@@ -12,6 +12,27 @@
 struct fsmonitor_batch;
 struct fsmonitor_token_data;
 
+/*
+ * Create a new batch of path(s).  The returned batch is considered
+ * private and not linked into the fsmonitor daemon state.  The caller
+ * should fill this batch with one or more paths and then publish it.
+ */
+struct fsmonitor_batch *fsmonitor_batch__new(void);
+
+/*
+ * Free this batch and return the value of the batch->next field.
+ */
+struct fsmonitor_batch *fsmonitor_batch__pop(struct fsmonitor_batch *batch);
+
+/*
+ * Add this path to this batch of modified files.
+ *
+ * The batch should be private and NOT (yet) linked into the fsmonitor
+ * daemon state, and therefore not yet visible to worker threads, so
+ * no locking is required.
+ */
+void fsmonitor_batch__add_path(struct fsmonitor_batch *batch, const char *path);
+
 struct fsmonitor_daemon_backend_data; /* opaque platform-specific data */
 
 struct fsmonitor_daemon_state {
@@ -91,5 +112,24 @@ enum fsmonitor_path_type fsmonitor_classify_path_absolute(
 	struct fsmonitor_daemon_state *state,
 	const char *path);
 
+/*
+ * Prepend this batch of path(s) onto the list of batches associated
+ * with the current token.  This makes the batch visible to worker threads.
+ *
+ * The caller no longer owns the batch and must not free it.
+ *
+ * Wake up the client threads waiting on these cookies.
+ */
+void fsmonitor_publish(struct fsmonitor_daemon_state *state,
+		       struct fsmonitor_batch *batch,
+		       const struct string_list *cookie_names);
+
+/*
+ * If the platform-specific layer loses sync with the filesystem,
+ * it should call this to invalidate cached data and abort waiting
+ * threads.
+ */
+void fsmonitor_force_resync(struct fsmonitor_daemon_state *state);
+
 #endif /* HAVE_FSMONITOR_DAEMON_BACKEND */
 #endif /* FSMONITOR_DAEMON_H */
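
Finally, a sketch of the intended call site for
`fsmonitor_force_resync()`. The function and flag names here are
invented; the "queue overflowed" condition stands in for whatever
dropped-events signal a given platform reports:

/*
 * Illustrative only: if the kernel dropped events, our batch history
 * is no longer complete relative to the current token, so start a
 * fresh token series instead of publishing.
 */
static void example_on_listener_wakeup(struct fsmonitor_daemon_state *state,
				       int queue_overflowed)
{
	if (queue_overflowed) {
		fsmonitor_force_resync(state);
		return;
	}

	/* ...otherwise accumulate paths into a batch and publish it... */
}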
