@@ -66,9 +66,9 @@ static void qs_postExecProcNode(PlanState *planstate, TupleTableSlot *result);
6666
6767/* Global variables */
6868List * QueryDescStack = NIL ;
69- static ProcSignalReason UserIdPollReason ;
70- static ProcSignalReason QueryStatePollReason ;
71- static ProcSignalReason WorkerPollReason ;
69+ static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL ;
70+ static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL ;
71+ static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL ;
7272static bool module_initialized = false;
7373static const char * be_state_str [] = { /* BackendState -> string repr */
7474 "undefined" , /* STATE_UNDEFINED */
@@ -107,9 +107,9 @@ typedef struct
107107} trace_request ;
108108
109109static void SendCurrentUserId (void );
110- static void SendWorkerPids (void );
110+ static void SendBgWorkerPids (void );
111111static Oid GetRemoteBackendUserId (PGPROC * proc );
112- static List * GetRemoteBackendWorkers (PGPROC * proc , int * error_code );
112+ static List * GetRemoteBackendWorkers (PGPROC * proc );
113113
114114/* Shared memory variables */
115115shm_toc * toc = NULL ;
@@ -208,7 +208,7 @@ _PG_init(void)
208208 /* Register interrupt on custom signal of polling query state */
209209 UserIdPollReason = RegisterCustomProcSignalHandler (SendCurrentUserId );
210210 QueryStatePollReason = RegisterCustomProcSignalHandler (SendQueryState );
211- WorkerPollReason = RegisterCustomProcSignalHandler (SendWorkerPids );
211+ WorkerPollReason = RegisterCustomProcSignalHandler (SendBgWorkerPids );
212212 if (QueryStatePollReason == INVALID_PROCSIGNAL
213213 || WorkerPollReason == INVALID_PROCSIGNAL
214214 || UserIdPollReason == INVALID_PROCSIGNAL )
@@ -571,21 +571,22 @@ pg_query_state(PG_FUNCTION_ARGS)
571571
572572 if (SRF_IS_FIRSTCALL ())
573573 {
574- LOCKTAG tag ;
575- bool verbose = PG_GETARG_BOOL (1 ),
576- costs = PG_GETARG_BOOL (2 ),
577- timing = PG_GETARG_BOOL (3 ),
578- buffers = PG_GETARG_BOOL (4 ),
579- triggers = PG_GETARG_BOOL (5 );
574+ LOCKTAG tag ;
575+ bool verbose = PG_GETARG_BOOL (1 ),
576+ costs = PG_GETARG_BOOL (2 ),
577+ timing = PG_GETARG_BOOL (3 ),
578+ buffers = PG_GETARG_BOOL (4 ),
579+ triggers = PG_GETARG_BOOL (5 );
580580 text * format_text = PG_GETARG_TEXT_P (6 );
581- ExplainFormat format ;
581+ ExplainFormat format ;
582582 PGPROC * proc ;
583- Oid counterpart_user_id ;
583+ Oid counterpart_user_id ;
584584 shm_mq_handle * mqh ;
585- shm_mq_result mq_receive_result ;
586- int send_signal_result ;
587- Size len ;
585+ shm_mq_result mq_receive_result ;
586+ int send_signal_result ;
587+ Size len ;
588588 shm_mq_msg * msg ;
589+ List * bg_worker_pids = NIL ;
589590
590591 if (!module_initialized )
591592 ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -631,6 +632,8 @@ pg_query_state(PG_FUNCTION_ARGS)
631632 params -> triggers = triggers ;
632633 params -> format = format ;
633634
635+ bg_worker_pids = GetRemoteBackendWorkers (proc );
636+
634637 /* prepare message queue to transfer data */
635638 mq = shm_mq_create (mq , QUEUE_SIZE );
636639 shm_mq_set_sender (mq , proc );
@@ -843,8 +846,13 @@ GetRemoteBackendUserId(PGPROC *proc)
843846{
844847 Oid result ;
845848
849+ Assert (proc && proc -> backendId != InvalidBackendId );
850+ Assert (UserIdPollReason != INVALID_PROCSIGNAL );
851+ Assert (counterpart_userid );
852+
846853 counterpart_userid -> userid = InvalidOid ;
847854 counterpart_userid -> caller = MyLatch ;
855+ pg_write_barrier ();
848856
849857 SendProcSignal (proc -> pid , UserIdPollReason , proc -> backendId );
850858 for (;;)
@@ -864,8 +872,54 @@ GetRemoteBackendUserId(PGPROC *proc)
864872 return result ;
865873}
866874
875+ /*
876+ * Receive a message from a shared message queue until timeout is exceeded.
877+ *
878+ * Parameter `*nbytes` is set to the message length and *data to point to the
879+ * message payload. If timeout is exceeded SHM_MQ_WOULD_BLOCK is returned.
880+ */
881+ static shm_mq_result
882+ shm_mq_receive_with_timeout (shm_mq_handle * mqh ,
883+ Size * nbytesp ,
884+ void * * datap ,
885+ long timeout )
886+ {
887+
888+ #ifdef HAVE_INT64_TIMESTAMP
889+ #define GetNowFloat () ((float8) GetCurrentTimestamp() / 1000.0)
890+ #else
891+ #define GetNowFloat () 1000.0 * GetCurrentTimestamp()
892+ #endif
893+
894+ float8 endtime = GetNowFloat () + timeout ;
895+ int rc = 0 ;
896+
897+ for (;;)
898+ {
899+ long delay ;
900+ shm_mq_result mq_receive_result ;
901+
902+ mq_receive_result = shm_mq_receive (mqh , nbytesp , datap , true);
903+
904+ if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
905+ return mq_receive_result ;
906+
907+ if (rc & WL_TIMEOUT )
908+ return SHM_MQ_WOULD_BLOCK ;
909+
910+ delay = (long ) (endtime - GetNowFloat ());
911+ rc = WaitLatch (MyLatch , WL_LATCH_SET | WL_TIMEOUT , delay );
912+ CHECK_FOR_INTERRUPTS ();
913+ ResetLatch (MyLatch );
914+ }
915+ }
916+
917+ /*
918+ * Extract to *result pids of all parallel workers running from leader process
919+ * that executes plan tree whose state root is `node`.
920+ */
867921static bool
868- extract_worker_handles (PlanState * node , List * * result )
922+ extract_running_bgworkers (PlanState * node , List * * result )
869923{
870924 if (node == NULL )
871925 return false;
@@ -879,10 +933,11 @@ extract_worker_handles(PlanState *node, List **result)
879933 {
880934 for (i = 0 ; i < gather_node -> pei -> pcxt -> nworkers_launched ; i ++ )
881935 {
882- pid_t pid ;
883- BackgroundWorkerHandle * bgwh = gather_node -> pei -> pcxt -> worker [ i ]. bgwhandle ;
884- BgwHandleStatus status ;
936+ pid_t pid ;
937+ BackgroundWorkerHandle * bgwh ;
938+ BgwHandleStatus status ;
885939
940+ bgwh = gather_node -> pei -> pcxt -> worker [i ].bgwhandle ;
886941 if (!bgwh )
887942 continue ;
888943
@@ -892,37 +947,40 @@ extract_worker_handles(PlanState *node, List **result)
892947 }
893948 }
894949 }
895- return planstate_tree_walker (node , extract_worker_handles , (void * ) result );
950+ return planstate_tree_walker (node , extract_running_bgworkers , (void * ) result );
896951}
897952
898953typedef struct
899954{
900- int num ;
901- pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
902- } workers_msg ;
955+ int number ;
956+ pid_t pids [FLEXIBLE_ARRAY_MEMBER ];
957+ } BgWorkerPids ;
903958
904959static void
905- SendWorkerPids (void )
960+ SendBgWorkerPids (void )
906961{
907- ListCell * iter ;
908- List * all_workers = NIL ;
909- workers_msg * msg ;
910- int msg_len ;
911- int i ;
912- shm_mq_handle * mqh = shm_mq_attach (mq , NULL , NULL );
962+ ListCell * iter ;
963+ List * all_workers = NIL ;
964+ BgWorkerPids * msg ;
965+ int msg_len ;
966+ int i ;
967+ shm_mq_handle * mqh ;
968+
969+ mqh = shm_mq_attach (mq , NULL , NULL );
913970
914971 foreach (iter , QueryDescStack )
915972 {
916973 QueryDesc * curQueryDesc = (QueryDesc * ) lfirst (iter );
917974 List * bgworker_pids = NIL ;
918975
919- extract_worker_handles (curQueryDesc -> planstate , & bgworker_pids );
976+ extract_running_bgworkers (curQueryDesc -> planstate , & bgworker_pids );
920977 all_workers = list_concat (all_workers , bgworker_pids );
921978 }
922979
923- msg_len = offsetof(workers_msg , pids ) + sizeof (pid_t ) * list_length (all_workers );
980+ msg_len = offsetof(BgWorkerPids , pids )
981+ + sizeof (pid_t ) * list_length (all_workers );
924982 msg = palloc (msg_len );
925- msg -> num = list_length (all_workers );
983+ msg -> number = list_length (all_workers );
926984 i = 0 ;
927985 foreach (iter , all_workers )
928986 msg -> pids [i ++ ] = lfirst_int (iter );
@@ -931,44 +989,40 @@ SendWorkerPids(void)
931989}
932990
933991/*
992+ * Extracts all parallel worker pids running by process `proc`
993+ */
934994List *
935- GetRemoteBackendWorkers(PGPROC *proc, int *error_code )
995+ GetRemoteBackendWorkers (PGPROC * proc )
936996{
937- int sig_result;
997+ int sig_result ;
938998 shm_mq_handle * mqh ;
939- shm_mq_result mq_receive_result;
940- workers_msg *msg;
941- Size msg_len;
942- int i;
999+ shm_mq_result mq_receive_result ;
1000+ BgWorkerPids * msg ;
1001+ Size msg_len ;
1002+ int i ;
9431003 List * result = NIL ;
9441004
945- if (proc->backendId == InvalidBackendId)
946- {
947- return NIL;
948- }
1005+ Assert (proc && proc -> backendId != InvalidBackendId );
1006+ Assert (WorkerPollReason != INVALID_PROCSIGNAL );
1007+ Assert (mq );
9491008
9501009 mq = shm_mq_create (mq , QUEUE_SIZE );
9511010 shm_mq_set_sender (mq , proc );
9521011 shm_mq_set_receiver (mq , MyProc );
9531012
9541013 sig_result = SendProcSignal (proc -> pid , WorkerPollReason , proc -> backendId );
9551014 if (sig_result == -1 )
956- {
9571015 return NIL ;
958- }
9591016
9601017 mqh = shm_mq_attach (mq , NULL , NULL );
9611018 mq_receive_result = shm_mq_receive_with_timeout (mqh , & msg_len , (void * * ) & msg , 1000 );
9621019 if (mq_receive_result != SHM_MQ_SUCCESS )
963- {
9641020 return NIL ;
965- }
9661021
967- for (i = 0; i < msg->num ; i++)
1022+ for (i = 0 ; i < msg -> number ; i ++ )
9681023 result = lcons_int (msg -> pids [i ], result );
9691024
9701025 shm_mq_detach (mq );
9711026
9721027 return result ;
9731028}
974- */
0 commit comments