Skip to content

Commit 6300544

Browse files
committed
sdexec: handle early sdexec.write requests
Problem: if an sdexec.write request arrives before the unit stdin is ready, it will be dropped. This is possible now that sdexec.write identifies the unit by the matchtag instead of the pid. Queue early sdexec.write requests until stdin is valid, then move them back to the flux handle for processing as though they are being received for the first time.
1 parent dfdbb43 commit 6300544

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

src/modules/sdexec/sdexec.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ struct sdproc {
6565
flux_future_t *f_start;
6666
flux_future_t *f_stop;
6767
struct unit *unit;
68+
struct flux_msglist *write_requests;
6869
struct channel *in;
6970
struct channel *out;
7071
struct channel *err;
@@ -365,6 +366,21 @@ static void start_continuation (flux_future_t *f, void *arg)
365366
sdexec_channel_close_fd (proc->in);
366367
sdexec_channel_close_fd (proc->out);
367368
sdexec_channel_close_fd (proc->err);
369+
/* Now that stdin is ready, re-queue any messages write_cb() left in
370+
* proc->write_requests. Push these messages to the front of the flux_t
371+
* queue so that they come before unprocessed writes, if any.
372+
*/
373+
if (proc->write_requests) {
374+
const flux_msg_t *request;
375+
while ((request = flux_msglist_pop (proc->write_requests))) {
376+
int rc = flux_requeue (ctx->h, request, FLUX_RQ_HEAD);
377+
flux_msg_decref (request);
378+
if (rc < 0) {
379+
flux_log_error (ctx->h, "error requeuing early sdexec.write");
380+
break;
381+
}
382+
}
383+
}
368384
return;
369385
error:
370386
if (flux_respond_error (ctx->h, msg, errno, future_strerror (f, errno)))
@@ -445,6 +461,7 @@ static void sdproc_destroy (struct sdproc *proc)
445461
flux_future_destroy (proc->f_stop);
446462
sdexec_unit_destroy (proc->unit);
447463
json_decref (proc->cmd);
464+
flux_msglist_destroy (proc->write_requests);
448465
free (proc);
449466
errno = saved_errno;
450467
}
@@ -760,6 +777,22 @@ static void write_cb (flux_t *h,
760777
flux_log (h, LOG_ERR, "sdexec.write: subprocess no longer exists");
761778
return;
762779
}
780+
/* If the systemd unit has not started yet, enqueue the write request for
781+
* later processing in start_continuation(). We can tell that it hasn't
782+
* started if start_continuation() has not yet handed the stdin channel
783+
* file descriptor over to systemd by calling the close function.
784+
*/
785+
if (sdexec_channel_get_fd (proc->in) != -1) { // not yet claimed by systemd
786+
if (!proc->write_requests) {
787+
if (!(proc->write_requests = flux_msglist_create ())) {
788+
flux_log_error (h, "sdexec.write: error creating write queue");
789+
return;
790+
}
791+
}
792+
if (flux_msglist_push (proc->write_requests, msg) < 0)
793+
flux_log_error (h, "sdexec.write: error enqueueing write request");
794+
return;
795+
}
763796
if (iodecode (io, &stream, NULL, NULL, NULL, NULL) == 0
764797
&& !streq (stream, "stdin")) {
765798
flux_log (h, LOG_ERR, "sdexec.write: %s is an invalid stream", stream);

0 commit comments

Comments
 (0)