Skip to content

Commit c719062

Browse files
committed
fsmonitor--daemon: implement handle_client callback
Teach fsmonitor--daemon to respond to IPC requests from client Git processes and respond with a list of modified pathnames relative to the provided token. Signed-off-by: Jeff Hostetler <[email protected]>
1 parent f8ef89c commit c719062

File tree

1 file changed

+310
-2
lines changed

1 file changed

+310
-2
lines changed

builtin/fsmonitor--daemon.c

Lines changed: 310 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "fsmonitor--daemon.h"
88
#include "simple-ipc.h"
99
#include "khash.h"
10+
#include "pkt-line.h"
1011

1112
static const char * const builtin_fsmonitor__daemon_usage[] = {
1213
N_("git fsmonitor--daemon start [<options>]"),
@@ -355,14 +356,319 @@ void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
355356
pthread_mutex_unlock(&state->main_lock);
356357
}
357358

359+
/*
360+
* Format an opaque token string to send to the client.
361+
*/
362+
static void with_lock__format_response_token(
363+
struct strbuf *response_token,
364+
const struct strbuf *response_token_id,
365+
const struct fsmonitor_batch *batch)
366+
{
367+
/* assert current thread holding state->main_lock */
368+
369+
strbuf_reset(response_token);
370+
strbuf_addf(response_token, "builtin:%s:%"PRIu64,
371+
response_token_id->buf, batch->batch_seq_nr);
372+
}
373+
374+
/*
375+
* Parse an opaque token from the client.
376+
* Returns -1 on error.
377+
*/
378+
static int fsmonitor_parse_client_token(const char *buf_token,
379+
struct strbuf *requested_token_id,
380+
uint64_t *seq_nr)
381+
{
382+
const char *p;
383+
char *p_end;
384+
385+
strbuf_reset(requested_token_id);
386+
*seq_nr = 0;
387+
388+
if (!skip_prefix(buf_token, "builtin:", &p))
389+
return -1;
390+
391+
while (*p && *p != ':')
392+
strbuf_addch(requested_token_id, *p++);
393+
if (!*p++)
394+
return -1;
395+
396+
*seq_nr = (uint64_t)strtoumax(p, &p_end, 10);
397+
if (*p_end)
398+
return -1;
399+
400+
return 0;
401+
}
402+
403+
KHASH_INIT(str, const char *, int, 0, kh_str_hash_func, kh_str_hash_equal);
404+
405+
static int do_handle_client(struct fsmonitor_daemon_state *state,
406+
const char *command,
407+
ipc_server_reply_cb *reply,
408+
struct ipc_server_reply_data *reply_data)
409+
{
410+
struct fsmonitor_token_data *token_data = NULL;
411+
struct strbuf response_token = STRBUF_INIT;
412+
struct strbuf requested_token_id = STRBUF_INIT;
413+
struct strbuf payload = STRBUF_INIT;
414+
uint64_t requested_oldest_seq_nr = 0;
415+
uint64_t total_response_len = 0;
416+
const char *p;
417+
const struct fsmonitor_batch *batch_head;
418+
const struct fsmonitor_batch *batch;
419+
intmax_t count = 0, duplicates = 0;
420+
kh_str_t *shown;
421+
int hash_ret;
422+
int do_trivial = 0;
423+
int do_flush = 0;
424+
425+
/*
426+
* We expect `command` to be of the form:
427+
*
428+
* <command> := quit NUL
429+
* | flush NUL
430+
* | <V1-time-since-epoch-ns> NUL
431+
* | <V2-opaque-fsmonitor-token> NUL
432+
*/
433+
434+
if (!strcmp(command, "quit")) {
435+
/*
436+
* A client has requested over the socket/pipe that the
437+
* daemon shutdown.
438+
*
439+
* Tell the IPC thread pool to shutdown (which completes
440+
* the await in the main thread (which can stop the
441+
* fsmonitor listener thread)).
442+
*
443+
* There is no reply to the client.
444+
*/
445+
return SIMPLE_IPC_QUIT;
446+
447+
} else if (!strcmp(command, "flush")) {
448+
/*
449+
* Flush all of our cached data and generate a new token
450+
* just like if we lost sync with the filesystem.
451+
*
452+
* Then send a trivial response using the new token.
453+
*/
454+
do_flush = 1;
455+
do_trivial = 1;
456+
457+
} else if (!skip_prefix(command, "builtin:", &p)) {
458+
/* assume V1 timestamp or garbage */
459+
460+
char *p_end;
461+
462+
strtoumax(command, &p_end, 10);
463+
trace_printf_key(&trace_fsmonitor,
464+
((*p_end) ?
465+
"fsmonitor: invalid command line '%s'" :
466+
"fsmonitor: unsupported V1 protocol '%s'"),
467+
command);
468+
do_trivial = 1;
469+
470+
} else {
471+
/* We have "builtin:*" */
472+
if (fsmonitor_parse_client_token(command, &requested_token_id,
473+
&requested_oldest_seq_nr)) {
474+
trace_printf_key(&trace_fsmonitor,
475+
"fsmonitor: invalid V2 protocol token '%s'",
476+
command);
477+
do_trivial = 1;
478+
479+
} else {
480+
/*
481+
* We have a V2 valid token:
482+
* "builtin:<token_id>:<seq_nr>"
483+
*/
484+
}
485+
}
486+
487+
pthread_mutex_lock(&state->main_lock);
488+
489+
if (!state->current_token_data)
490+
BUG("fsmonitor state does not have a current token");
491+
492+
if (do_flush)
493+
with_lock__do_force_resync(state);
494+
495+
/*
496+
* We mark the current head of the batch list as "pinned" so
497+
* that the listener thread will treat this item as read-only
498+
* (and prevent any more paths from being added to it) from
499+
* now on.
500+
*/
501+
token_data = state->current_token_data;
502+
batch_head = token_data->batch_head;
503+
((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
504+
505+
/*
506+
* FSMonitor Protocol V2 requires that we send a response header
507+
* with a "new current token" and then all of the paths that changed
508+
* since the "requested token". We send the seq_nr of the just-pinned
509+
* head batch so that future requests from a client will be relative
510+
* to it.
511+
*/
512+
with_lock__format_response_token(&response_token,
513+
&token_data->token_id, batch_head);
514+
515+
reply(reply_data, response_token.buf, response_token.len + 1);
516+
total_response_len += response_token.len + 1;
517+
518+
trace2_data_string("fsmonitor", the_repository, "response/token",
519+
response_token.buf);
520+
trace_printf_key(&trace_fsmonitor, "response token: %s",
521+
response_token.buf);
522+
523+
if (!do_trivial) {
524+
if (strcmp(requested_token_id.buf, token_data->token_id.buf)) {
525+
/*
526+
* The client last spoke to a different daemon
527+
* instance -OR- the daemon had to resync with
528+
* the filesystem (and lost events), so reject.
529+
*/
530+
trace2_data_string("fsmonitor", the_repository,
531+
"response/token", "different");
532+
do_trivial = 1;
533+
534+
} else if (requested_oldest_seq_nr <
535+
token_data->batch_tail->batch_seq_nr) {
536+
/*
537+
* The client wants older events than we have for
538+
* this token_id. This means that the end of our
539+
* batch list was truncated and we cannot give the
540+
* client a complete snapshot relative to their
541+
* request.
542+
*/
543+
trace_printf_key(&trace_fsmonitor,
544+
"client requested truncated data");
545+
do_trivial = 1;
546+
}
547+
}
548+
549+
if (do_trivial) {
550+
pthread_mutex_unlock(&state->main_lock);
551+
552+
reply(reply_data, "/", 2);
553+
554+
trace2_data_intmax("fsmonitor", the_repository,
555+
"response/trivial", 1);
556+
557+
strbuf_release(&response_token);
558+
strbuf_release(&requested_token_id);
559+
return 0;
560+
}
561+
562+
/*
563+
* We're going to hold onto a pointer to the current
564+
* token-data while we walk the list of batches of files.
565+
* During this time, we will NOT be under the lock.
566+
* So we ref-count it.
567+
*
568+
* This allows the listener thread to continue prepending
569+
* new batches of items to the token-data (which we'll ignore).
570+
*
571+
* AND it allows the listener thread to do a token-reset
572+
* (and install a new `current_token_data`).
573+
*/
574+
token_data->client_ref_count++;
575+
576+
pthread_mutex_unlock(&state->main_lock);
577+
578+
/*
579+
* The client request is relative to the token that they sent,
580+
* so walk the batch list backwards from the current head back
581+
* to the batch (sequence number) they named.
582+
*
583+
* We use khash to de-dup the list of pathnames.
584+
*
585+
* NEEDSWORK: each batch contains a list of interned strings,
586+
* so we only need to do pointer comparisons here to build the
587+
* hash table. Currently, we're still comparing the string
588+
* values.
589+
*/
590+
shown = kh_init_str();
591+
for (batch = batch_head;
592+
batch && batch->batch_seq_nr > requested_oldest_seq_nr;
593+
batch = batch->next) {
594+
size_t k;
595+
596+
for (k = 0; k < batch->nr; k++) {
597+
const char *s = batch->interned_paths[k];
598+
size_t s_len;
599+
600+
if (kh_get_str(shown, s) != kh_end(shown))
601+
duplicates++;
602+
else {
603+
kh_put_str(shown, s, &hash_ret);
604+
605+
trace_printf_key(&trace_fsmonitor,
606+
"send[%"PRIuMAX"]: %s",
607+
count, s);
608+
609+
/* Each path gets written with a trailing NUL */
610+
s_len = strlen(s) + 1;
611+
612+
if (payload.len + s_len >=
613+
LARGE_PACKET_DATA_MAX) {
614+
reply(reply_data, payload.buf,
615+
payload.len);
616+
total_response_len += payload.len;
617+
strbuf_reset(&payload);
618+
}
619+
620+
strbuf_add(&payload, s, s_len);
621+
count++;
622+
}
623+
}
624+
}
625+
626+
if (payload.len) {
627+
reply(reply_data, payload.buf, payload.len);
628+
total_response_len += payload.len;
629+
}
630+
631+
kh_release_str(shown);
632+
633+
pthread_mutex_lock(&state->main_lock);
634+
635+
if (token_data->client_ref_count > 0)
636+
token_data->client_ref_count--;
637+
638+
if (token_data->client_ref_count == 0) {
639+
if (token_data != state->current_token_data) {
640+
/*
641+
* The listener thread did a token-reset while we were
642+
* walking the batch list. Therefore, this token is
643+
* stale and can be discarded completely. If we are
644+
* the last reader thread using this token, we own
645+
* that work.
646+
*/
647+
fsmonitor_free_token_data(token_data);
648+
}
649+
}
650+
651+
pthread_mutex_unlock(&state->main_lock);
652+
653+
trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
654+
trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
655+
trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
656+
657+
strbuf_release(&response_token);
658+
strbuf_release(&requested_token_id);
659+
strbuf_release(&payload);
660+
661+
return 0;
662+
}
663+
358664
static ipc_server_application_cb handle_client;
359665

360666
static int handle_client(void *data,
361667
const char *command, size_t command_len,
362668
ipc_server_reply_cb *reply,
363669
struct ipc_server_reply_data *reply_data)
364670
{
365-
/* struct fsmonitor_daemon_state *state = data; */
671+
struct fsmonitor_daemon_state *state = data;
366672
int result;
367673

368674
/*
@@ -373,10 +679,12 @@ static int handle_client(void *data,
373679
if (command_len != strlen(command))
374680
BUG("FSMonitor assumes text messages");
375681

682+
trace_printf_key(&trace_fsmonitor, "requested token: %s", command);
683+
376684
trace2_region_enter("fsmonitor", "handle_client", the_repository);
377685
trace2_data_string("fsmonitor", the_repository, "request", command);
378686

379-
result = 0; /* TODO Do something here. */
687+
result = do_handle_client(state, command, reply, reply_data);
380688

381689
trace2_region_leave("fsmonitor", "handle_client", the_repository);
382690

0 commit comments

Comments
 (0)