Skip to content

Commit bde17f1

Browse files
authored
Merge pull request #652 from kpribylov/fix/#651
Prevent sending unexpected messages in client message queue
2 parents d8a0d97 + 23d44cb commit bde17f1

File tree

1 file changed

+57
-31
lines changed

1 file changed

+57
-31
lines changed

src/hackney_pool.erl

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
max_connections,
4848
timeout,
4949
clients = dict:new(),
50-
queues = dict:new(), % Dest => queue of Froms,
51-
pending = dict:new(),
50+
queues = dict:new(), % Dest => queue of {From, Ref, Requester}
51+
pending = dict:new(), % Ref => {From, Dest, Requester}
5252
connections = dict:new(),
5353
sockets = dict:new()}).
5454

@@ -59,17 +59,37 @@ start() ->
5959
ok.
6060

6161
%% @doc fetch a socket from the pool
62-
checkout(Host, _Port, Transport, #client{options=Opts,
63-
mod_metrics=Metrics}=Client) ->
62+
checkout(Host, Port, Transport, #client{options = Opts} = Client) ->
63+
ConnectTimeout = proplists:get_value(connect_timeout, Opts, 8000),
64+
%% Fall back to using connect_timeout if checkout_timeout is not set
65+
CheckoutTimeout = proplists:get_value(checkout_timeout, Opts, ConnectTimeout),
66+
Requester = self(),
67+
Ref = make_ref(),
68+
Fun =
69+
fun() ->
70+
Result =
71+
try
72+
do_checkout(Requester, Host, Port, Transport, Client, ConnectTimeout, CheckoutTimeout)
73+
catch _:_ ->
74+
{error, checkout_failure}
75+
end,
76+
Requester ! {checkout, Ref, Result}
77+
end,
78+
_ = spawn(Fun),
79+
receive
80+
{checkout, Ref, Result} ->
81+
Result
82+
after CheckoutTimeout ->
83+
{error, checkout_timeout}
84+
end.
85+
86+
do_checkout(Requester, Host, _Port, Transport, #client{options=Opts,
87+
mod_metrics=Metrics}=Client, ConnectTimeout, CheckoutTimeout) ->
6488
{Connection, ConnectOptions} = hackney_connection:new(Client),
65-
Pid = self(),
6689
RequestRef = Client#client.request_ref,
6790
PoolName = proplists:get_value(pool, Opts, default),
6891
Pool = find_pool(PoolName, Opts),
69-
ConnectTimeout = proplists:get_value(connect_timeout, Opts, 8000),
70-
%% Fall back to using connect_timeout if checkout_timeout is not set
71-
CheckoutTimeout = proplists:get_value(checkout_timeout, Opts, ConnectTimeout),
72-
case catch gen_server:call(Pool, {checkout, Connection, Pid, RequestRef}, CheckoutTimeout) of
92+
case catch gen_server:call(Pool, {checkout, Connection, Requester, RequestRef}, CheckoutTimeout) of
7393
{ok, Socket, Owner} ->
7494

