@@ -139,6 +139,25 @@ static const flux_msg_t *lookup_message_byaux (struct flux_msglist *msglist,
139139 return NULL ;
140140}
141141
142+ /* Find an sdexec.exec message with the same sender as msg and matchtag as
143+ * specified in the msg matchtag field.
144+ * N.B. flux_cancel_match() happens to be helpful because RFC 42 subprocess
145+ * write works like RFC 6 cancel.
146+ */
147+ static const flux_msg_t * lookup_message_byclient (struct flux_msglist * msglist ,
148+ const flux_msg_t * msg )
149+ {
150+ const flux_msg_t * m ;
151+
152+ m = flux_msglist_first (msglist );
153+ while (m ) {
154+ if (flux_cancel_match (msg , m ))
155+ return m ;
156+ m = flux_msglist_next (msglist );
157+ }
158+ return NULL ;
159+ }
160+
142161static void exec_respond_error (struct sdproc * proc ,
143162 int errnum ,
144163 const char * errstr )
@@ -713,7 +732,7 @@ static void write_cb (flux_t *h,
713732 void * arg )
714733{
715734 struct sdexec_ctx * ctx = arg ;
716- pid_t pid ;
735+ int matchtag ;
717736 json_t * io ;
718737 const flux_msg_t * exec_request ;
719738 flux_error_t error ;
@@ -723,7 +742,7 @@ static void write_cb (flux_t *h,
723742 if (flux_request_unpack (msg ,
724743 NULL ,
725744 "{s:i s:o}" ,
726- "pid " , & pid ,
745+ "matchtag " , & matchtag ,
727746 "io" , & io ) < 0 ) {
728747 flux_log_error (h , "error decoding write request" );
729748 return ;
@@ -736,22 +755,18 @@ static void write_cb (flux_t *h,
736755 flux_log_error (h , "%s" , error .text );
737756 return ;
738757 }
739- if (!(exec_request = lookup_message_bypid (ctx -> requests , pid ))
758+ if (!(exec_request = lookup_message_byclient (ctx -> requests , msg ))
740759 || !(proc = flux_msg_aux_get (exec_request , "sdproc" ))) {
741- flux_log (h , LOG_ERR , "write pid=%d: not found" , pid );
760+ flux_log (h , LOG_ERR , "sdexec. write: subprocess no longer exists" );
742761 return ;
743762 }
744763 if (iodecode (io , & stream , NULL , NULL , NULL , NULL ) == 0
745764 && !streq (stream , "stdin" )) {
746- flux_log (h ,
747- LOG_ERR ,
748- "write pid=%d stream=%s: invalid stream" ,
749- pid ,
750- stream );
765+ flux_log (h , LOG_ERR , "sdexec.write: %s is an invalid stream" , stream );
751766 return ;
752767 }
753768 if (sdexec_channel_write (proc -> in , io ) < 0 ) {
754- flux_log_error (h , "write pid=%d " , pid );
769+ flux_log_error (h , "sdexec. write %s " , stream );
755770 return ;
756771 }
757772}
0 commit comments