@@ -6216,35 +6216,20 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
62166216 end ,
62176217 flush (receiver_attached ),
62186218
6219- {_GenStatemStateSession , StateSession } = formatted_state (Session ),
6220- Socket = case web_amqp (Config ) of
6221- true ->
6222- #{socket := {ws , GunPid , _GunStreamRef }} = StateSession ,
6223- {_GenStatemStateGun , StateGun } = formatted_state (GunPid ),
6224- % % https://github.com/ninenines/gun/blob/2.1.0/src/gun.erl#L315
6225- element (12 , StateGun );
6226- false ->
6227- #{socket := {tcp , Sock }} = StateSession ,
6228- Sock
6229- end ,
6230- ? assert (is_port (Socket )),
6219+ {_GenStatemState ,
6220+ #{reader := ReaderPid ,
6221+ socket := {tcp , Socket }}} = formatted_state (Session ),
62316222
6232- % % Provoke TCP back-pressure from client to server by:
6233- % % 1. using very small buffers
6223+ % % Provoke TCP back-pressure from client to server by using very small buffers.
62346224 ok = inet :setopts (Socket , [{recbuf , 256 },
62356225 {buffer , 256 }]),
6236- % % 2. stopping reading from the socket
6237- Mod = inet ,
6238- ok = meck :new (Mod , [unstick , no_link , passthrough ]),
6239- ok = meck :expect (Mod , setopts , fun (_Sock , [{active , once }]) ->
6240- ok ;
6241- (Sock , Opts ) ->
6242- meck :passthrough ([Sock , Opts ])
6243- end ),
6226+ % % Suspend the receiving client such that it stops reading from its socket
6227+ % % causing TCP back-pressure to the server being applied.
6228+ true = erlang :suspend_process (ReaderPid ),
62446229
62456230 ok = amqp10_client :flow_link_credit (Receiver , Num , never ),
62466231 % % We give the queue time to send messages to the session proc and writer proc.
6247- timer :sleep (2000 ),
6232+ timer :sleep (1000 ),
62486233
62496234 % % Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal
62506235 % % flow control by checking that the queue sent some but, more importantly, not all its
@@ -6258,14 +6243,14 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
62586243 ok = inet :setopts (Socket , [{recbuf , 65536 },
62596244 {buffer , 65536 }]),
62606245 % % When we resume the receiving client, we expect to receive all messages.
6261- ? assert (meck :validate (Mod )),
6262- ok = meck :unload (Mod ),
6263- ok = Mod :setopts (Socket , [{active , once }]),
6246+ true = erlang :resume_process (ReaderPid ),
62646247 receive_messages (Receiver , Num ),
62656248
62666249 ok = detach_link_sync (Receiver ),
62676250 {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
6268- ok = close ({Connection , Session , LinkPair }).
6251+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
6252+ ok = end_session_sync (Session ),
6253+ ok = close_connection_sync (Connection ).
62696254
62706255session_flow_control_default_max_frame_size (Config ) ->
62716256 QName = atom_to_binary (? FUNCTION_NAME ),
0 commit comments