@@ -61,7 +61,7 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
61
61
static void qs_ExecutorFinish (QueryDesc * queryDesc );
62
62
63
63
static shm_mq_result receive_msg_by_parts (shm_mq_handle * mqh , Size * total ,
64
- void * * datap , bool nowait );
64
+ void * * datap , int64 timeout , int * rc , bool nowait );
65
65
66
66
/* Global variables */
67
67
List * QueryDescStack = NIL ;
@@ -780,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
780
780
{
781
781
shm_mq_result mq_receive_result ;
782
782
783
- mq_receive_result = receive_msg_by_parts (mqh , nbytesp , datap , true);
783
+ mq_receive_result = receive_msg_by_parts (mqh , nbytesp , datap , timeout , & rc , true);
784
784
if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
785
785
return mq_receive_result ;
786
786
if (rc & WL_TIMEOUT || delay <= 0 )
@@ -967,33 +967,61 @@ copy_msg(shm_mq_msg *msg)
967
967
968
968
static shm_mq_result
969
969
receive_msg_by_parts (shm_mq_handle * mqh , Size * total , void * * datap ,
970
- bool nowait )
970
+ int64 timeout , int * rc , bool nowait )
971
971
{
972
972
shm_mq_result mq_receive_result ;
973
973
shm_mq_msg * buff ;
974
974
int offset ;
975
- Size * expected ;
976
- Size expected_data ;
975
+ Size * expected ;
976
+ Size expected_data ;
977
977
Size len ;
978
978
979
979
/* Get the expected number of bytes in message */
980
980
mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & expected , nowait );
981
- expected_data = * expected ;
982
981
if (mq_receive_result != SHM_MQ_SUCCESS )
983
982
return mq_receive_result ;
984
983
Assert (len == sizeof (Size ));
985
984
985
+ expected_data = * expected ;
986
986
* datap = palloc0 (expected_data );
987
987
988
988
/* Get the message itself */
989
989
for (offset = 0 ; offset < expected_data ; )
990
990
{
991
+ int64 delay = timeout ;
991
992
/* Keep receiving new messages until we assemble the full message */
992
- mq_receive_result = shm_mq_receive (mqh , & len , ((void * * ) & buff ), nowait );
993
+ for (;;)
994
+ {
995
+ mq_receive_result = shm_mq_receive (mqh , & len , ((void * * ) & buff ), nowait );
996
+ if (mq_receive_result != SHM_MQ_SUCCESS )
997
+ {
998
+ if (nowait && mq_receive_result == SHM_MQ_WOULD_BLOCK )
999
+ {
1000
+ /*
1001
+ * We can't leave this function during reading parts with
1002
+ * error code SHM_MQ_WOULD_BLOCK because can be be error
1003
+ * at next call receive_msg_by_parts() with continuing
1004
+ * reading non-readed parts.
1005
+ * So we should wait whole MAX_RCV_TIMEOUT timeout and
1006
+ * return error after that only.
1007
+ */
1008
+ if (delay > 0 )
1009
+ {
1010
+ pg_usleep (PART_RCV_DELAY * 1000 );
1011
+ delay -= PART_RCV_DELAY ;
1012
+ continue ;
1013
+ }
1014
+ if (rc )
1015
+ { /* Mark that the timeout has expired: */
1016
+ * rc |= WL_TIMEOUT ;
1017
+ }
1018
+ }
1019
+ return mq_receive_result ;
1020
+ }
1021
+ break ;
1022
+ }
993
1023
memcpy ((char * ) * datap + offset , buff , len );
994
1024
offset += len ;
995
- if (mq_receive_result != SHM_MQ_SUCCESS )
996
- return mq_receive_result ;
997
1025
}
998
1026
999
1027
* total = offset ;
@@ -1074,7 +1102,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
1074
1102
mqh = shm_mq_attach (mq , NULL , NULL );
1075
1103
elog (DEBUG1 , "Wait response from leader %d" , leader -> pid );
1076
1104
mq_receive_result = receive_msg_by_parts (mqh , & len , (void * * ) & msg ,
1077
- false);
1105
+ 0 , NULL , false);
1078
1106
if (mq_receive_result != SHM_MQ_SUCCESS )
1079
1107
goto mq_error ;
1080
1108
if (msg -> reqid != reqid )
0 commit comments