Skip to content

Commit 823223f

Browse files
peffdscho
authored andcommitted
simple-ipc: split async server initialization and running
To start an async ipc server, you call ipc_server_run_async(). That initializes the ipc_server_data object, and starts all of the threads running, which may immediately start serving clients. This can create some awkward timing problems, though. In the fsmonitor daemon (the sole user of the simple-ipc system), we want to create the ipc server early in the process, which means we may start serving clients before the rest of the daemon is fully initialized. To solve this, let's break run_async() into two parts: an initialization which allocates all data and spawns the threads (without letting them run), and a start function which actually lets them begin work. Since we have two simple-ipc implementations, we have to handle this twice: - in ipc-unix-socket.c, we have a central listener thread which hands connections off to worker threads using a work_available mutex. We can hold that mutex after init, and release it when we're ready to start. We do need an extra "started" flag so that we know whether the main thread is holding the mutex or not (e.g., if we prematurely stop the server, we want to make sure all of the worker threads are released to hear about the shutdown). - in ipc-win32.c, we don't have a central mutex. So we'll introduce a new startup_barrier mutex, which we'll similarly hold until we're ready to let the threads proceed. We again need a "started" flag here to make sure that we release the barrier mutex when shutting down, so that the sub-threads can proceed to the finish. I've renamed the run_async() function to init_async() to make sure we catch all callers, since they'll now need to call the matching start_async(). We could leave run_async() as a wrapper that does both, but there's not much point. There are only two callers, one of which is fsmonitor, which will want to actually do work between the two calls. And the other is just a test-tool wrapper. For now I've added the start_async() calls in fsmonitor where they would otherwise have happened, so there should be no behavior change with this patch. Signed-off-by: Jeff King <[email protected]> Acked-by: Koji Nakamaru <[email protected]> Signed-off-by: Junio C Hamano <[email protected]>
1 parent a89881e commit 823223f

File tree

5 files changed

+88
-18
lines changed

5 files changed

+88
-18
lines changed

builtin/fsmonitor--daemon.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,13 +1208,15 @@ static int fsmonitor_run_daemon_1(struct fsmonitor_daemon_state *state)
12081208
* system event listener thread so that we have the IPC handle
12091209
* before we need it.
12101210
*/
1211-
if (ipc_server_run_async(&state->ipc_server_data,
1212-
state->path_ipc.buf, &ipc_opts,
1213-
handle_client, state))
1211+
if (ipc_server_init_async(&state->ipc_server_data,
1212+
state->path_ipc.buf, &ipc_opts,
1213+
handle_client, state))
12141214
return error_errno(
12151215
_("could not start IPC thread pool on '%s'"),
12161216
state->path_ipc.buf);
12171217

1218+
ipc_server_start_async(&state->ipc_server_data);
1219+
12181220
/*
12191221
* Start the fsmonitor listener thread to collect filesystem
12201222
* events.

compat/simple-ipc/ipc-shared.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ int ipc_server_run(const char *path, const struct ipc_server_opts *opts,
1616
struct ipc_server_data *server_data = NULL;
1717
int ret;
1818

19-
ret = ipc_server_run_async(&server_data, path, opts,
20-
application_cb, application_data);
19+
ret = ipc_server_init_async(&server_data, path, opts,
20+
application_cb, application_data);
2121
if (ret)
2222
return ret;
2323

24+
ipc_server_start_async(server_data);
2425
ret = ipc_server_await(server_data);
2526

2627
ipc_server_free(server_data);

compat/simple-ipc/ipc-unix-socket.c

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ struct ipc_server_data {
328328
int back_pos;
329329
int front_pos;
330330

331+
int started;
331332
int shutdown_requested;
332333
int is_stopped;
333334
};
@@ -824,10 +825,10 @@ static int setup_listener_socket(
824825
/*
825826
* Start IPC server in a pool of background threads.
826827
*/
827-
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
828-
const char *path, const struct ipc_server_opts *opts,
829-
ipc_server_application_cb *application_cb,
830-
void *application_data)
828+
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
829+
const char *path, const struct ipc_server_opts *opts,
830+
ipc_server_application_cb *application_cb,
831+
void *application_data)
831832
{
832833
struct unix_ss_socket *server_socket = NULL;
833834
struct ipc_server_data *server_data;
@@ -888,6 +889,12 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
888889
server_data->accept_thread->fd_send_shutdown = sv[0];
889890
server_data->accept_thread->fd_wait_shutdown = sv[1];
890891

892+
/*
893+
* Hold work-available mutex so that no work can start until
894+
* we unlock it.
895+
*/
896+
pthread_mutex_lock(&server_data->work_available_mutex);
897+
891898
if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
892899
accept_thread_proc, server_data->accept_thread))
893900
die_errno(_("could not start accept_thread '%s'"), path);
@@ -918,6 +925,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
918925
return 0;
919926
}
920927

