@@ -107,6 +107,30 @@ static flux_subprocess_t *proc_find_bypid (subprocess_server_t *s, pid_t pid)
107107 return NULL ;
108108}
109109
110+ /* Find a <service>.exec message with the same sender as msg and matchtag as
111+ * specified in the request matchtag field.
112+ * N.B. flux_cancel_match() happens to be helpful because RFC 42 subprocess
113+ * write works like RFC 6 cancel.
114+ */
115+ static flux_subprocess_t * proc_find_byclient (subprocess_server_t * s ,
116+ const flux_msg_t * request )
117+ {
118+ flux_subprocess_t * p ;
119+
120+ p = zlistx_first (s -> subprocesses );
121+ while (p ) {
122+ const flux_msg_t * msg ;
123+
124+ if ((msg = flux_subprocess_aux_get (p , msgkey ))
125+ && flux_cancel_match (request , msg ))
126+ return p ;
127+ p = zlistx_next (s -> subprocesses );
128+ }
129+ errno = ESRCH ;
130+ return NULL ;
131+ }
132+
133+
110134static void proc_completion_cb (flux_subprocess_t * p )
111135{
112136 subprocess_server_t * s = flux_subprocess_aux_get (p , srvkey );
@@ -390,14 +414,14 @@ static void server_write_cb (flux_t *h,
390414 char * data = NULL ;
391415 int len = 0 ;
392416 bool eof = false;
393- pid_t pid ;
417+ int matchtag ;
394418 json_t * io = NULL ;
395419 flux_error_t error ;
396420
397421 if (flux_request_unpack (msg ,
398422 NULL ,
399423 "{ s:i s:o }" ,
400- "pid " , & pid ,
424+ "matchtag " , & matchtag ,
401425 "io" , & io ) < 0
402426 || iodecode (io , & stream , NULL , & data , & len , & eof ) < 0 ) {
403427 llog_error (s ,
@@ -415,27 +439,23 @@ static void server_write_cb (flux_t *h,
415439 * in flight, and is not necessarily an error, and can be common enough
416440 * that the log messages end up being a nuisance.
417441 */
418- if (!(p = proc_find_bypid (s , pid ))
442+ if (!(p = proc_find_byclient (s , msg ))
419443 || p -> state != FLUX_SUBPROCESS_RUNNING )
420444 goto out ;
421445
422446 if (data && len ) {
423447 int rc = flux_subprocess_write (p , stream , data , len );
424448 if (rc < 0 ) {
425449 llog_error (s ,
426- "Error writing %d bytes to subprocess pid %d %s" ,
450+ "Error writing %d bytes to subprocess %s" ,
427451 len ,
428- (int )pid ,
429452 stream );
430453 goto error ;
431454 }
432455 }
433456 if (eof ) {
434457 if (flux_subprocess_close (p , stream ) < 0 ) {
435- llog_error (s ,
436- "Error writing EOF to subprocess pid %d %s" ,
437- (int )pid ,
438- stream );
458+ llog_error (s , "Error writing EOF to subprocess %s" , stream );
439459 goto error ;
440460 }
441461 }
0 commit comments