4747 max_connections ,
4848 timeout ,
4949 clients = dict :new (),
50- queues = dict :new (), % Dest => queue of Froms,
51- pending = dict :new (),
50+ queues = dict :new (), % Dest => queue of {From, Ref, Requester}
51+ pending = dict :new (), % Ref => {From, Dest, Requester}
5252 connections = dict :new (),
5353 sockets = dict :new ()}).
5454
@@ -59,17 +59,35 @@ start() ->
5959 ok .
6060
6161% % @doc fetch a socket from the pool
62- checkout (Host , _Port , Transport , # client {options = Opts ,
63- mod_metrics = Metrics }= Client ) ->
62+ checkout (Host , Port , Transport , Client ) ->
63+ Requester = self (),
64+ Ref = make_ref (),
65+ Fun =
66+ fun () ->
67+ Result =
68+ try
69+ do_checkout (Requester , Host , Port , Transport , Client )
70+ catch _ :_ ->
71+ {error , checkout_failure }
72+ end ,
73+ Requester ! {checkout , Ref , Result }
74+ end ,
75+ _ = spawn (Fun ),
76+ receive
77+ {checkout , Ref , Result } ->
78+ Result
79+ end .
80+
81+ do_checkout (Requester , Host , _Port , Transport , # client {options = Opts ,
82+ mod_metrics = Metrics }= Client ) ->
6483 {Connection , ConnectOptions } = hackney_connection :new (Client ),
65- Pid = self (),
6684 RequestRef = Client # client .request_ref ,
6785 PoolName = proplists :get_value (pool , Opts , default ),
6886 Pool = find_pool (PoolName , Opts ),
6987 ConnectTimeout = proplists :get_value (connect_timeout , Opts , 8000 ),
7088 % % Fall back to using connect_timeout if checkout_timeout is not set
7189 CheckoutTimeout = proplists :get_value (checkout_timeout , Opts , ConnectTimeout ),
72- case catch gen_server :call (Pool , {checkout , Connection , Pid , RequestRef }, CheckoutTimeout ) of
90+ case catch gen_server :call (Pool , {checkout , Connection , Requester , RequestRef }, CheckoutTimeout ) of
7391 {ok , Socket , Owner } ->
7492
7593 % % stats
@@ -84,12 +102,18 @@ checkout(Host, _Port, Transport, #client{options=Opts,
84102 Begin = os :timestamp (),
85103 case hackney_connection :connect (Connection , ConnectOptions , ConnectTimeout ) of
86104 {ok , Socket } ->
87- ? report_trace (" new connection" , []),
88- ConnectTime = timer :now_diff (os :timestamp (), Begin )/ 1000 ,
89- _ = metrics :update_histogram (Metrics , [hackney , Host , connect_time ], ConnectTime ),
90- _ = metrics :increment_counter (Metrics , [hackney_pool , Host , new_connection ]),
91-
92- {ok , {PoolName , RequestRef , Connection , Owner , Transport }, Socket };
105+ case hackney_connection :controlling_process (Connection , Socket , Requester ) of
106+ ok ->
107+ ? report_trace (" new connection" , []),
108+ ConnectTime = timer :now_diff (os :timestamp (), Begin )/ 1000 ,
109+ _ = metrics :update_histogram (Metrics , [hackney , Host , connect_time ], ConnectTime ),
110+ _ = metrics :increment_counter (Metrics , [hackney_pool , Host , new_connection ]),
111+ {ok , {PoolName , RequestRef , Connection , Owner , Transport }, Socket };
112+ Error ->
113+ catch hackney_connection :close (Connection , Socket ),
114+ _ = metrics :increment_counter (Metrics , [hackney , Host , connect_error ]),
115+ Error
116+ end ;
93117 {error , timeout } ->
94118 _ = metrics :increment_counter (Metrics , [hackney , Host , connect_timeout ]),
95119 {error , timeout };
@@ -270,15 +294,15 @@ handle_call(timeout, _From, #state{timeout=Timeout}=State) ->
270294 {reply , Timeout , State };
271295handle_call (max_connections , _From , # state {max_connections = MaxConn }= State ) ->
272296 {reply , MaxConn , State };
273- handle_call ({checkout , Dest , Pid , RequestRef }, From , State ) ->
297+ handle_call ({checkout , Dest , Requester , RequestRef }, From , State ) ->
274298 # state {name = PoolName ,
275299 metrics = Engine ,
276300 max_connections = MaxConn ,
277301 clients = Clients ,
278302 queues = Queues ,
279303 pending = Pending } = State ,
280304
281- {Reply , State2 } = find_connection (Dest , Pid , State ),
305+ {Reply , State2 } = find_connection (Dest , Requester , State ),
282306 case Reply of
283307 {ok , _Socket , _Owner } ->
284308 State3 = monitor_client (Dest , RequestRef , State2 ),
@@ -287,8 +311,8 @@ handle_call({checkout, Dest, Pid, RequestRef}, From, State) ->
287311 no_socket ->
288312 case dict :size (Clients ) >= MaxConn of
289313 true ->
290- Queues2 = add_to_queue (Dest , From , RequestRef , Queues ),
291- Pending2 = add_pending (RequestRef , From , Dest , Pending ),
314+ Queues2 = add_to_queue (Dest , From , RequestRef , Requester , Queues ),
315+ Pending2 = add_pending (RequestRef , From , Dest , Requester , Pending ),
292316 _ = metrics :update_histogram (
293317 Engine , [hackney_pool , PoolName , queue_count ], dict :size (Pending2 )
294318 ),
@@ -395,7 +419,7 @@ dequeue(Dest, Ref, State) ->
395419 case queue_out (Dest , Queues ) of
396420 empty ->
397421 State # state {clients = Clients2 };
398- {ok , {From , Ref2 }, Queues2 } ->
422+ {ok , {From , Ref2 , _Requester }, Queues2 } ->
399423 Pending2 = del_pending (Ref , Pending ),
400424 _ = metrics :update_histogram (
401425 State # state .metrics , [hackney_pool , State # state .name , queue_count ], dict :size (Pending2 )
@@ -487,12 +511,12 @@ cancel_timer(Socket, Timer) ->
487511% ------------------------------------------------------------------------------
488512% % @private
489513% %------------------------------------------------------------------------------
490- add_to_queue (Connection , From , Ref , Queues ) ->
514+ add_to_queue (Connection , From , Ref , Requester , Queues ) ->
491515 case dict :find (Connection , Queues ) of
492516 error ->
493- dict :store (Connection , queue :in ({From , Ref }, queue :new ()), Queues );
517+ dict :store (Connection , queue :in ({From , Ref , Requester }, queue :new ()), Queues );
494518 {ok , Q } ->
495- dict :store (Connection , queue :in ({From , Ref }, Q ), Queues )
519+ dict :store (Connection , queue :in ({From , Ref , Requester }, Q ), Queues )
496520 end .
497521
498522% ------------------------------------------------------------------------------
@@ -503,7 +527,7 @@ del_from_queue(Connection, Ref, Queues) ->
503527 error ->
504528 {Queues , false };
505529 {ok , Q } ->
506- Q2 = queue :filter (fun ({_ , R }) -> R =/= Ref end , Q ),
530+ Q2 = queue :filter (fun ({_ , R , _ }) -> R =/= Ref end , Q ),
507531 Removed = queue :len (Q ) =/= queue :len (Q2 ),
508532 Queues2 = case queue :is_empty (Q2 ) of
509533 true ->
@@ -523,14 +547,14 @@ queue_out(Connection, Queues) ->
523547 empty ;
524548 {ok , Q } ->
525549 case queue :out (Q ) of
526- {{value , {From , Ref }}, Q2 } ->
550+ {{value , {From , Ref , Requester }}, Q2 } ->
527551 Queues2 = case queue :is_empty (Q2 ) of
528552 true ->
529553 dict :erase (Connection , Queues );
530554 false ->
531555 dict :store (Connection , Q2 , Queues )
532556 end ,
533- {ok , {From , Ref }, Queues2 };
557+ {ok , {From , Ref , Requester }, Queues2 };
534558 {empty , _ } ->
535559 % % fix race condition
536560 empty
@@ -545,7 +569,7 @@ deliver_socket(Socket, Connection, State) ->
545569 case queue_out (Connection , Queues ) of
546570 empty ->
547571 store_socket (Connection , Socket , State );
548- {ok , {{ PidWaiter , _ } = FromWaiter , Ref }, Queues2 } ->
572+ {ok , {FromWaiter , Ref , PidWaiter }, Queues2 } ->
549573 Pending2 = del_pending (Ref , Pending ),
550574 _ = metrics :update_histogram (
551575 State # state .metrics , [hackney_pool , State # state .name , queue_count ], dict :size (Pending2 )
@@ -564,8 +588,8 @@ deliver_socket(Socket, Connection, State) ->
564588 end .
565589
566590
567- add_pending (Ref , From , Connection , Pending ) ->
568- dict :store (Ref , {From , Connection }, Pending ).
591+ add_pending (Ref , From , Connection , Requester , Pending ) ->
592+ dict :store (Ref , {From , Connection , Requester }, Pending ).
569593
570594
571595del_pending (Ref , Pending ) ->
@@ -574,13 +598,13 @@ del_pending(Ref, Pending) ->
574598
575599remove_pending (Ref , # state {queues = Queues0 , pending = Pending0 } = State ) ->
576600 case dict :find (Ref , Pending0 ) of
577- {ok , {From , Connection }} ->
601+ {ok , {From , Connection , Requester }} ->
578602 Pending1 = dict :erase (Ref , Pending0 ),
579603 Queues1 = case dict :find (Connection , Queues0 ) of
580604 {ok , Q0 } ->
581605 Q1 = queue :filter (
582606 fun
583- (PendingReq ) when PendingReq =:= {From , Ref } -> false ;
607+ (PendingReq ) when PendingReq =:= {From , Ref , Requester } -> false ;
584608 (_ ) -> true
585609 end ,
586610 Q0
0 commit comments