15
15
16
16
use std:: {
17
17
future:: Future ,
18
- marker:: PhantomData ,
19
18
pin:: Pin ,
20
19
sync:: { Arc , Mutex } ,
21
20
task:: { Context , Poll } ,
22
21
} ;
23
22
24
- use futures:: {
25
- channel:: { mpsc, oneshot} ,
26
- SinkExt ,
27
- } ;
28
- use once_cell:: sync:: OnceCell ;
29
- use pin_project_lite:: pin_project;
30
- use pyo3:: prelude:: * ;
31
-
32
23
use crate :: {
33
24
asyncio, call_soon_threadsafe, close, create_future, dump_err, err:: RustPanic ,
34
25
get_running_loop, into_future_with_locals, TaskLocals ,
35
26
} ;
27
+ use futures:: channel:: oneshot;
28
+ #[ cfg( feature = "unstable-streams" ) ]
29
+ use futures:: { channel:: mpsc, SinkExt } ;
30
+ #[ cfg( feature = "unstable-streams" ) ]
31
+ use once_cell:: sync:: OnceCell ;
32
+ use pin_project_lite:: pin_project;
33
+ use pyo3:: prelude:: * ;
34
+ #[ cfg( feature = "unstable-streams" ) ]
35
+ use std:: marker:: PhantomData ;
36
36
37
37
/// Generic utilities for a JoinError
38
38
pub trait JoinError {
@@ -585,7 +585,7 @@ where
585
585
{
586
586
let ( cancel_tx, cancel_rx) = oneshot:: channel ( ) ;
587
587
588
- let py_fut = create_future ( locals. event_loop . clone ( ) . into_bound ( py ) ) ?;
588
+ let py_fut = create_future ( locals. event_loop . bind ( py ) . clone ( ) ) ?;
589
589
py_fut. call_method1 (
590
590
"add_done_callback" ,
591
591
( PyDoneCallback {
@@ -594,14 +594,14 @@ where
594
594
) ?;
595
595
596
596
let future_tx1 = PyObject :: from ( py_fut. clone ( ) ) ;
597
- let future_tx2 = future_tx1. clone ( ) ;
597
+ let future_tx2 = future_tx1. clone_ref ( py ) ;
598
598
599
599
R :: spawn ( async move {
600
- let locals2 = locals. clone ( ) ;
600
+ let locals2 = Python :: with_gil ( |py| locals. clone_ref ( py ) ) ;
601
601
602
602
if let Err ( e) = R :: spawn ( async move {
603
603
let result = R :: scope (
604
- locals2. clone ( ) ,
604
+ Python :: with_gil ( |py| locals2. clone_ref ( py ) ) ,
605
605
Cancellable :: new_with_cancel_rx ( fut, cancel_rx) ,
606
606
)
607
607
. await ;
@@ -990,7 +990,7 @@ where
990
990
{
991
991
let ( cancel_tx, cancel_rx) = oneshot:: channel ( ) ;
992
992
993
- let py_fut = create_future ( locals. event_loop . clone ( ) . into_bound ( py) ) ?;
993
+ let py_fut = create_future ( locals. event_loop . clone_ref ( py ) . into_bound ( py) ) ?;
994
994
py_fut. call_method1 (
995
995
"add_done_callback" ,
996
996
( PyDoneCallback {
@@ -999,14 +999,14 @@ where
999
999
) ?;
1000
1000
1001
1001
let future_tx1 = PyObject :: from ( py_fut. clone ( ) ) ;
1002
- let future_tx2 = future_tx1. clone ( ) ;
1002
+ let future_tx2 = future_tx1. clone_ref ( py ) ;
1003
1003
1004
1004
R :: spawn_local ( async move {
1005
- let locals2 = locals. clone ( ) ;
1005
+ let locals2 = Python :: with_gil ( |py| locals. clone_ref ( py ) ) ;
1006
1006
1007
1007
if let Err ( e) = R :: spawn_local ( async move {
1008
1008
let result = R :: scope_local (
1009
- locals2. clone ( ) ,
1009
+ Python :: with_gil ( |py| locals2. clone_ref ( py ) ) ,
1010
1010
Cancellable :: new_with_cancel_rx ( fut, cancel_rx) ,
1011
1011
)
1012
1012
. await ;
@@ -1446,27 +1446,12 @@ where
1446
1446
into_stream_with_locals_v1 :: < R > ( get_current_locals :: < R > ( gen. py ( ) ) ?, gen)
1447
1447
}
1448
1448
1449
- #[ allow( dead_code) ]
1450
- fn py_true ( ) -> PyObject {
1451
- static TRUE : OnceCell < PyObject > = OnceCell :: new ( ) ;
1452
- TRUE . get_or_init ( || Python :: with_gil ( |py| true . into_py ( py) ) )
1453
- . clone ( )
1454
- }
1455
-
1456
- #[ allow( dead_code) ]
1457
- fn py_false ( ) -> PyObject {
1458
- static FALSE : OnceCell < PyObject > = OnceCell :: new ( ) ;
1459
- FALSE
1460
- . get_or_init ( || Python :: with_gil ( |py| false . into_py ( py) ) )
1461
- . clone ( )
1462
- }
1463
-
1464
1449
trait Sender : Send + ' static {
1465
- fn send ( & mut self , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > ;
1450
+ fn send ( & mut self , py : Python , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > ;
1466
1451
fn close ( & mut self ) -> PyResult < ( ) > ;
1467
1452
}
1468
1453
1469
- #[ allow ( dead_code ) ]
1454
+ #[ cfg ( feature = "unstable-streams" ) ]
1470
1455
struct GenericSender < R >
1471
1456
where
1472
1457
R : Runtime ,
@@ -1475,13 +1460,14 @@ where
1475
1460
tx : mpsc:: Sender < PyObject > ,
1476
1461
}
1477
1462
1463
+ #[ cfg( feature = "unstable-streams" ) ]
1478
1464
impl < R > Sender for GenericSender < R >
1479
1465
where
1480
1466
R : Runtime + ContextExt ,
1481
1467
{
1482
- fn send ( & mut self , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > {
1483
- match self . tx . try_send ( item. clone ( ) ) {
1484
- Ok ( _) => Ok ( py_true ( ) ) ,
1468
+ fn send ( & mut self , py : Python , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > {
1469
+ match self . tx . try_send ( item. clone_ref ( py ) ) {
1470
+ Ok ( _) => Ok ( true . into_py ( py ) ) ,
1485
1471
Err ( e) => {
1486
1472
if e. is_full ( ) {
1487
1473
let mut tx = self . tx . clone ( ) ;
@@ -1490,19 +1476,19 @@ where
1490
1476
future_into_py_with_locals :: < R , _ , PyObject > ( py, locals, async move {
1491
1477
if tx. flush ( ) . await . is_err ( ) {
1492
1478
// receiving side disconnected
1493
- return Ok ( py_false ( ) ) ;
1479
+ return Python :: with_gil ( |py| Ok ( false . into_py ( py ) ) ) ;
1494
1480
}
1495
1481
if tx. send ( item) . await . is_err ( ) {
1496
1482
// receiving side disconnected
1497
- return Ok ( py_false ( ) ) ;
1483
+ return Python :: with_gil ( |py| Ok ( false . into_py ( py ) ) ) ;
1498
1484
}
1499
- Ok ( py_true ( ) )
1485
+ Python :: with_gil ( |py| Ok ( true . into_py ( py ) ) )
1500
1486
} ) ?
1501
1487
. into ( ) ,
1502
1488
)
1503
1489
} )
1504
1490
} else {
1505
- Ok ( py_false ( ) )
1491
+ Ok ( false . into_py ( py ) )
1506
1492
}
1507
1493
}
1508
1494
}
@@ -1521,7 +1507,7 @@ struct SenderGlue {
1521
1507
#[ pymethods]
1522
1508
impl SenderGlue {
1523
1509
pub fn send ( & mut self , item : PyObject ) -> PyResult < PyObject > {
1524
- self . tx . send ( self . locals . clone ( ) , item)
1510
+ Python :: with_gil ( |py| self . tx . send ( py , self . locals . clone_ref ( py ) , item) )
1525
1511
}
1526
1512
pub fn close ( & mut self ) -> PyResult < ( ) > {
1527
1513
self . tx . close ( )
0 commit comments