Skip to content

Commit dfdbb43

Browse files
committed
libsubprocess: use matchtag in client write req
Problem: RFC 42 now requires write requests to reference the subprocess via the exec matchtag rather than the pid. Send the matchtag in the write request instead of the pid and alter the internal function prototype for subprocess_write() to accept the exec future in lieu of rank, service, and matchtag. Update users of that function. Also update a unit test that uses the raw protocol.
1 parent 5f80d28 commit dfdbb43

File tree

5 files changed

+30
-42
lines changed

5 files changed

+30
-42
lines changed

src/common/libsubprocess/client.c

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ struct rexec_ctx {
4444
json_t *cmd;
4545
int flags;
4646
struct rexec_response response;
47+
uint32_t matchtag;
48+
uint32_t rank;
49+
char *service_name;
4750
};
4851

4952
static void rexec_response_clear (struct rexec_response *resp)
@@ -62,12 +65,16 @@ static void rexec_ctx_destroy (struct rexec_ctx *ctx)
6265
int saved_errno = errno;
6366
rexec_response_clear (&ctx->response);
6467
json_decref (ctx->cmd);
68+
free (ctx->service_name);
6569
free (ctx);
6670
errno = saved_errno;
6771
}
6872
}
6973

70-
static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags)
74+
static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd,
75+
const char *service_name,
76+
uint32_t rank,
77+
int flags)
7178
{
7279
struct rexec_ctx *ctx;
7380
int valid_flags = SUBPROCESS_REXEC_STDOUT
@@ -80,10 +87,12 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags)
8087
}
8188
if (!(ctx = calloc (1, sizeof (*ctx))))
8289
return NULL;
83-
if (!(ctx->cmd = cmd_tojson (cmd)))
90+
if (!(ctx->cmd = cmd_tojson (cmd))
91+
|| !(ctx->service_name = strdup (service_name)))
8492
goto error;
8593
ctx->flags = flags;
8694
ctx->response.pid = -1;
95+
ctx->rank = rank;
8796
return ctx;
8897
error:
8998
rexec_ctx_destroy (ctx);
@@ -106,7 +115,7 @@ flux_future_t *subprocess_rexec (flux_t *h,
106115
}
107116
if (asprintf (&topic, "%s.exec", service_name) < 0)
108117
return NULL;
109-
if (!(ctx = rexec_ctx_create (cmd, flags)))
118+
if (!(ctx = rexec_ctx_create (cmd, service_name, rank, flags)))
110119
goto error;
111120
if (!(f = flux_rpc_pack (h,
112121
topic,
@@ -122,6 +131,7 @@ flux_future_t *subprocess_rexec (flux_t *h,
122131
rexec_ctx_destroy (ctx);
123132
goto error;
124133
}
134+
ctx->matchtag = flux_rpc_get_matchtag (f);
125135
free (topic);
126136
return f;
127137
error:
@@ -223,33 +233,32 @@ bool subprocess_rexec_is_output (flux_future_t *f,
223233
return false;
224234
}
225235

226-
int subprocess_write (flux_t *h,
227-
const char *service_name,
228-
uint32_t rank,
229-
pid_t pid,
236+
int subprocess_write (flux_future_t *f_exec,
230237
const char *stream,
231238
const char *data,
232239
int len,
233240
bool eof)
234241
{
242+
struct rexec_ctx *ctx = flux_future_aux_get (f_exec, "flux::rexec");
243+
flux_t *h = flux_future_get_flux (f_exec);
235244
flux_future_t *f = NULL;
236245
json_t *io;
237246
char *topic;
238247
int rc = -1;
239248

240-
if (!h || pid < 0 || !stream || !service_name) {
249+
if (!stream || !ctx) {
241250
errno = EINVAL;
242251
return -1;
243252
}
244-
if (asprintf (&topic, "%s.write", service_name) < 0)
253+
if (asprintf (&topic, "%s.write", ctx->service_name) < 0)
245254
return -1;
246255
if (!(io = ioencode (stream, "0", data, len, eof))
247256
|| !(f = flux_rpc_pack (h,
248257
topic,
249-
rank,
258+
ctx->rank,
250259
FLUX_RPC_NORESPONSE,
251260
"{s:i s:O}",
252-
"pid", pid,
261+
"matchtag", ctx->matchtag,
253262
"io", io)))
254263
goto out;
255264
rc = 0;

src/common/libsubprocess/client.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
4040
int *len,
4141
bool *eof);
4242

43-
int subprocess_write (flux_t *h,
44-
const char *service_name,
45-
uint32_t rank,
46-
pid_t pid,
43+
int subprocess_write (flux_future_t *f,
4744
const char *stream,
4845
const char *data,
4946
int len,

src/common/libsubprocess/remote.c

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -487,14 +487,7 @@ static int send_channel_data (flux_subprocess_t *p)
487487
return -1;
488488
}
489489
}
490-
if (subprocess_write (p->h,
491-
p->service_name,
492-
p->rank,
493-
p->pid,
494-
c->name,
495-
ptr,
496-
len,
497-
c->closed) < 0) {
490+
if (subprocess_write (p->f, c->name, ptr, len, c->closed) < 0) {
498491
llog_debug (p,
499492
"error sending rexec.write request: %s",
500493
strerror (errno));

src/common/libsubprocess/subprocess.c

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -729,14 +729,7 @@ int flux_subprocess_write (flux_subprocess_t *p,
729729
}
730730
}
731731
else { /* p->state == FLUX_SUBPROCESS_RUNNING */
732-
if (subprocess_write (p->h,
733-
p->service_name,
734-
p->rank,
735-
p->pid,
736-
c->name,
737-
buf,
738-
len,
739-
false) < 0) {
732+
if (subprocess_write (p->f, c->name, buf, len, false) < 0) {
740733
log_err ("error sending rexec.write request: %s",
741734
strerror (errno));
742735
return -1;
@@ -787,14 +780,7 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
787780
*/
788781
c->closed = true;
789782
if (p->state == FLUX_SUBPROCESS_RUNNING) {
790-
if (subprocess_write (p->h,
791-
p->service_name,
792-
p->rank,
793-
p->pid,
794-
c->name,
795-
NULL,
796-
0,
797-
true) < 0) {
783+
if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) {
798784
log_err ("error sending rexec.write request: %s",
799785
strerror (errno));
800786
return -1;

src/common/libsubprocess/test/iostress.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ static void iostress_state_cb (flux_subprocess_t *p,
120120
}
121121
}
122122

123-
static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
123+
static int rexec_write (flux_t *h, uint32_t matchtag, const char *buf, int len)
124124
{
125125
flux_future_t *f;
126126
json_t *io;
@@ -133,7 +133,7 @@ static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
133133
0,
134134
FLUX_RPC_NORESPONSE,
135135
"{s:i s:O}",
136-
"pid", pid,
136+
"matchtag", matchtag,
137137
"io", io))) {
138138
json_decref (io);
139139
return -1;
@@ -150,15 +150,18 @@ static void iostress_source_cb (flux_reactor_t *r,
150150
{
151151
struct iostress_ctx *ctx = arg;
152152
char *buf;
153+
uint32_t matchtag;
153154

154155
if (!(buf = malloc (ctx->linesize)))
155156
BAIL_OUT ("out of memory");
156157
memset (buf, 'F', ctx->linesize - 1);
157158
buf[ctx->linesize - 1] = '\n';
158159

160+
matchtag = flux_rpc_get_matchtag (ctx->p->f);
161+
159162
for (int i = 0; i < ctx->batchlines; i++) {
160163
if (ctx->direct) {
161-
if (rexec_write (ctx->h, ctx->pid, buf, ctx->linesize) < 0)
164+
if (rexec_write (ctx->h, matchtag, buf, ctx->linesize) < 0)
162165
BAIL_OUT ("rexec_write failed");
163166
}
164167
else {

0 commit comments

Comments
 (0)