Skip to content

Commit 518a522

Browse files
jeffhostetlergitster
authored andcommitted
fsmonitor--daemon: implement handle_client callback
Teach fsmonitor--daemon to respond to IPC requests from client Git processes and respond with a list of modified pathnames relative to the provided token. Signed-off-by: Jeff Hostetler <[email protected]> Signed-off-by: Junio C Hamano <[email protected]>
1 parent 65723b3 commit 518a522

File tree

1 file changed

+309
-2
lines changed

1 file changed

+309
-2
lines changed

builtin/fsmonitor--daemon.c

Lines changed: 309 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "fsmonitor--daemon.h"
88
#include "simple-ipc.h"
99
#include "khash.h"
10+
#include "pkt-line.h"
1011

1112
static const char * const builtin_fsmonitor__daemon_usage[] = {
1213
N_("git fsmonitor--daemon start [<options>]"),
@@ -364,14 +365,318 @@ void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
364365
pthread_mutex_unlock(&state->main_lock);
365366
}
366367

368+
/*
369+
* Format an opaque token string to send to the client.
370+
*/
371+
static void with_lock__format_response_token(
372+
struct strbuf *response_token,
373+
const struct strbuf *response_token_id,
374+
const struct fsmonitor_batch *batch)
375+
{
376+
/* assert current thread holding state->main_lock */
377+
378+
strbuf_reset(response_token);
379+
strbuf_addf(response_token, "builtin:%s:%"PRIu64,
380+
response_token_id->buf, batch->batch_seq_nr);
381+
}
382+
383+
/*
384+
* Parse an opaque token from the client.
385+
* Returns -1 on error.
386+
*/
387+
static int fsmonitor_parse_client_token(const char *buf_token,
388+
struct strbuf *requested_token_id,
389+
uint64_t *seq_nr)
390+
{
391+
const char *p;
392+
char *p_end;
393+
394+
strbuf_reset(requested_token_id);
395+
*seq_nr = 0;
396+
397+
if (!skip_prefix(buf_token, "builtin:", &p))
398+
return -1;
399+
400+
while (*p && *p != ':')
401+
strbuf_addch(requested_token_id, *p++);
402+
if (!*p++)
403+
return -1;
404+
405+
*seq_nr = (uint64_t)strtoumax(p, &p_end, 10);
406+
if (*p_end)
407+
return -1;
408+
409+
return 0;
410+
}
411+
412+
KHASH_INIT(str, const char *, int, 0, kh_str_hash_func, kh_str_hash_equal)
413+
414+
static int do_handle_client(struct fsmonitor_daemon_state *state,
415+
const char *command,
416+
ipc_server_reply_cb *reply,
417+
struct ipc_server_reply_data *reply_data)
418+
{
419+
struct fsmonitor_token_data *token_data = NULL;
420+
struct strbuf response_token = STRBUF_INIT;
421+
struct strbuf requested_token_id = STRBUF_INIT;
422+
struct strbuf payload = STRBUF_INIT;
423+
uint64_t requested_oldest_seq_nr = 0;
424+
uint64_t total_response_len = 0;
425+
const char *p;
426+
const struct fsmonitor_batch *batch_head;
427+
const struct fsmonitor_batch *batch;
428+
intmax_t count = 0, duplicates = 0;
429+
kh_str_t *shown;
430+
int hash_ret;
431+
int do_trivial = 0;
432+
int do_flush = 0;
433+
434+
/*
435+
* We expect `command` to be of the form:
436+
*
437+
* <command> := quit NUL
438+
* | flush NUL
439+
* | <V1-time-since-epoch-ns> NUL
440+
* | <V2-opaque-fsmonitor-token> NUL
441+
*/
442+
443+
if (!strcmp(command, "quit")) {
444+
/*
445+
* A client has requested over the socket/pipe that the
446+
* daemon shutdown.
447+
*
448+
* Tell the IPC thread pool to shutdown (which completes
449+
* the await in the main thread (which can stop the
450+
* fsmonitor listener thread)).
451+
*
452+
* There is no reply to the client.
453+
*/
454+
return SIMPLE_IPC_QUIT;
455+
456+
} else if (!strcmp(command, "flush")) {
457+
/*
458+
* Flush all of our cached data and generate a new token
459+
* just like if we lost sync with the filesystem.
460+
*
461+
* Then send a trivial response using the new token.
462+
*/
463+
do_flush = 1;
464+
do_trivial = 1;
465+
466+
} else if (!skip_prefix(command, "builtin:", &p)) {
467+
/* assume V1 timestamp or garbage */
468+
469+
char *p_end;
470+
471+
strtoumax(command, &p_end, 10);
472+
trace_printf_key(&trace_fsmonitor,
473+
((*p_end) ?
474+
"fsmonitor: invalid command line '%s'" :
475+
"fsmonitor: unsupported V1 protocol '%s'"),
476+
command);
477+
do_trivial = 1;
478+
479+
} else {
480+
/* We have "builtin:*" */
481+
if (fsmonitor_parse_client_token(command, &requested_token_id,
482+
&requested_oldest_seq_nr)) {
483+
trace_printf_key(&trace_fsmonitor,
484+
"fsmonitor: invalid V2 protocol token '%s'",
485+
command);
486+
do_trivial = 1;
487+
488+
} else {
489+
/*
490+
* We have a V2 valid token:
491+
* "builtin:<token_id>:<seq_nr>"
492+
*/
493+
}
494+
}
495+
496+
pthread_mutex_lock(&state->main_lock);
497+
498+
if (!state->current_token_data)
499+
BUG("fsmonitor state does not have a current token");
500+
501+
if (do_flush)
502+
with_lock__do_force_resync(state);
503+
504+
/*
505+
* We mark the current head of the batch list as "pinned" so
506+
* that the listener thread will treat this item as read-only
507+
* (and prevent any more paths from being added to it) from
508+
* now on.
509+
*/
510+
token_data = state->current_token_data;
511+
batch_head = token_data->batch_head;
512+
((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
513+
514+
/*
515+
* FSMonitor Protocol V2 requires that we send a response header
516+
* with a "new current token" and then all of the paths that changed
517+
* since the "requested token". We send the seq_nr of the just-pinned
518+
* head batch so that future requests from a client will be relative
519+
* to it.
520+
*/
521+
with_lock__format_response_token(&response_token,
522+
&token_data->token_id, batch_head);
523+
524+
reply(reply_data, response_token.buf, response_token.len + 1);
525+
total_response_len += response_token.len + 1;
526+
527+
trace2_data_string("fsmonitor", the_repository, "response/token",
528+
response_token.buf);
529+
trace_printf_key(&trace_fsmonitor, "response token: %s",
530+
response_token.buf);
531+
532+
if (!do_trivial) {
533+
if (strcmp(requested_token_id.buf, token_data->token_id.buf)) {
534+
/*
535+
* The client last spoke to a different daemon
536+
* instance -OR- the daemon had to resync with
537+
* the filesystem (and lost events), so reject.
538+
*/
539+
trace2_data_string("fsmonitor", the_repository,
540+
"response/token", "different");
541+
do_trivial = 1;
542+
543+
} else if (requested_oldest_seq_nr <
544+
token_data->batch_tail->batch_seq_nr) {
545+
/*
546+
* The client wants older events than we have for
547+
* this token_id. This means that the end of our
548+
* batch list was truncated and we cannot give the
549+
* client a complete snapshot relative to their
550+
* request.
551+
*/
552+
trace_printf_key(&trace_fsmonitor,
553+
"client requested truncated data");
554+
do_trivial = 1;
555+
}
556+
}
557+
558+
if (do_trivial) {
559+
pthread_mutex_unlock(&state->main_lock);
560+
561+
reply(reply_data, "/", 2);
562+
563+
trace2_data_intmax("fsmonitor", the_repository,
564+
"response/trivial", 1);
565+
566+
goto cleanup;
567+
}
568+
569+
/*
570+
* We're going to hold onto a pointer to the current
571+
* token-data while we walk the list of batches of files.
572+
* During this time, we will NOT be under the lock.
573+
* So we ref-count it.
574+
*
575+
* This allows the listener thread to continue prepending
576+
* new batches of items to the token-data (which we'll ignore).
577+
*
578+
* AND it allows the listener thread to do a token-reset
579+
* (and install a new `current_token_data`).
580+
*/
581+
token_data->client_ref_count++;
582+
583+
pthread_mutex_unlock(&state->main_lock);
584+
585+
/*
586+
* The client request is relative to the token that they sent,
587+
* so walk the batch list backwards from the current head back
588+
* to the batch (sequence number) they named.
589+
*
590+
* We use khash to de-dup the list of pathnames.
591+
*
592+
* NEEDSWORK: each batch contains a list of interned strings,
593+
* so we only need to do pointer comparisons here to build the
594+
* hash table. Currently, we're still comparing the string
595+
* values.
596+
*/
597+
shown = kh_init_str();
598+
for (batch = batch_head;
599+
batch && batch->batch_seq_nr > requested_oldest_seq_nr;
600+
batch = batch->next) {
601+
size_t k;
602+
603+
for (k = 0; k < batch->nr; k++) {
604+
const char *s = batch->interned_paths[k];
605+
size_t s_len;
606+
607+
if (kh_get_str(shown, s) != kh_end(shown))
608+
duplicates++;
609+
else {
610+
kh_put_str(shown, s, &hash_ret);
611+
612+
trace_printf_key(&trace_fsmonitor,
613+
"send[%"PRIuMAX"]: %s",
614+
count, s);
615+
616+
/* Each path gets written with a trailing NUL */
617+
s_len = strlen(s) + 1;
618+
619+
if (payload.len + s_len >=
620+
LARGE_PACKET_DATA_MAX) {
621+
reply(reply_data, payload.buf,
622+
payload.len);
623+
total_response_len += payload.len;
624+
strbuf_reset(&payload);
625+
}
626+
627+
strbuf_add(&payload, s, s_len);
628+
count++;
629+
}
630+
}
631+
}
632+
633+
if (payload.len) {
634+
reply(reply_data, payload.buf, payload.len);
635+
total_response_len += payload.len;
636+
}
637+
638+
kh_release_str(shown);
639+
640+
pthread_mutex_lock(&state->main_lock);
641+
642+
if (token_data->client_ref_count > 0)
643+
token_data->client_ref_count--;
644+
645+
if (token_data->client_ref_count == 0) {
646+
if (token_data != state->current_token_data) {
647+
/*
648+
* The listener thread did a token-reset while we were
649+
* walking the batch list. Therefore, this token is
650+
* stale and can be discarded completely. If we are
651+
* the last reader thread using this token, we own
652+
* that work.
653+
*/
654+
fsmonitor_free_token_data(token_data);
655+
}
656+
}
657+
658+
pthread_mutex_unlock(&state->main_lock);
659+
660+
trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
661+
trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
662+
trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
663+
664+
cleanup:
665+
strbuf_release(&response_token);
666+
strbuf_release(&requested_token_id);
667+
strbuf_release(&payload);
668+
669+
return 0;
670+
}
671+
367672
static ipc_server_application_cb handle_client;
368673

369674
static int handle_client(void *data,
370675
const char *command, size_t command_len,
371676
ipc_server_reply_cb *reply,
372677
struct ipc_server_reply_data *reply_data)
373678
{
374-
/* struct fsmonitor_daemon_state *state = data; */
679+
struct fsmonitor_daemon_state *state = data;
375680
int result;
376681

377682
/*
@@ -382,10 +687,12 @@ static int handle_client(void *data,
382687
if (command_len != strlen(command))
383688
BUG("FSMonitor assumes text messages");
384689

690+
trace_printf_key(&trace_fsmonitor, "requested token: %s", command);
691+
385692
trace2_region_enter("fsmonitor", "handle_client", the_repository);
386693
trace2_data_string("fsmonitor", the_repository, "request", command);
387694

388-
result = 0; /* TODO Do something here. */
695+
result = do_handle_client(state, command, reply, reply_data);
389696

390697
trace2_region_leave("fsmonitor", "handle_client", the_repository);
391698

0 commit comments

Comments
 (0)