@@ -110,14 +110,13 @@ static void SendCurrentUserId(void);
110110static void SendBgWorkerPids (void );
111111static Oid GetRemoteBackendUserId (PGPROC * proc );
112112static List * GetRemoteBackendWorkers (PGPROC * proc );
113- static shm_mq_msg * GetRemoteBackendQueryState (PGPROC * proc ,
114- List * parallel_workers ,
115- bool verbose ,
116- bool costs ,
117- bool timing ,
118- bool buffers ,
119- bool triggers ,
120- ExplainFormat format );
113+ static List * GetRemoteBackendQueryStates (List * procs ,
114+ bool verbose ,
115+ bool costs ,
116+ bool timing ,
117+ bool buffers ,
118+ bool triggers ,
119+ ExplainFormat format );
121120
122121/* Shared memory variables */
123122shm_toc * toc = NULL ;
@@ -563,12 +562,19 @@ PG_FUNCTION_INFO_V1(pg_query_state);
563562Datum
564563pg_query_state (PG_FUNCTION_ARGS )
565564{
566- /* multicall context type */
567565 typedef struct
568566 {
569- ListCell * cursor ;
570- int index ;
567+ PGPROC * proc ;
568+ ListCell * frame_cursor ;
569+ int frame_index ;
571570 List * stack ;
571+ } proc_state ;
572+
573+ /* multicall context type */
574+ typedef struct
575+ {
576+ ListCell * proc_cursor ;
577+ List * procs ;
572578 } pg_qs_fctx ;
573579
574580 FuncCallContext * funcctx ;
@@ -591,6 +597,7 @@ pg_query_state(PG_FUNCTION_ARGS)
591597 Oid counterpart_user_id ;
592598 shm_mq_msg * msg ;
593599 List * bg_worker_procs = NIL ;
600+ List * msgs ;
594601
595602 if (!module_initialized )
596603 ereport (ERROR , (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
@@ -630,14 +637,14 @@ pg_query_state(PG_FUNCTION_ARGS)
630637
631638 bg_worker_procs = GetRemoteBackendWorkers (proc );
632639
633- msg = GetRemoteBackendQueryState ( proc ,
634- bg_worker_procs ,
635- verbose ,
636- costs ,
637- timing ,
638- buffers ,
639- triggers ,
640- format );
640+ msgs = GetRemoteBackendQueryStates ( lcons ( proc , bg_worker_procs ) ,
641+ verbose ,
642+ costs ,
643+ timing ,
644+ buffers ,
645+ triggers ,
646+ format );
647+ msg = ( shm_mq_msg * ) linitial ( msgs );
641648
642649 funcctx = SRF_FIRSTCALL_INIT ();
643650 switch (msg -> result_code )
@@ -661,8 +668,9 @@ pg_query_state(PG_FUNCTION_ARGS)
661668 SRF_RETURN_DONE (funcctx );
662669 case QS_RETURNED :
663670 {
664- List * qs_stack ;
665671 TupleDesc tupdesc ;
672+ ListCell * i ;
673+ int64 max_calls = 0 ;
666674
667675 /* print warnings if exist */
668676 if (msg -> warnings & TIMINIG_OFF_WARNING )
@@ -676,13 +684,28 @@ pg_query_state(PG_FUNCTION_ARGS)
676684
677685 /* save stack of calls and current cursor in multicall context */
678686 fctx = (pg_qs_fctx * ) palloc (sizeof (pg_qs_fctx ));
679- qs_stack = deserialize_stack (msg -> stack , msg -> stack_depth );
680- fctx -> stack = qs_stack ;
681- fctx -> index = 0 ;
682- fctx -> cursor = list_head (qs_stack );
687+ fctx -> procs = NIL ;
688+ foreach (i , msgs )
689+ {
690+ List * qs_stack ;
691+ shm_mq_msg * msg = (shm_mq_msg * ) lfirst (i );
692+ proc_state * p_state = (proc_state * ) palloc (sizeof (proc_state ));
693+
694+ qs_stack = deserialize_stack (msg -> stack , msg -> stack_depth );
695+
696+ p_state -> proc = msg -> proc ;
697+ p_state -> stack = qs_stack ;
698+ p_state -> frame_index = 0 ;
699+ p_state -> frame_cursor = list_head (qs_stack );
700+
701+ fctx -> procs = lappend (fctx -> procs , p_state );
702+
703+ max_calls += list_length (qs_stack );
704+ }
705+ fctx -> proc_cursor = list_head (fctx -> procs );
683706
684707 funcctx -> user_fctx = fctx ;
685- funcctx -> max_calls = list_length ( qs_stack ) ;
708+ funcctx -> max_calls = max_calls ;
686709
687710 /* Make tuple descriptor */
688711 tupdesc = CreateTemplateTupleDesc (N_ATTRS , false);
@@ -706,24 +729,31 @@ pg_query_state(PG_FUNCTION_ARGS)
706729
707730 if (funcctx -> call_cntr < funcctx -> max_calls )
708731 {
709- HeapTuple tuple ;
710- Datum values [N_ATTRS ];
711- bool nulls [N_ATTRS ];
712- stack_frame * frame = (stack_frame * ) lfirst (fctx -> cursor );
732+ HeapTuple tuple ;
733+ Datum values [N_ATTRS ];
734+ bool nulls [N_ATTRS ];
735+ proc_state * p_state = (proc_state * ) lfirst (fctx -> proc_cursor );
736+ stack_frame * frame = (stack_frame * ) lfirst (p_state -> frame_cursor );
713737
714738 /* Make and return next tuple to caller */
715739 MemSet (values , 0 , sizeof (values ));
716740 MemSet (nulls , 0 , sizeof (nulls ));
717- values [0 ] = Int32GetDatum (pid );
718- values [1 ] = Int32GetDatum (fctx -> index );
741+ values [0 ] = Int32GetDatum (p_state -> proc -> pid );
742+ values [1 ] = Int32GetDatum (p_state -> frame_index );
719743 values [2 ] = PointerGetDatum (frame -> query );
720744 values [3 ] = PointerGetDatum (frame -> plan );
721- nulls [4 ] = true;
745+ if (p_state -> proc -> pid == pid )
746+ nulls [4 ] = true;
747+ else
748+ values [4 ] = Int32GetDatum (pid );
722749 tuple = heap_form_tuple (funcctx -> tuple_desc , values , nulls );
723750
724751 /* increment cursor */
725- fctx -> cursor = lnext (fctx -> cursor );
726- fctx -> index ++ ;
752+ p_state -> frame_cursor = lnext (p_state -> frame_cursor );
753+ p_state -> frame_index ++ ;
754+
755+ if (p_state -> frame_cursor == NULL )
756+ fctx -> proc_cursor = lnext (fctx -> proc_cursor );
727757
728758 SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
729759 }
@@ -1017,22 +1047,26 @@ GetRemoteBackendWorkers(PGPROC *proc)
10171047}
10181048
10191049static shm_mq_msg *
1020- GetRemoteBackendQueryState (PGPROC * proc ,
1021- List * parallel_workers ,
1022- bool verbose ,
1023- bool costs ,
1024- bool timing ,
1025- bool buffers ,
1026- bool triggers ,
1027- ExplainFormat format )
1050+ copy_msg (shm_mq_msg * msg )
10281051{
1029- shm_mq_msg * msg ;
1030- shm_mq_handle * mqh ;
1031- shm_mq_result mq_receive_result ;
1032- int sig_result ;
1033- Size len ;
1052+ shm_mq_msg * result = palloc (msg -> length );
1053+
1054+ memcpy (result , msg , msg -> length );
1055+ return result ;
1056+ }
1057+
1058+ static List *
1059+ GetRemoteBackendQueryStates (List * procs ,
1060+ bool verbose ,
1061+ bool costs ,
1062+ bool timing ,
1063+ bool buffers ,
1064+ bool triggers ,
1065+ ExplainFormat format )
1066+ {
1067+ List * result = NIL ;
1068+ ListCell * i ;
10341069
1035- Assert (proc && proc -> backendId != InvalidBackendId );
10361070 Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
10371071 Assert (mq );
10381072
@@ -1045,26 +1079,41 @@ GetRemoteBackendQueryState(PGPROC *proc,
10451079 params -> format = format ;
10461080 pg_write_barrier ();
10471081
1048- /* prepare message queue to transfer data */
1049- mq = shm_mq_create (mq , QUEUE_SIZE );
1050- shm_mq_set_sender (mq , proc );
1051- shm_mq_set_receiver (mq , MyProc );
1082+ foreach (i , procs )
1083+ {
1084+ PGPROC * proc = (PGPROC * ) lfirst (i );
1085+ shm_mq_msg * msg ;
1086+ shm_mq_handle * mqh ;
1087+ shm_mq_result mq_receive_result ;
1088+ int sig_result ;
1089+ Size len ;
10521090
1053- /* send signal to specified backend to extract its state */
1054- sig_result = SendProcSignal (proc -> pid , QueryStatePollReason , proc -> backendId );
1055- if (sig_result == -1 )
1056- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1057- errmsg ("invalid send signal" )));
1091+ Assert (proc && proc -> backendId != InvalidBackendId );
10581092
1059- /* retrieve data from message queue */
1060- mqh = shm_mq_attach (mq , NULL , NULL );
1061- mq_receive_result = shm_mq_receive_with_timeout (mqh , & len , (void * * ) & msg , 5000 );
1062- if (mq_receive_result != SHM_MQ_SUCCESS )
1063- ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1064- errmsg ("invalid read from message queue" )));
1065- shm_mq_detach (mq );
1093+ /* prepare message queue to transfer data */
1094+ mq = shm_mq_create (mq , QUEUE_SIZE );
1095+ shm_mq_set_sender (mq , proc );
1096+ shm_mq_set_receiver (mq , MyProc );
1097+
1098+ /* send signal to specified backend to extract its state */
1099+ sig_result = SendProcSignal (proc -> pid , QueryStatePollReason , proc -> backendId );
1100+ if (sig_result == -1 )
1101+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1102+ errmsg ("invalid send signal" )));
1103+
1104+ /* retrieve data from message queue */
1105+ mqh = shm_mq_attach (mq , NULL , NULL );
1106+ mq_receive_result = shm_mq_receive_with_timeout (mqh , & len , (void * * ) & msg , 5000 );
1107+ if (mq_receive_result != SHM_MQ_SUCCESS )
1108+ ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
1109+ errmsg ("invalid read from message queue" )));
10661110
1067- Assert ( len == msg -> length );
1111+ result = lappend ( result , copy_msg ( msg ) );
10681112
1069- return msg ;
1113+ shm_mq_detach (mq );
1114+
1115+ Assert (len == msg -> length );
1116+ }
1117+
1118+ return result ;
10701119}
0 commit comments