2
2
3
3
import asyncio
4
4
import concurrent .futures
5
- import contextlib
6
5
import contextvars
7
6
import functools
8
7
import inspect
9
8
import threading
10
9
import time
11
- import traceback
12
10
import warnings
13
11
import weakref
14
12
from collections import deque
@@ -285,9 +283,6 @@ def check_canceled_sync() -> bool:
285
283
return True
286
284
287
285
288
- # __threadpool_executor = ThreadPoolExecutor(thread_name_prefix="sub_asyncio")
289
-
290
-
291
286
def run_in_thread (func : Callable [..., _T ], / , * args : Any , ** kwargs : Any ) -> asyncio .Future [_T ]:
292
287
global __tread_pool_executor
293
288
@@ -296,12 +291,6 @@ def run_in_thread(func: Callable[..., _T], /, *args: Any, **kwargs: Any) -> asyn
296
291
ctx = contextvars .copy_context ()
297
292
func_call = functools .partial (ctx .run , func , * args , ** kwargs )
298
293
299
- # return cast(
300
- # "asyncio.Future[_T]",
301
- # # loop.run_in_executor(__threadpool_executor, cast(Callable[..., _T], func_call)),
302
- # loop.run_in_executor(None, cast(Callable[..., _T], func_call)),
303
- # )
304
-
305
294
executor = ThreadPoolExecutor (max_workers = 1 , thread_name_prefix = "sub_asyncio" )
306
295
try :
307
296
return cast (
@@ -341,33 +330,9 @@ async def create_inner_task(coro: Callable[..., Coroutine[Any, Any, _T]], *args:
341
330
return await inner_task
342
331
343
332
def run (coro : Callable [..., Coroutine [Any , Any , _T ]], * args : Any , ** kwargs : Any ) -> _T :
344
-
345
- old_name = threading .current_thread ().name
346
333
threading .current_thread ().name = coro .__qualname__
347
- try :
348
- return asyncio .run (create_inner_task (coro , * args , ** kwargs ))
349
- finally :
350
- threading .current_thread ().name = old_name
351
-
352
- # loop = asyncio.new_event_loop()
353
-
354
- # try:
355
- # asyncio.set_event_loop(loop)
356
-
357
- # t = loop.create_task(create_inner_task(coro, *args, **kwargs), name=coro.__qualname__)
358
-
359
- # return loop.run_until_complete(t)
360
- # finally:
361
- # try:
362
- # running_tasks = asyncio.all_tasks(loop)
363
- # if running_tasks:
364
- # loop.run_until_complete(asyncio.gather(*running_tasks, return_exceptions=True))
365
334
366
- # loop.run_until_complete(loop.shutdown_asyncgens())
367
- # finally:
368
- # asyncio.set_event_loop(None)
369
- # loop.close()
370
- # threading.current_thread().setName(old_name)
335
+ return asyncio .run (create_inner_task (coro , * args , ** kwargs ))
371
336
372
337
cti = get_current_future_info ()
373
338
result = run_in_thread (run , coro , * args , ** kwargs )
@@ -391,58 +356,6 @@ def done(task: asyncio.Future[_T]) -> None:
391
356
return result
392
357
393
358
394
- @contextlib .asynccontextmanager
395
- async def async_lock (lock : threading .RLock ) -> AsyncGenerator [None , None ]:
396
- import time
397
-
398
- start_time = time .monotonic ()
399
- locked = lock .acquire (blocking = False )
400
- while not locked :
401
- if time .monotonic () - start_time >= 1800 :
402
- raise TimeoutError ("Timeout waiting for lock" )
403
-
404
- await asyncio .sleep (0.001 )
405
-
406
- locked = lock .acquire (blocking = False )
407
- try :
408
- yield
409
- finally :
410
- if locked :
411
- lock .release ()
412
- # with lock:
413
- # yield
414
-
415
-
416
- class NewEvent :
417
- def __init__ (self , value : bool = False ) -> None :
418
- self ._event = threading .Event ()
419
- if value :
420
- self ._event .set ()
421
-
422
- def is_set (self ) -> bool :
423
- return self ._event .is_set ()
424
-
425
- def set (self ) -> None :
426
- self ._event .set ()
427
-
428
- def clear (self ) -> None :
429
- self ._event .clear ()
430
-
431
- async def wait (self , timeout : Optional [float ] = None ) -> bool :
432
- if timeout is not None and timeout > 0 :
433
- start = time .monotonic ()
434
- else :
435
- start = None
436
-
437
- while not (result := self .is_set ()):
438
- if start is not None and timeout is not None and (time .monotonic () - start ) > timeout :
439
- break
440
-
441
- await asyncio .sleep (0 )
442
-
443
- return result
444
-
445
-
446
359
class Event :
447
360
"""Thread safe version of an async Event"""
448
361
@@ -516,208 +429,6 @@ async def wait(self, timeout: Optional[float] = None) -> bool:
516
429
return False
517
430
518
431
519
- class Semaphore :
520
- """Thread safe version of a Semaphore"""
521
-
522
- def __init__ (self , value : int = 1 ) -> None :
523
- if value < 0 :
524
- raise ValueError ("Semaphore initial value must be >= 0" )
525
- self ._value = value
526
- self ._waiters : Deque [asyncio .Future [Any ]] = deque ()
527
-
528
- self ._lock = threading .RLock ()
529
-
530
- def __repr__ (self ) -> str :
531
- res = super ().__repr__ ()
532
- extra = "locked" if self .locked () else f"unlocked, value:{ self ._value } "
533
- if self ._waiters :
534
- extra = f"{ extra } , waiters:{ len (self ._waiters )} "
535
- return f"<{ res [1 :- 1 ]} [{ extra } ]>"
536
-
537
- async def _wake_up_next (self ) -> None :
538
- async with async_lock (self ._lock ):
539
- while self ._waiters :
540
- waiter = self ._waiters .popleft ()
541
-
542
- if not waiter .done ():
543
- if waiter .get_loop () == asyncio .get_running_loop ():
544
- if not waiter .done ():
545
- waiter .set_result (True )
546
- else :
547
- if waiter .get_loop ().is_running ():
548
-
549
- def set_result (w : asyncio .Future [Any ], ev : threading .Event ) -> None :
550
- try :
551
- if w .get_loop ().is_running () and not w .done ():
552
- w .set_result (True )
553
- finally :
554
- ev .set ()
555
-
556
- if not waiter .done ():
557
- done = threading .Event ()
558
-
559
- waiter .get_loop ().call_soon_threadsafe (set_result , waiter , done )
560
-
561
- start = time .monotonic ()
562
- while not done .is_set ():
563
-
564
- if time .monotonic () - start > 120 :
565
- raise TimeoutError ("Can't set future result." )
566
-
567
- await asyncio .sleep (0.001 )
568
-
569
- def locked (self ) -> bool :
570
- with self ._lock :
571
- return self ._value == 0
572
-
573
- async def acquire (self , timeout : Optional [float ] = None ) -> bool :
574
- while True :
575
- async with async_lock (self ._lock ):
576
- if self ._value > 0 :
577
- break
578
- fut = create_sub_future ()
579
- self ._waiters .append (fut )
580
-
581
- try :
582
- await asyncio .wait_for (fut , timeout )
583
- except asyncio .TimeoutError :
584
- return False
585
- except (SystemExit , KeyboardInterrupt ):
586
- raise
587
-
588
- except BaseException :
589
- if not fut .done ():
590
- fut .cancel ()
591
- if self ._value > 0 and not fut .cancelled ():
592
- await self ._wake_up_next ()
593
-
594
- raise
595
-
596
- async with async_lock (self ._lock ):
597
- self ._value -= 1
598
-
599
- return True
600
-
601
- async def release (self ) -> None :
602
- self ._value += 1
603
- await self ._wake_up_next ()
604
-
605
- async def __aenter__ (self ) -> None :
606
- await self .acquire ()
607
-
608
- async def __aexit__ (
609
- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
610
- ) -> None :
611
- await self .release ()
612
-
613
-
614
- class BoundedSemaphore (Semaphore ):
615
- """Thread safe version of a BoundedSemaphore"""
616
-
617
- def __init__ (self , value : int = 1 ) -> None :
618
- self ._bound_value = value
619
- super ().__init__ (value )
620
-
621
- async def release (self ) -> None :
622
- if self ._value >= self ._bound_value :
623
- raise ValueError ("BoundedSemaphore released too many times" )
624
- await super ().release ()
625
-
626
-
627
- class NewNewLock :
628
- def __init__ (self ) -> None :
629
- self ._lock = threading .Lock ()
630
- self ._owner_thread : Optional [threading .Thread ] = None
631
- self ._owner_task : Optional [asyncio .Task [Any ]] = None
632
-
633
- def locked (self ) -> bool :
634
- return self ._lock .locked ()
635
-
636
- def acquire (self , blocking : bool = True , timeout : float = - 1 ) -> bool :
637
- return self ._lock .acquire (blocking = blocking , timeout = timeout )
638
-
639
- def release (self ) -> None :
640
- self ._lock .release ()
641
- self ._owner_task = None
642
- self ._owner_thread = None
643
-
644
- async def acquire_async (self , blocking : bool = True , timeout : float = - 1 ) -> bool :
645
- start = time .monotonic ()
646
- while not (aquired := self .acquire (blocking = False )):
647
- if not blocking :
648
- return False
649
-
650
- current = time .monotonic () - start
651
- if timeout > 0 and current > timeout :
652
- break
653
-
654
- if current > 30 and self ._owner_task is not None :
655
- tb = traceback .format_stack (self ._owner_task .get_stack ()[0 ]) if self ._owner_task is not None else ""
656
- warnings .warn (
657
- f"locking takes to long { self ._owner_thread } { self ._owner_task } { tb } " ,
658
- )
659
-
660
- await asyncio .sleep (0 )
661
-
662
- try :
663
- await asyncio .sleep (0 )
664
- except asyncio .CancelledError :
665
- if aquired :
666
- self ._lock .release ()
667
- aquired = False
668
- raise
669
-
670
- self ._owner_task = asyncio .current_task ()
671
- self ._owner_thread = threading .current_thread ()
672
-
673
- return aquired
674
-
675
- async def release_async (self ) -> None :
676
- self .release ()
677
-
678
- def __enter__ (self ) -> None :
679
- self .acquire ()
680
-
681
- def __exit__ (
682
- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
683
- ) -> None :
684
- self .release ()
685
-
686
- async def __aenter__ (self ) -> None :
687
- await self .acquire_async ()
688
-
689
- async def __aexit__ (
690
- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
691
- ) -> None :
692
- await self .release_async ()
693
-
694
-
695
- class NewLock :
696
- def __init__ (self ) -> None :
697
- self ._block = BoundedSemaphore (value = 1 )
698
-
699
- def __repr__ (self ) -> str :
700
- return "<%s _block=%s>" % (self .__class__ .__name__ , self ._block )
701
-
702
- async def acquire (self , timeout : Optional [float ] = None ) -> bool :
703
- return await self ._block .acquire (timeout )
704
-
705
- async def release (self ) -> None :
706
- await self ._block .release ()
707
-
708
- @property
709
- def locked (self ) -> bool :
710
- return self ._block .locked ()
711
-
712
- async def __aenter__ (self ) -> None :
713
- await self .acquire ()
714
-
715
- async def __aexit__ (
716
- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
717
- ) -> None :
718
- await self .release ()
719
-
720
-
721
432
class Lock :
722
433
"""Threadsafe version of an async Lock."""
723
434
@@ -797,11 +508,11 @@ async def _wake_up_next(self) -> None:
797
508
if fut in self ._waiters :
798
509
self ._waiters .remove (fut )
799
510
800
- if fut .get_loop () == asyncio . get_running_loop ():
801
- if not fut .done ():
802
- fut .set_result ( True )
803
- else :
804
- if fut . get_loop (). is_running () :
511
+ if fut .get_loop (). is_running () and not fut . get_loop (). is_closed ():
512
+ if fut .get_loop () == asyncio . get_running_loop ():
513
+ if not fut .done ():
514
+ fut . set_result ( True )
515
+ else :
805
516
806
517
def set_result (w : asyncio .Future [Any ], ev : threading .Event ) -> None :
807
518
try :
@@ -822,6 +533,9 @@ def set_result(w: asyncio.Future[Any], ev: threading.Event) -> None:
822
533
raise TimeoutError ("Can't set future result." )
823
534
824
535
await asyncio .sleep (0.001 )
536
+ else :
537
+ warnings .warn (f"Future { repr (fut )} loop is closed" )
538
+ await self ._wake_up_next ()
825
539
826
540
827
541
class FutureInfo :
0 commit comments