@@ -36,6 +36,8 @@ PG_MODULE_MAGIC;
3636#define PG_QS_MODULE_KEY 0xCA94B108
3737#define PG_QUERY_STATE_KEY 0
3838
39+ #define MIN_TIMEOUT 5000
40+
3941#define TEXT_CSTR_CMP (text , cstr ) \
4042 (memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
4143
@@ -87,7 +89,8 @@ static void SendCurrentUserId(void);
8789static void SendBgWorkerPids (void );
8890static Oid GetRemoteBackendUserId (PGPROC * proc );
8991static List * GetRemoteBackendWorkers (PGPROC * proc );
90- static List * GetRemoteBackendQueryStates (List * procs ,
92+ static List * GetRemoteBackendQueryStates (PGPROC * leader ,
93+ List * pworkers ,
9194 bool verbose ,
9295 bool costs ,
9396 bool timing ,
@@ -474,7 +477,7 @@ pg_query_state(PG_FUNCTION_ARGS)
474477 FuncCallContext * funcctx ;
475478 MemoryContext oldcontext ;
476479 pg_qs_fctx * fctx ;
477- const int N_ATTRS = 5 ;
480+ #define N_ATTRS 5
478481 pid_t pid = PG_GETARG_INT32 (0 );
479482
480483 if (SRF_IS_FIRSTCALL ())
@@ -531,16 +534,24 @@ pg_query_state(PG_FUNCTION_ARGS)
531534
532535 bg_worker_procs = GetRemoteBackendWorkers (proc );
533536
534- msgs = GetRemoteBackendQueryStates (lcons (proc , bg_worker_procs ),
537+ msgs = GetRemoteBackendQueryStates (proc ,
538+ bg_worker_procs ,
535539 verbose ,
536540 costs ,
537541 timing ,
538542 buffers ,
539543 triggers ,
540544 format );
541- msg = (shm_mq_msg * ) linitial (msgs );
542545
543546 funcctx = SRF_FIRSTCALL_INIT ();
547+ if (list_length (msgs ) == 0 )
548+ {
549+ elog (WARNING , "backend does not reply" );
550+ LockRelease (& tag , ExclusiveLock , false);
551+ SRF_RETURN_DONE (funcctx );
552+ }
553+
554+ msg = (shm_mq_msg * ) linitial (msgs );
544555 switch (msg -> result_code )
545556 {
546557 case QUERY_NOT_RUNNING :
@@ -716,31 +727,30 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
716727 void * * datap ,
717728 long timeout )
718729{
719-
720- #ifdef HAVE_INT64_TIMESTAMP
721- #define GetNowFloat () ((float8) GetCurrentTimestamp() / 1000.0)
722- #else
723- #define GetNowFloat () 1000.0 * GetCurrentTimestamp()
724- #endif
725-
726- float8 endtime = GetNowFloat () + timeout ;
727- int rc = 0 ;
730+ int rc = 0 ;
731+ long delay = timeout ;
728732
729733 for (;;)
730734 {
731- long delay ;
735+ instr_time start_time ;
736+ instr_time cur_time ;
732737 shm_mq_result mq_receive_result ;
733738
734- mq_receive_result = shm_mq_receive ( mqh , nbytesp , datap , true );
739+ INSTR_TIME_SET_CURRENT ( start_time );
735740
741+ mq_receive_result = shm_mq_receive (mqh , nbytesp , datap , true);
736742 if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
737743 return mq_receive_result ;
738-
739- if (rc & WL_TIMEOUT )
744+ if (rc & WL_TIMEOUT || delay <= 0 )
740745 return SHM_MQ_WOULD_BLOCK ;
741746
742- delay = (long ) (endtime - GetNowFloat ());
743747 rc = WaitLatch (MyLatch , WL_LATCH_SET | WL_TIMEOUT , delay );
748+
749+ INSTR_TIME_SET_CURRENT (cur_time );
750+ INSTR_TIME_SUBTRACT (cur_time , start_time );
751+
752+ delay = timeout - (long ) INSTR_TIME_GET_MILLISEC (cur_time );
753+
744754 CHECK_FOR_INTERRUPTS ();
745755 ResetLatch (MyLatch );
746756 }
@@ -844,12 +854,12 @@ GetRemoteBackendWorkers(PGPROC *proc)
844854
845855 sig_result = SendProcSignal (proc -> pid , WorkerPollReason , proc -> backendId );
846856 if (sig_result == -1 )
847- return NIL ;
857+ goto signal_error ;
848858
849859 mqh = shm_mq_attach (mq , NULL , NULL );
850- mq_receive_result = shm_mq_receive_with_timeout (mqh , & msg_len , (void * * ) & msg , 1000 );
860+ mq_receive_result = shm_mq_receive (mqh , & msg_len , (void * * ) & msg , false );
851861 if (mq_receive_result != SHM_MQ_SUCCESS )
852- return NIL ;
862+ goto mq_error ;
853863
854864 for (i = 0 ; i < msg -> number ; i ++ )
855865 {
@@ -862,6 +872,13 @@ GetRemoteBackendWorkers(PGPROC *proc)
862872 shm_mq_detach (mq );
863873
864874 return result ;
875+
876+ signal_error :
877+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
878+ errmsg ("invalid send signal" )));
879+ mq_error :
880+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
881+ errmsg ("error in message queue data transmitting" )));
865882}
866883
867884static shm_mq_msg *
@@ -874,7 +891,8 @@ copy_msg(shm_mq_msg *msg)
874891}
875892
876893static List *
877- GetRemoteBackendQueryStates (List * procs ,
894+ GetRemoteBackendQueryStates (PGPROC * leader ,
895+ List * pworkers ,
878896 bool verbose ,
879897 bool costs ,
880898 bool timing ,
@@ -885,6 +903,11 @@ GetRemoteBackendQueryStates(List *procs,
885903 List * result = NIL ;
886904 List * alive_procs = NIL ;
887905 ListCell * iter ;
906+ int sig_result ;
907+ shm_mq_handle * mqh ;
908+ shm_mq_result mq_receive_result ;
909+ shm_mq_msg * msg ;
910+ Size len ;
888911
889912 Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
890913 Assert (mq );
@@ -898,14 +921,21 @@ GetRemoteBackendQueryStates(List *procs,
898921 params -> format = format ;
899922 pg_write_barrier ();
900923
924+ /* initialize message queue that will transfer query states */
925+ mq = shm_mq_create (mq , QUEUE_SIZE );
926+
901927 /*
902928 * send signal `QueryStatePollReason` to all processes and define all alive
903929 * ones
904930 */
905- foreach (iter , procs )
931+ sig_result = SendProcSignal (leader -> pid ,
932+ QueryStatePollReason ,
933+ leader -> backendId );
934+ if (sig_result == -1 )
935+ goto signal_error ;
936+ foreach (iter , pworkers )
906937 {
907938 PGPROC * proc = (PGPROC * ) lfirst (iter );
908- int sig_result ;
909939
910940 sig_result = SendProcSignal (proc -> pid ,
911941 QueryStatePollReason ,
@@ -920,16 +950,23 @@ GetRemoteBackendQueryStates(List *procs,
920950 alive_procs = lappend (alive_procs , proc );
921951 }
922952
953+ /* extract query state from leader process */
954+ shm_mq_set_sender (mq , leader );
955+ shm_mq_set_receiver (mq , MyProc );
956+ mqh = shm_mq_attach (mq , NULL , NULL );
957+ mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & msg , false);
958+ if (mq_receive_result != SHM_MQ_SUCCESS )
959+ goto mq_error ;
960+ Assert (len == msg -> length );
961+ result = lappend (result , copy_msg (msg ));
962+ shm_mq_detach (mq );
963+
923964 /*
924- * collect results from all alived processes
965+ * collect results from all alived parallel workers
925966 */
926967 foreach (iter , alive_procs )
927968 {
928969 PGPROC * proc = (PGPROC * ) lfirst (iter );
929- shm_mq_handle * mqh ;
930- shm_mq_result mq_receive_result ;
931- shm_mq_msg * msg ;
932- Size len ;
933970
934971 /* prepare message queue to transfer data */
935972 mq = shm_mq_create (mq , QUEUE_SIZE );
@@ -943,7 +980,7 @@ GetRemoteBackendQueryStates(List *procs,
943980 mq_receive_result = shm_mq_receive_with_timeout (mqh ,
944981 & len ,
945982 (void * * ) & msg ,
946- 5000 );
983+ MIN_TIMEOUT );
947984 if (mq_receive_result != SHM_MQ_SUCCESS )
948985 /* counterpart is died, not consider it */
949986 continue ;
@@ -961,4 +998,7 @@ GetRemoteBackendQueryStates(List *procs,
961998signal_error :
962999 ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
9631000 errmsg ("invalid send signal" )));
1001+ mq_error :
1002+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1003+ errmsg ("error in message queue data transmitting" )));
9641004}
0 commit comments