Skip to content

Commit a046a68

Browse files
authored
Merge pull request #5282 from grondo/pty-output-ref
shell: ensure captured pty data does not come after stdout EOF in output eventlog
2 parents e915f2c + 67cc8b2 commit a046a68

File tree

12 files changed

+253
-7
lines changed

12 files changed

+253
-7
lines changed

src/cmd/flux-job.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1933,6 +1933,9 @@ void attach_stdin_cb (flux_reactor_t *r, flux_watcher_t *w,
19331933
*/
19341934
void attach_output_start (struct attach_ctx *ctx)
19351935
{
1936+
if (ctx->output_f)
1937+
return;
1938+
19361939
if (!(ctx->output_f = flux_job_event_watch (ctx->h,
19371940
ctx->id,
19381941
"guest.output",

src/common/libflux/ev_buffer_read.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ static void buffer_read_cb (struct ev_loop *loop, ev_io *iow, int revents)
8787
return;
8888

8989
if (!ret) {
90-
ebr->eof_read = true;
90+
ev_buffer_read_decref (ebr);
9191
(void)flux_buffer_readonly (ebr->fb);
9292
ev_io_stop (ebr->loop, iow);
9393
}
@@ -138,6 +138,7 @@ int ev_buffer_read_init (struct ev_buffer_read *ebr,
138138
ebr->start = false;
139139
ebr->eof_read = false;
140140
ebr->eof_sent = false;
141+
ebr->refcnt = 1;
141142

142143
if (!(ebr->fb = flux_buffer_create (size)))
143144
goto cleanup;
@@ -195,6 +196,17 @@ void ev_buffer_read_stop (struct ev_loop *loop, struct ev_buffer_read *ebr)
195196
}
196197
}
197198

199+
void ev_buffer_read_incref (struct ev_buffer_read *ebr)
200+
{
201+
ebr->refcnt++;
202+
}
203+
204+
void ev_buffer_read_decref (struct ev_buffer_read *ebr)
205+
{
206+
if (--ebr->refcnt == 0)
207+
ebr->eof_read = true;
208+
}
209+
198210
/*
199211
* vi:tabstop=4 shiftwidth=4 expandtab
200212
*/

src/common/libflux/ev_buffer_read.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ typedef void (*ev_buffer_read_f)(struct ev_loop *loop,
2121
int revents);
2222

2323
struct ev_buffer_read {
24+
int refcnt;
2425
ev_io io_w;
2526
ev_prepare prepare_w;
2627
ev_idle idle_w;
@@ -44,5 +45,7 @@ int ev_buffer_read_init (struct ev_buffer_read *ebr,
4445
void ev_buffer_read_cleanup (struct ev_buffer_read *ebr);
4546
void ev_buffer_read_start (struct ev_loop *loop, struct ev_buffer_read *ebr);
4647
void ev_buffer_read_stop (struct ev_loop *loop, struct ev_buffer_read *ebr);
48+
void ev_buffer_read_incref (struct ev_buffer_read *ebr);
49+
void ev_buffer_read_decref (struct ev_buffer_read *ebr);
4750

4851
#endif /* !_EV_BUFFER_READ_H */

src/common/libflux/reactor.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,18 @@ const char *flux_buffer_read_watcher_get_data (flux_watcher_t *w, int *lenp)
463463
return NULL;
464464
}
465465

466+
void flux_buffer_read_watcher_incref (flux_watcher_t *w)
467+
{
468+
if (w)
469+
ev_buffer_read_incref ((struct ev_buffer_read *)w->data);
470+
}
471+
472+
void flux_buffer_read_watcher_decref (flux_watcher_t *w)
473+
{
474+
if (w)
475+
ev_buffer_read_decref ((struct ev_buffer_read *)w->data);
476+
}
477+
466478
static void buffer_write_start (flux_watcher_t *w)
467479
{
468480
struct ev_buffer_write *ebw = (struct ev_buffer_write *)w->data;

src/common/libflux/reactor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ flux_buffer_t *flux_buffer_read_watcher_get_buffer (flux_watcher_t *w);
115115
const char *flux_buffer_read_watcher_get_data (flux_watcher_t *w,
116116
int *lenp);
117117

118+
/* Take a reference on read watcher to prevent read of EOF
119+
* EOF will be delayed until decref drops refcount to 0.
120+
*/
121+
void flux_buffer_read_watcher_incref (flux_watcher_t *w);
122+
void flux_buffer_read_watcher_decref (flux_watcher_t *w);
123+
118124
/* 'cb' only called after fd closed (FLUX_POLLOUT) or error (FLUX_POLLERR) */
119125
flux_watcher_t *flux_buffer_write_watcher_create (flux_reactor_t *r, int fd,
120126
int size, flux_watcher_f cb,

src/common/libflux/test/reactor.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,10 +708,72 @@ static void test_buffer (flux_reactor_t *reactor)
708708

709709
struct buffer_fd_close
710710
{
711+
flux_watcher_t *w;
711712
int count;
712713
int fd;
713714
};
714715

716+
static void buffer_decref (flux_reactor_t *r,
717+
flux_watcher_t *w,
718+
int revents,
719+
void *arg)
720+
{
721+
struct buffer_fd_close *bfc = arg;
722+
bfc->count++;
723+
flux_buffer_read_watcher_decref (bfc->w);
724+
ok (true, "flux_buffer_read_watcher_decref");
725+
flux_watcher_destroy (w);
726+
}
727+
728+
static void buffer_read_fd_decref (flux_reactor_t *r,
729+
flux_watcher_t *w,
730+
int revents,
731+
void *arg)
732+
{
733+
struct buffer_fd_close *bfc = arg;
734+
flux_buffer_t *fb;
735+
const void *ptr;
736+
int len;
737+
738+
if (revents & FLUX_POLLERR) {
739+
fail ("buffer decref: got FLUX_POLLERR");
740+
return;
741+
}
742+
if (!(revents & FLUX_POLLIN)) {
743+
fail ("buffer decref: got FLUX_POLLERR");
744+
return;
745+
}
746+
747+
fb = flux_buffer_read_watcher_get_buffer (w);
748+
ok ((ptr = flux_buffer_read (fb, -1, &len)) != NULL,
749+
"buffer decref: read from buffer success");
750+
if (!bfc->count) {
751+
flux_watcher_t *w;
752+
ok (len == 6,
753+
"buffer decref: read returned correct length");
754+
ok (!memcmp (ptr, "foobar", 6),
755+
"buffer decref: read returned correct data");
756+
diag ("closing write side of read buffer");
757+
close (bfc->fd);
758+
759+
/* Schedule decref of read buffer
760+
*/
761+
w = flux_timer_watcher_create (r, 0.01, 0., buffer_decref, bfc);
762+
flux_watcher_start (w);
763+
}
764+
else {
765+
ok (bfc->count == 2,
766+
"buffer decref: EOF called only after manual decref");
767+
ok ((ptr = flux_buffer_read (fb, -1, &len)) != NULL,
768+
"buffer decref: read from buffer success");
769+
770+
ok (len == 0,
771+
"buffer decref: read returned 0, socketpair is closed");
772+
flux_watcher_stop (w);
773+
}
774+
bfc->count++;
775+
}
776+
715777
static void buffer_read_fd_close (flux_reactor_t *r, flux_watcher_t *w,
716778
int revents, void *arg)
717779
{
@@ -865,6 +927,49 @@ static void buffer_read_line_fd_close_and_left_over_data (flux_reactor_t *r,
865927
return;
866928
}
867929

930+
static void test_buffer_refcnt (flux_reactor_t *reactor)
931+
{
932+
int fd[2];
933+
flux_watcher_t *w;
934+
struct buffer_fd_close bfc;
935+
936+
/* read buffer decref test - other end closes stream */
937+
938+
ok (socketpair (PF_LOCAL, SOCK_STREAM|SOCK_NONBLOCK, 0, fd) == 0,
939+
"buffer decref: successfully created socketpair");
940+
941+
bfc.count = 0;
942+
bfc.fd = fd[1];
943+
w = flux_buffer_read_watcher_create (reactor,
944+
fd[0],
945+
1024,
946+
buffer_read_fd_decref,
947+
0,
948+
&bfc);
949+
ok (w != NULL,
950+
"buffer decref: read created");
951+
bfc.w = w;
952+
953+
ok (write (fd[1], "foobar", 6) == 6,
954+
"buffer decref: write to socketpair success");
955+
956+
flux_watcher_start (w);
957+
958+
diag ("calling flux_buffer_read_watcher_incref");
959+
flux_buffer_read_watcher_incref (w);
960+
961+
ok (flux_reactor_run (reactor, 0) == 0,
962+
"buffer decref: reactor ran to completion");
963+
964+
ok (bfc.count == 3,
965+
"buffer decref: read callback successfully called thrice");
966+
967+
flux_watcher_stop (w);
968+
flux_watcher_destroy (w);
969+
970+
close (fd[0]);
971+
}
972+
868973
static void test_buffer_corner_case (flux_reactor_t *reactor)
869974
{
870975
int fd[2];
@@ -1521,6 +1626,7 @@ int main (int argc, char *argv[])
15211626
test_periodic (reactor);
15221627
test_fd (reactor);
15231628
test_buffer (reactor);
1629+
test_buffer_refcnt (reactor);
15241630
test_buffer_corner_case (reactor);
15251631
test_idle (reactor);
15261632
test_prepcheck (reactor);

src/common/libsubprocess/subprocess.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,27 @@ int flux_set_default_subprocess_log (flux_t *h,
12321232
return 0;
12331233
}
12341234

1235+
void flux_subprocess_channel_incref (flux_subprocess_t *p, const char *name)
1236+
{
1237+
struct subprocess_channel *c;
1238+
if (!p || !p->local)
1239+
return;
1240+
if (!(c = zhash_lookup (p->channels, name)))
1241+
return;
1242+
flux_buffer_read_watcher_incref (c->buffer_read_w);
1243+
}
1244+
1245+
void flux_subprocess_channel_decref (flux_subprocess_t *p, const char *name)
1246+
{
1247+
struct subprocess_channel *c;
1248+
if (!p || !p->local)
1249+
return;
1250+
if (!(c = zhash_lookup (p->channels, name)))
1251+
return;
1252+
flux_buffer_read_watcher_decref (c->buffer_read_w);
1253+
}
1254+
1255+
12351256
/*
12361257
* vi: ts=4 sw=4 expandtab
12371258
*/

src/common/libsubprocess/subprocess.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,13 @@ int flux_subprocess_aux_set (flux_subprocess_t *p,
493493
*/
494494
void *flux_subprocess_aux_get (flux_subprocess_t *p, const char *name);
495495

496+
/*
497+
* Take/drop a reference on a subprocess output channel `name` (e.g. "stdout"
498+
* or "stderr"). EOF will not be produced from this channel the reference
499+
* count drops to zero.
500+
*/
501+
void flux_subprocess_channel_incref (flux_subprocess_t *p, const char *name);
502+
void flux_subprocess_channel_decref (flux_subprocess_t *p, const char *name);
496503

497504
#ifdef __cplusplus
498505
}

src/common/libsubprocess/test/iochan.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct iochan_ctx {
3636
int recvcount;
3737
int sendcount;
3838
int count;
39+
int refcount;
3940
const char *name;
4041
};
4142

@@ -108,6 +109,11 @@ static void iochan_state_cb (flux_subprocess_t *p,
108109
diag ("%s: %s", ctx->name, strsignal (WTERMSIG (status)));
109110
// completion callback will exit the reactor, but just in case
110111
iochan_start_doomsday (ctx, 2.);
112+
// if testing refcnt, release stdout reference now
113+
if (streq (ctx->name, "refcnt")) {
114+
ctx->refcount--;
115+
flux_subprocess_channel_decref (ctx->p, "stdout");
116+
}
111117
break;
112118
}
113119
case FLUX_SUBPROCESS_FAILED:
@@ -200,6 +206,10 @@ bool iochan_run_check (flux_t *h, const char *name, int count)
200206
tap_logger,
201207
NULL)))
202208
BAIL_OUT ("flux_rexec_ex failed");
209+
if (streq (ctx.name, "refcnt")) {
210+
ctx.refcount++;
211+
flux_subprocess_channel_incref (ctx.p, "stdout");
212+
}
203213
if (flux_subprocess_aux_set (ctx.p, "ctx", &ctx, NULL) < 0)
204214
BAIL_OUT ("flux_subprocess_aux_set failed");
205215

@@ -224,6 +234,12 @@ bool iochan_run_check (flux_t *h, const char *name, int count)
224234
if (ctx.recvcount < ctx.sendcount)
225235
ret = false;
226236

237+
if (streq (ctx.name, "refcnt")) {
238+
diag ("%s: final refcount: %d", ctx.name, ctx.refcount);
239+
if (ctx.refcount != 0)
240+
ret = false;
241+
}
242+
227243
flux_watcher_destroy (ctx.source);
228244
flux_watcher_destroy (ctx.timer);
229245
diag ("%s: destroying subprocess", name);
@@ -247,6 +263,8 @@ int main (int argc, char *argv[])
247263
"medium check worked");
248264
ok (iochan_run_check (h, "simple", linesize * 10000),
249265
"large check worked");
266+
ok (iochan_run_check (h, "refcnt", linesize * 10),
267+
"refcount check worked");
250268
test_server_stop (h);
251269
flux_close (h);
252270

src/common/libterminus/pty.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ void pty_client_send_data (struct flux_pty *pty, void *data, int len)
425425
}
426426
}
427427

428+
void pty_client_monitor_send_eof (struct flux_pty *pty)
429+
{
430+
if (pty->monitor)
431+
(*pty->monitor) (pty, NULL, 0);
432+
}
433+
428434
static void pty_read (flux_reactor_t *r,
429435
flux_watcher_t *w,
430436
int revents,
@@ -452,6 +458,7 @@ static void pty_read (flux_reactor_t *r,
452458
flux_watcher_stop (pty->fdw);
453459
pty->wait_on_close = false;
454460
check_pty_complete (pty);
461+
pty_client_monitor_send_eof (pty);
455462
return;
456463
}
457464
llog_error (pty, "read: %s", strerror (errno));

0 commit comments

Comments
 (0)