7595
%% stats
@@ -84,12 +104,18 @@ checkout(Host, _Port, Transport, #client{options=Opts,
84104
Begin = os:timestamp(),
85105
case hackney_connection:connect(Connection, ConnectOptions, ConnectTimeout) of
86106
{ok, Socket} ->
87-
?report_trace("new connection", []),
88-
ConnectTime = timer:now_diff(os:timestamp(), Begin)/1000,
89-
_ = metrics:update_histogram(Metrics, [hackney, Host, connect_time], ConnectTime),
90-
_ = metrics:increment_counter(Metrics, [hackney_pool, Host, new_connection]),
91-
92-
{ok, {PoolName, RequestRef, Connection, Owner, Transport}, Socket};
107+
case hackney_connection:controlling_process(Connection, Socket, Requester) of
108+
ok ->
109+
?report_trace("new connection", []),
110+
ConnectTime = timer:now_diff(os:timestamp(), Begin)/1000,
111+
_ = metrics:update_histogram(Metrics, [hackney, Host, connect_time], ConnectTime),
112+
_ = metrics:increment_counter(Metrics, [hackney_pool, Host, new_connection]),
113+
{ok, {PoolName, RequestRef, Connection, Owner, Transport}, Socket};
114+
Error ->
115+
catch hackney_connection:close(Connection, Socket),
116+
_ = metrics:increment_counter(Metrics, [hackney, Host, connect_error]),
117+
Error
118+
end;
93119
{error, timeout} ->
94120
_ = metrics:increment_counter(Metrics, [hackney, Host, connect_timeout]),
95121
{error, timeout};
@@ -270,15 +296,15 @@ handle_call(timeout, _From, #state{timeout=Timeout}=State) ->
270296
{reply, Timeout, State};
271297
handle_call(max_connections, _From, #state{max_connections=MaxConn}=State) ->
272298
{reply, MaxConn, State};
273-
handle_call({checkout, Dest, Pid, RequestRef}, From, State) ->
299+
handle_call({checkout, Dest, Requester, RequestRef}, From, State) ->
274300
#state{name=PoolName,
275301
metrics = Engine,
276302
max_connections=MaxConn,
277303
clients=Clients,
278304
queues = Queues,
279305
pending = Pending} = State,
280306

281-
{Reply, State2} = find_connection(Dest, Pid, State),
307+
{Reply, State2} = find_connection(Dest, Requester, State),
282308
case Reply of
283309
{ok, _Socket, _Owner} ->
284310
State3 = monitor_client(Dest, RequestRef, State2),
@@ -287,8 +313,8 @@ handle_call({checkout, Dest, Pid, RequestRef}, From, State) ->
287313
no_socket ->
288314
case dict:size(Clients) >= MaxConn of
289315
true ->
290-
Queues2 = add_to_queue(Dest, From, RequestRef, Queues),
291-
Pending2 =add_pending(RequestRef, From, Dest, Pending),
316+
Queues2 = add_to_queue(Dest, From, RequestRef, Requester, Queues),
317+
Pending2 = add_pending(RequestRef, From, Dest, Requester, Pending),
292318
_ = metrics:update_histogram(
293319
Engine, [hackney_pool, PoolName, queue_count], dict:size(Pending2)
294320
),
@@ -395,7 +421,7 @@ dequeue(Dest, Ref, State) ->
395421
case queue_out(Dest, Queues) of
396422
empty ->
397423
State#state{clients = Clients2};
398-
{ok, {From, Ref2}, Queues2} ->
424+
{ok, {From, Ref2, _Requester}, Queues2} ->
399425
Pending2 = del_pending(Ref, Pending),
400426
_ = metrics:update_histogram(
401427
State#state.metrics, [hackney_pool, State#state.name, queue_count], dict:size(Pending2)
@@ -487,12 +513,12 @@ cancel_timer(Socket, Timer) ->
487513
%------------------------------------------------------------------------------
488514
%% @private
489515
%%------------------------------------------------------------------------------
490-
add_to_queue(Connection, From, Ref, Queues) ->
516+
add_to_queue(Connection, From, Ref, Requester, Queues) ->
491517
case dict:find(Connection, Queues) of
492518
error ->
493-
dict:store(Connection, queue:in({From, Ref}, queue:new()), Queues);
519+
dict:store(Connection, queue:in({From, Ref, Requester}, queue:new()), Queues);
494520
{ok, Q} ->
495-
dict:store(Connection, queue:in({From, Ref}, Q), Queues)
521+
dict:store(Connection, queue:in({From, Ref, Requester}, Q), Queues)
496522
end.
497523

498524
%------------------------------------------------------------------------------
@@ -503,7 +529,7 @@ del_from_queue(Connection, Ref, Queues) ->
503529
error ->
504530
{Queues, false};
505531
{ok, Q} ->
506-
Q2 = queue:filter(fun({_, R}) -> R =/= Ref end, Q),
532+
Q2 = queue:filter(fun({_, R, _}) -> R =/= Ref end, Q),
507533
Removed = queue:len(Q) =/= queue:len(Q2),
508534
Queues2 = case queue:is_empty(Q2) of
509535
true ->
@@ -523,14 +549,14 @@ queue_out(Connection, Queues) ->
523549
empty;
524550
{ok, Q} ->
525551
case queue:out(Q) of
526-
{{value, {From, Ref}}, Q2} ->
552+
{{value, {From, Ref, Requester}}, Q2} ->
527553
Queues2 = case queue:is_empty(Q2) of
528554
true ->
529555
dict:erase(Connection, Queues);
530556
false ->
531557
dict:store(Connection, Q2, Queues)
532558
end,
533-
{ok, {From, Ref}, Queues2};
559+
{ok, {From, Ref, Requester}, Queues2};
534560
{empty, _} ->
535561
%% fix race condition
536562
empty
@@ -545,7 +571,7 @@ deliver_socket(Socket, Connection, State) ->
545571
case queue_out(Connection, Queues) of
546572
empty ->
547573
store_socket(Connection, Socket, State);
548-
{ok, {{PidWaiter, _} = FromWaiter, Ref}, Queues2} ->
574+
{ok, {FromWaiter, Ref, PidWaiter}, Queues2} ->
549575
Pending2 = del_pending(Ref, Pending),
550576
_ = metrics:update_histogram(
551577
State#state.metrics, [hackney_pool, State#state.name, queue_count], dict:size(Pending2)
@@ -564,8 +590,8 @@ deliver_socket(Socket, Connection, State) ->
564590
end.
565591

566592

567-
add_pending(Ref, From, Connection, Pending) ->
568-
dict:store(Ref, {From, Connection}, Pending).
593+
add_pending(Ref, From, Connection, Requester, Pending) ->
594+
dict:store(Ref, {From, Connection, Requester}, Pending).
569595

570596

571597
del_pending(Ref, Pending) ->
@@ -574,13 +600,13 @@ del_pending(Ref, Pending) ->
574600

575601
remove_pending(Ref, #state{queues=Queues0, pending=Pending0} = State) ->
576602
case dict:find(Ref, Pending0) of
577-
{ok, {From, Connection}} ->
603+
{ok, {From, Connection, Requester}} ->
578604
Pending1 = dict:erase(Ref, Pending0),
579605
Queues1 = case dict:find(Connection, Queues0) of
580606
{ok, Q0} ->
581607
Q1 = queue:filter(
582608
fun
583-
(PendingReq) when PendingReq =:= {From, Ref} -> false;
609+
(PendingReq) when PendingReq =:= {From, Ref, Requester} -> false;
584610
(_) -> true
585611
end,
586612
Q0

0 commit comments

Comments
 (0)