@@ -47,6 +47,9 @@ class ManabiContextLockMixin(ABC):
4747 def acquire (self ):
4848 pass
4949
50+ def __init__ (self ):
51+ self ._id = - 1
52+
5053 @abstractmethod
5154 def release (self ):
5255 pass
@@ -57,6 +60,13 @@ def __enter__(self):
5760 def __exit__ (self , exc_type , exc_value , traceback ):
5861 self .release ()
5962
63+ def _check_and_set_tid (self ):
64+ tid : int = threading .get_ident ()
65+ tid_is_set = self ._id != - 1
66+ if tid_is_set and self ._id != tid :
67+ _logger .error ("Do not use from multiple threads" )
68+ self ._id = tid
69+
6070
6171class ManabiShelfLock (ManabiContextLockMixin ):
6272 _storage_object : Callable [[], "ManabiShelfLockLockStorage" ]
@@ -69,35 +79,29 @@ def __init__(self, storage_path, storage_object: "ManabiShelfLockLockStorage"):
6979 self ._lock_file = open (f"{ storage_path } .lock" , "wb+" )
7080 self ._fd = self ._lock_file .fileno ()
7181 self .acquire_write = self .acquire_read = self .acquire
72- self . _id = - 1
82+ super (). __init__ ()
7383
7484 def acquire (self ):
75- tid = threading .get_ident ()
76- if self ._id and self ._id != tid :
77- _logger .error ("Do not use from multiple threads" )
78- self ._id = tid
85+ self ._check_and_set_tid ()
7986 if self ._semaphore == 0 :
8087 fcntl .flock (self ._fd , fcntl .LOCK_EX )
8188 self ._storage_object ()._dict = shelve .open (str (self ._storage_path ))
8289 self ._semaphore += 1
8390
8491 def release (self ):
85- tid = threading .get_ident ()
8692 if self ._semaphore == 0 :
8793 _logger .error (
8894 f"Inconsistent use of lock. { '' .join (traceback .format_stack ())} "
8995 )
90- if self ._id and self ._id != tid :
91- _logger .error ("Do not use from multiple threads" )
92- self ._id = tid
96+
97+ self ._check_and_set_tid ()
9398 self ._semaphore -= 1
9499 if self ._semaphore == 0 :
95100 storage_object = self ._storage_object ()
96101 if storage_object ._dict is not None :
97102 storage_object ._dict .close ()
98103 storage_object ._dict = None
99104 fcntl .flock (self ._fd , fcntl .LOCK_UN )
100- self ._id = threading .get_ident ()
101105
102106
103107class ManabiShelfLockLockStorage (LockStorageDict , ManabiTimeoutMixin ):
@@ -150,18 +154,15 @@ class ManabiPostgresLock(ManabiContextLockMixin):
150154 _storage_object : Callable [[], "ManabiDbLockStorage" ]
151155
152156 def __init__ (self , storage_object : "ManabiDbLockStorage" ):
153- self ._id = None
154157 # type manually checked
155158 self ._storage_object = weakref .ref (storage_object ) # type: ignore
156159 self .acquire_write = self .acquire_read = self .acquire
157160 self ._semaphore = 0
158- self . _id = - 1
161+ super (). __init__ ()
159162
160163 def acquire (self ):
161164 tid = threading .get_ident ()
162- if self ._id and self ._id != tid :
163- _logger .error ("Do not use from multiple threads" )
164- self ._id = tid
165+ self ._check_and_set_tid ()
165166 if self ._semaphore == 0 :
166167 _logger .info (f"{ tid } acquire" )
167168 self ._storage_object ().execute (
@@ -175,9 +176,7 @@ def release(self):
175176 _logger .error (
176177 f"Inconsistent use of lock. { '' .join (traceback .format_stack ())} "
177178 )
178- if self ._id and self ._id != tid :
179- _logger .error ("Do not use from multiple threads" )
180- self ._id = tid
179+ self ._check_and_set_tid ()
181180 self ._semaphore -= 1
182181 if self ._semaphore == 0 :
183182 _logger .info (f"{ tid } release" )
@@ -258,10 +257,11 @@ def __getitem__(self, token):
258257 cursor = self ._storage_object ().execute (
259258 "SELECT data FROM manabi_lock WHERE token = %s" , (str (token ),)
260259 )
261- lock = cursor .fetchone ( )
262- if lock is None :
260+ locks = cursor .fetchmany ( 1 )
261+ if not len ( locks ) :
263262 raise KeyError (f"{ token } not found" )
264- lock = lock [0 ]
263+
264+ lock = locks [0 ][0 ] # first row, first col
265265 self .encode_lock (lock )
266266 return lock
267267
0 commit comments