11# coding=utf-8
22import logging
33import threading
4-
54from contextlib import contextmanager
6- from pika . adapters import TwistedProtocolConnection
5+
76from pika .adapters .twisted_connection import TwistedChannel
7+ from pika .adapters .twisted_connection import TwistedProtocolConnection
88from pika .connection import Parameters
9- from twisted .internet import reactor , defer
9+ from twisted .internet import defer
10+ from twisted .internet import reactor
1011from twisted .internet .protocol import ClientCreator
1112
13+ try :
14+ itervalues = dict .itervalues
15+ except AttributeError :
16+ itervalues = dict .values
17+
1218__all__ = ["VERSION" , "PooledConn" ]
1319
14- VERSION = "0.1.6"
20+
21+ VERSION = "0.2.0"
1522
1623logger = logging .getLogger (__name__ )
1724
@@ -47,7 +54,7 @@ class PooledConn(object):
4754
4855 loop = reactor
4956
50- def __new__ (cls , params , timeout_conn = None , max_size = None ):
57+ def __new__ (cls , params , timeout_conn = 0 , max_size = None ):
5158 """
5259 :param cls:
5360 :param params: connection params
@@ -67,18 +74,22 @@ def __new__(cls, params, timeout_conn=None, max_size=None):
6774 cls ._my_pools [1 ][_id ] = {}
6875 cls ._my_channels [_id ] = {}
6976 cls ._max_size [_id ] = max_size if max_size else cls .max_size
70- cls ._timeout [_id ] = timeout_conn if timeout_conn > 0 else cls .timeout_conn
77+ cls ._timeout [_id ] = (
78+ timeout_conn if timeout_conn > 0 else cls .timeout_conn
79+ )
7180 cls ._waiting [_id ] = []
7281 # only works when first created
7382 instance .__params = cls ._my_params [_id ]
7483 instance .__max_size = cls ._max_size [_id ]
7584 instance .timeout_conn = cls ._timeout [_id ]
76- instance .__idle_pool , instance .__using_pool = (cls ._my_pools [i ][_id ] for i in (0 , 1 ))
85+ instance .__idle_pool , instance .__using_pool = (
86+ cls ._my_pools [i ][_id ] for i in (0 , 1 )
87+ )
7788 instance .__channel_pool = cls ._my_channels [_id ]
7889 instance .waiting = cls ._waiting [_id ]
7990 return instance
8091 else :
81- raise TypeError (' only accept pika Parameters type' )
92+ raise TypeError (" only accept pika Parameters type" )
8293
8394 def __init__ (self , * args , ** kwargs ):
8495 """
@@ -97,25 +108,40 @@ def __connect(self, retrying=False):
97108 params = self .__params
98109 cc = ClientCreator (self .loop , TwistedProtocolConnection , params )
99110 _d = cc .connectTCP (params .host , params .port , timeout = self .timeout_conn )
100- _d .addCallback (lambda p : p .ready )
101- _d .addCallbacks (self ._in_pool ,
102- lambda err : err if retrying or not self .retry else self .__connect (True )) # retry once when err
111+
112+ def conn_ready (c ):
113+ c .ready .addCallback (lambda _ : c )
114+ return c .ready
115+
116+ _d .addCallback (conn_ready )
117+ _d .addCallbacks (
118+ self ._in_pool ,
119+ lambda err : err if retrying or not self .retry else self .__connect (True ),
120+ ) # retry once when err
103121 return _d
104122
105123 def _in_pool (self , conn ):
106- assert isinstance (conn , TwistedProtocolConnection ), 'conn must be TwistedProtocolConnection'
107- logger .debug ('in pool : %s' % conn )
124+ assert isinstance (
125+ conn , TwistedProtocolConnection
126+ ), "conn must be TwistedProtocolConnection"
127+ logger .debug ("in pool : %s" % conn )
108128
109129 _id = id (conn )
110130
111131 if self .size < self .__max_size :
112132 # add hook to clear the bad connection object in the pool
113133 conn .ready = defer .Deferred ()
114- conn .ready .addErrback (self ._clear , self .__idle_pool , self .__using_pool , self .__channel_pool , _id )
134+ conn .ready .addErrback (
135+ self ._clear ,
136+ self .__idle_pool ,
137+ self .__using_pool ,
138+ self .__channel_pool ,
139+ _id ,
140+ )
115141 # add new conn in using pool
116142 self .__using_pool [_id ] = conn
117143 else :
118- raise RuntimeError (' _in_pool, unexpected reach' )
144+ raise RuntimeError (" _in_pool, unexpected reach" )
119145
120146 return conn
121147
@@ -133,10 +159,10 @@ def _clear(reason, idle_pool, using_pool, channel_pool, conn_id):
133159 with _lock ():
134160 try :
135161 idle_pool .pop (conn_id )
136- logger .info (' a connection lost when not using' )
162+ logger .info (" a connection lost when not using" )
137163 except KeyError :
138164 if using_pool .pop (conn_id , None ):
139- logger .warn (' connection lost when using, should be handled later' )
165+ logger .warn (" connection lost when using, should be handled later" )
140166 return reason
141167 finally :
142168 channel_pool .pop (conn_id , None )
@@ -152,7 +178,9 @@ def _get_channel(self, conn):
152178 d .callback (p [_id ])
153179 if d is None :
154180 d = conn .channel ()
155- d .addCallback (lambda ch : p .update ({_id : ch }) or setattr (ch , 'pool_id_' , _id ) or ch )
181+ d .addCallback (
182+ lambda ch : p .update ({_id : ch }) or setattr (ch , "pool_id_" , _id ) or ch
183+ )
156184
157185 def _h_err (ch , _conn ):
158186 _conn .ready .addErrback (lambda _ , _c : p .pop (id (_c ), None ), _conn )
@@ -203,5 +231,5 @@ def size(self):
203231
204232 def clear (self ):
205233 with _lock ():
206- for c in self .__idle_pool . itervalues ( ):
234+ for c in itervalues ( self .__idle_pool ):
207235 c .close ()
0 commit comments