928+
void ipc_server_start_async(struct ipc_server_data *server_data)
929+
{
930+
if (!server_data || server_data->started)
931+
return;
932+
933+
server_data->started = 1;
934+
pthread_mutex_unlock(&server_data->work_available_mutex);
935+
}
936+
921937
/*
922938
* Gently tell the IPC server treads to shutdown.
923939
* Can be run on any thread.
@@ -933,7 +949,9 @@ int ipc_server_stop_async(struct ipc_server_data *server_data)
933949

934950
trace2_region_enter("ipc-server", "server-stop-async", NULL);
935951

936-
pthread_mutex_lock(&server_data->work_available_mutex);
952+
/* If we haven't started yet, we are already holding lock. */
953+
if (server_data->started)
954+
pthread_mutex_lock(&server_data->work_available_mutex);
937955

938956
server_data->shutdown_requested = 1;
939957

compat/simple-ipc/ipc-win32.c

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,9 @@ struct ipc_server_data {
371371
HANDLE hEventStopRequested;
372372
struct ipc_server_thread_data *thread_list;
373373
int is_stopped;
374+
375+
pthread_mutex_t startup_barrier;
376+
int started;
374377
};
375378

376379
enum connect_result {
@@ -526,6 +529,16 @@ static int use_connection(struct ipc_server_thread_data *server_thread_data)
526529
return ret;
527530
}
528531

532+
static void wait_for_startup_barrier(struct ipc_server_data *server_data)
533+
{
534+
/*
535+
* Temporarily hold the startup_barrier mutex before starting,
536+
* which lets us know that it's OK to start serving requests.
537+
*/
538+
pthread_mutex_lock(&server_data->startup_barrier);
539+
pthread_mutex_unlock(&server_data->startup_barrier);
540+
}
541+
529542
/*
530543
* Thread proc for an IPC server worker thread. It handles a series of
531544
* connections from clients. It cleans and reuses the hPipe between each
@@ -550,6 +563,8 @@ static void *server_thread_proc(void *_server_thread_data)
550563
memset(&oConnect, 0, sizeof(oConnect));
551564
oConnect.hEvent = hEventConnected;
552565

566+
wait_for_startup_barrier(server_thread_data->server_data);
567+
553568
for (;;) {
554569
cr = wait_for_connection(server_thread_data, &oConnect);
555570

@@ -752,10 +767,10 @@ static HANDLE create_new_pipe(wchar_t *wpath, int is_first)
752767
return hPipe;
753768
}
754769

755-
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
756-
const char *path, const struct ipc_server_opts *opts,
757-
ipc_server_application_cb *application_cb,
758-
void *application_data)
770+
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
771+
const char *path, const struct ipc_server_opts *opts,
772+
ipc_server_application_cb *application_cb,
773+
void *application_data)
759774
{
760775
struct ipc_server_data *server_data;
761776
wchar_t wpath[MAX_PATH];
@@ -787,6 +802,13 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
787802
strbuf_addstr(&server_data->buf_path, path);
788803
wcscpy(server_data->wpath, wpath);
789804

805+
/*
806+
* Hold the startup_barrier lock so that no threads will progress
807+
* until ipc_server_start_async() is called.
808+
*/
809+
pthread_mutex_init(&server_data->startup_barrier, NULL);
810+
pthread_mutex_lock(&server_data->startup_barrier);
811+
790812
if (nr_threads < 1)
791813
nr_threads = 1;
792814

@@ -837,6 +859,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
837859
return 0;
838860
}
839861

862+
void ipc_server_start_async(struct ipc_server_data *server_data)
863+
{
864+
if (!server_data || server_data->started)
865+
return;
866+
867+
server_data->started = 1;
868+
pthread_mutex_unlock(&server_data->startup_barrier);
869+
}
870+
840871
int ipc_server_stop_async(struct ipc_server_data *server_data)
841872
{
842873
if (!server_data)
@@ -850,6 +881,13 @@ int ipc_server_stop_async(struct ipc_server_data *server_data)
850881
* We DO NOT attempt to force them to drop an active connection.
851882
*/
852883
SetEvent(server_data->hEventStopRequested);
884+
885+
/*
886+
* If we haven't yet told the threads they are allowed to run,
887+
* do so now, so they can receive the shutdown event.
888+
*/
889+
ipc_server_start_async(server_data);
890+
853891
return 0;
854892
}
855893

@@ -900,5 +938,7 @@ void ipc_server_free(struct ipc_server_data *server_data)
900938
free(std);
901939
}
902940

941+
pthread_mutex_destroy(&server_data->startup_barrier);
942+
903943
free(server_data);
904944
}

simple-ipc.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,20 @@ struct ipc_server_opts
179179
* When a client IPC message is received, the `application_cb` will be
180180
* called (possibly on a random thread) to handle the message and
181181
* optionally compose a reply message.
182+
*
183+
* This initializes all threads but no actual work will be done until
184+
* ipc_server_start_async() is called.
185+
*/
186+
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
187+
const char *path, const struct ipc_server_opts *opts,
188+
ipc_server_application_cb *application_cb,
189+
void *application_data);
190+
191+
/*
192+
* Let an async server start running. This needs to be called only once
193+
* after initialization.
182194
*/
183-
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
184-
const char *path, const struct ipc_server_opts *opts,
185-
ipc_server_application_cb *application_cb,
186-
void *application_data);
195+
void ipc_server_start_async(struct ipc_server_data *server_data);
187196

188197
/*
189198
* Gently signal the IPC server pool to shutdown. No new client

0 commit comments

Comments
 (0)