@@ -1065,7 +1065,8 @@ GetRemoteBackendQueryStates(List *procs,
10651065 ExplainFormat format )
10661066{
10671067 List * result = NIL ;
1068- ListCell * i ;
1068+ List * alive_procs = NIL ;
1069+ ListCell * iter ;
10691070
10701071 Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
10711072 Assert (mq );
@@ -1079,41 +1080,67 @@ GetRemoteBackendQueryStates(List *procs,
10791080 params -> format = format ;
10801081 pg_write_barrier ();
10811082
1082- foreach (i , procs )
1083+ /*
1084+ * send signal `QueryStatePollReason` to all processes and define all alive
1085+ * ones
1086+ */
1087+ foreach (iter , procs )
10831088 {
1084- PGPROC * proc = (PGPROC * ) lfirst (i );
1085- shm_mq_msg * msg ;
1089+ PGPROC * proc = (PGPROC * ) lfirst (iter );
1090+ int sig_result ;
1091+
1092+ sig_result = SendProcSignal (proc -> pid ,
1093+ QueryStatePollReason ,
1094+ proc -> backendId );
1095+ if (sig_result == -1 )
1096+ {
1097+ if (errno != ESRCH )
1098+ goto signal_error ;
1099+ continue ;
1100+ }
1101+
1102+ alive_procs = lappend (alive_procs , proc );
1103+ }
1104+
1105+ /*
1106+ * collect results from all alived processes
1107+ */
1108+ foreach (iter , alive_procs )
1109+ {
1110+ PGPROC * proc = (PGPROC * ) lfirst (iter );
10861111 shm_mq_handle * mqh ;
10871112 shm_mq_result mq_receive_result ;
1088- int sig_result ;
1113+ shm_mq_msg * msg ;
10891114 Size len ;
10901115
1091- Assert (proc && proc -> backendId != InvalidBackendId );
1092-
10931116 /* prepare message queue to transfer data */
10941117 mq = shm_mq_create (mq , QUEUE_SIZE );
10951118 shm_mq_set_sender (mq , proc );
1096- shm_mq_set_receiver (mq , MyProc );
1097-
1098- /* send signal to specified backend to extract its state */
1099- sig_result = SendProcSignal (proc -> pid , QueryStatePollReason , proc -> backendId );
1100- if (sig_result == -1 )
1101- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1102- errmsg ("invalid send signal" )));
1119+ shm_mq_set_receiver (mq , MyProc ); /* this function notifies the
1120+ counterpart to come into data
1121+ transfer */
11031122
1104- /* retrieve data from message queue */
1123+ /* retrieve result data from message queue */
11051124 mqh = shm_mq_attach (mq , NULL , NULL );
1106- mq_receive_result = shm_mq_receive_with_timeout (mqh , & len , (void * * ) & msg , 5000 );
1125+ mq_receive_result = shm_mq_receive_with_timeout (mqh ,
1126+ & len ,
1127+ (void * * ) & msg ,
1128+ 5000 );
11071129 if (mq_receive_result != SHM_MQ_SUCCESS )
1108- ereport ( ERROR , ( errcode ( ERRCODE_INTERNAL_ERROR ),
1109- errmsg ( "invalid read from message queue" ))) ;
1130+ /* counterpart is died, not consider it */
1131+ continue ;
11101132
1133+ Assert (len == msg -> length );
1134+
1135+ /* aggregate result data */
11111136 result = lappend (result , copy_msg (msg ));
11121137
11131138 shm_mq_detach (mq );
1114-
1115- Assert (len == msg -> length );
11161139 }
11171140
11181141 return result ;
1142+
1143+ signal_error :
1144+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1145+ errmsg ("invalid send signal" )));
11191146}
0 commit comments