11import gevent
22import time
3+ from gevent .event import AsyncResult
34
45
56class Noparallel (object ): # Only allow function running once in same time
@@ -25,12 +26,14 @@ def wrapper(*args, **kwargs):
2526 self .queued = True
2627 thread = self .threads [key ]
2728 if self .blocking :
28- thread .join () # Blocking until its finished
2929 if self .queued :
30+ res = thread .get () # Blocking until its finished
31+ if key in self .threads :
32+ return self .threads [key ].get () # Queue finished since started running
3033 self .queued = False
3134 return wrapper (* args , ** kwargs ) # Run again after the end
3235 else :
33- return thread .value # Return the value
36+ return thread .get () # Return the value
3437
3538 else : # No blocking
3639 if thread .ready (): # Its finished, create a new
@@ -40,14 +43,22 @@ def wrapper(*args, **kwargs):
4043 else : # Still running
4144 return thread
4245 else : # Thread not running
43- thread = gevent .spawn (func , * args , ** kwargs ) # Spawning new thread
44- thread .link (lambda thread : self .cleanup (key , thread ))
45- self .threads [key ] = thread
4646 if self .blocking : # Wait for finish
47- thread .join ()
48- ret = thread .value
49- return ret
47+ asyncres = AsyncResult ()
48+ self .threads [key ] = asyncres
49+ try :
50+ res = func (* args , ** kwargs )
51+ asyncres .set (res )
52+ self .cleanup (key , asyncres )
53+ return res
54+ except Exception as err :
55+ asyncres .set_exception (err )
56+ self .cleanup (key , asyncres )
57+ raise (err )
5058 else : # No blocking just return the thread
59+ thread = gevent .spawn (func , * args , ** kwargs ) # Spawning new thread
60+ thread .link (lambda thread : self .cleanup (key , thread ))
61+ self .threads [key ] = thread
5162 return thread
5263 wrapper .__name__ = func .__name__
5364
@@ -60,6 +71,8 @@ def cleanup(self, key, thread):
6071
6172
6273if __name__ == "__main__" :
74+
75+
6376 class Test ():
6477
6578 @Noparallel ()
@@ -145,11 +158,40 @@ def printThreadNum():
145158 print ("Created in %.3fs" % (time .time () - s ))
146159 printThreadNum ()
147160 time .sleep (5 )
161+
162+ def testException ():
163+ import time
164+ @Noparallel (blocking = True , queue = True )
165+ def count (self , num = 5 ):
166+ s = time .time ()
167+ # raise Exception("err")
168+ for i in range (num ):
169+ print (self , i )
170+ time .sleep (1 )
171+ return "%s return:%s" % (s , i )
172+ def caller ():
173+ try :
174+ print ("Ret:" , count (5 ))
175+ except Exception as err :
176+ print ("Raised:" , repr (err ))
177+
178+ gevent .joinall ([
179+ gevent .spawn (caller ),
180+ gevent .spawn (caller ),
181+ gevent .spawn (caller ),
182+ gevent .spawn (caller )
183+ ])
184+
185+
148186 from gevent import monkey
149187 monkey .patch_all ()
150188
189+ testException ()
190+
191+ """
151192 testBenchmark()
152193 print("Testing blocking mode...")
153194 testBlocking()
154195 print("Testing noblocking mode...")
155196 testNoblocking()
197+ """
0 commit comments