1515
1616use std:: {
1717 future:: Future ,
18- marker:: PhantomData ,
1918 pin:: Pin ,
2019 sync:: { Arc , Mutex } ,
2120 task:: { Context , Poll } ,
2221} ;
2322
24- use futures:: {
25- channel:: { mpsc, oneshot} ,
26- SinkExt ,
27- } ;
28- use once_cell:: sync:: OnceCell ;
29- use pin_project_lite:: pin_project;
30- use pyo3:: prelude:: * ;
31-
3223use crate :: {
3324 asyncio, call_soon_threadsafe, close, create_future, dump_err, err:: RustPanic ,
3425 get_running_loop, into_future_with_locals, TaskLocals ,
3526} ;
27+ use futures:: channel:: oneshot;
28+ #[ cfg( feature = "unstable-streams" ) ]
29+ use futures:: { channel:: mpsc, SinkExt } ;
30+ #[ cfg( feature = "unstable-streams" ) ]
31+ use once_cell:: sync:: OnceCell ;
32+ use pin_project_lite:: pin_project;
33+ use pyo3:: prelude:: * ;
34+ #[ cfg( feature = "unstable-streams" ) ]
35+ use std:: marker:: PhantomData ;
3636
3737/// Generic utilities for a JoinError
3838pub trait JoinError {
@@ -585,7 +585,7 @@ where
585585{
586586 let ( cancel_tx, cancel_rx) = oneshot:: channel ( ) ;
587587
588- let py_fut = create_future ( locals. event_loop . clone ( ) . into_bound ( py ) ) ?;
588+ let py_fut = create_future ( locals. event_loop . bind ( py ) . clone ( ) ) ?;
589589 py_fut. call_method1 (
590590 "add_done_callback" ,
591591 ( PyDoneCallback {
@@ -594,14 +594,14 @@ where
594594 ) ?;
595595
596596 let future_tx1 = PyObject :: from ( py_fut. clone ( ) ) ;
597- let future_tx2 = future_tx1. clone ( ) ;
597+ let future_tx2 = future_tx1. clone_ref ( py ) ;
598598
599599 R :: spawn ( async move {
600- let locals2 = locals. clone ( ) ;
600+ let locals2 = Python :: with_gil ( |py| locals. clone_ref ( py ) ) ;
601601
602602 if let Err ( e) = R :: spawn ( async move {
603603 let result = R :: scope (
604- locals2. clone ( ) ,
604+ Python :: with_gil ( |py| locals2. clone_ref ( py ) ) ,
605605 Cancellable :: new_with_cancel_rx ( fut, cancel_rx) ,
606606 )
607607 . await ;
@@ -990,7 +990,7 @@ where
990990{
991991 let ( cancel_tx, cancel_rx) = oneshot:: channel ( ) ;
992992
993- let py_fut = create_future ( locals. event_loop . clone ( ) . into_bound ( py) ) ?;
993+ let py_fut = create_future ( locals. event_loop . clone_ref ( py ) . into_bound ( py) ) ?;
994994 py_fut. call_method1 (
995995 "add_done_callback" ,
996996 ( PyDoneCallback {
@@ -999,14 +999,14 @@ where
999999 ) ?;
10001000
10011001 let future_tx1 = PyObject :: from ( py_fut. clone ( ) ) ;
1002- let future_tx2 = future_tx1. clone ( ) ;
1002+ let future_tx2 = future_tx1. clone_ref ( py ) ;
10031003
10041004 R :: spawn_local ( async move {
1005- let locals2 = locals. clone ( ) ;
1005+ let locals2 = Python :: with_gil ( |py| locals. clone_ref ( py ) ) ;
10061006
10071007 if let Err ( e) = R :: spawn_local ( async move {
10081008 let result = R :: scope_local (
1009- locals2. clone ( ) ,
1009+ Python :: with_gil ( |py| locals2. clone_ref ( py ) ) ,
10101010 Cancellable :: new_with_cancel_rx ( fut, cancel_rx) ,
10111011 )
10121012 . await ;
@@ -1446,27 +1446,12 @@ where
14461446 into_stream_with_locals_v1 :: < R > ( get_current_locals :: < R > ( gen. py ( ) ) ?, gen)
14471447}
14481448
1449- #[ allow( dead_code) ]
1450- fn py_true ( ) -> PyObject {
1451- static TRUE : OnceCell < PyObject > = OnceCell :: new ( ) ;
1452- TRUE . get_or_init ( || Python :: with_gil ( |py| true . into_py ( py) ) )
1453- . clone ( )
1454- }
1455-
1456- #[ allow( dead_code) ]
1457- fn py_false ( ) -> PyObject {
1458- static FALSE : OnceCell < PyObject > = OnceCell :: new ( ) ;
1459- FALSE
1460- . get_or_init ( || Python :: with_gil ( |py| false . into_py ( py) ) )
1461- . clone ( )
1462- }
1463-
14641449trait Sender : Send + ' static {
1465- fn send ( & mut self , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > ;
1450+ fn send ( & mut self , py : Python , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > ;
14661451 fn close ( & mut self ) -> PyResult < ( ) > ;
14671452}
14681453
1469- #[ allow ( dead_code ) ]
1454+ #[ cfg ( feature = "unstable-streams" ) ]
14701455struct GenericSender < R >
14711456where
14721457 R : Runtime ,
@@ -1475,13 +1460,14 @@ where
14751460 tx : mpsc:: Sender < PyObject > ,
14761461}
14771462
1463+ #[ cfg( feature = "unstable-streams" ) ]
14781464impl < R > Sender for GenericSender < R >
14791465where
14801466 R : Runtime + ContextExt ,
14811467{
1482- fn send ( & mut self , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > {
1483- match self . tx . try_send ( item. clone ( ) ) {
1484- Ok ( _) => Ok ( py_true ( ) ) ,
1468+ fn send ( & mut self , py : Python , locals : TaskLocals , item : PyObject ) -> PyResult < PyObject > {
1469+ match self . tx . try_send ( item. clone_ref ( py ) ) {
1470+ Ok ( _) => Ok ( true . into_py ( py ) ) ,
14851471 Err ( e) => {
14861472 if e. is_full ( ) {
14871473 let mut tx = self . tx . clone ( ) ;
@@ -1490,19 +1476,19 @@ where
14901476 future_into_py_with_locals :: < R , _ , PyObject > ( py, locals, async move {
14911477 if tx. flush ( ) . await . is_err ( ) {
14921478 // receiving side disconnected
1493- return Ok ( py_false ( ) ) ;
1479+ return Python :: with_gil ( |py| Ok ( false . into_py ( py ) ) ) ;
14941480 }
14951481 if tx. send ( item) . await . is_err ( ) {
14961482 // receiving side disconnected
1497- return Ok ( py_false ( ) ) ;
1483+ return Python :: with_gil ( |py| Ok ( false . into_py ( py ) ) ) ;
14981484 }
1499- Ok ( py_true ( ) )
1485+ Python :: with_gil ( |py| Ok ( true . into_py ( py ) ) )
15001486 } ) ?
15011487 . into ( ) ,
15021488 )
15031489 } )
15041490 } else {
1505- Ok ( py_false ( ) )
1491+ Ok ( false . into_py ( py ) )
15061492 }
15071493 }
15081494 }
@@ -1521,7 +1507,7 @@ struct SenderGlue {
15211507#[ pymethods]
15221508impl SenderGlue {
15231509 pub fn send ( & mut self , item : PyObject ) -> PyResult < PyObject > {
1524- self . tx . send ( self . locals . clone ( ) , item)
1510+ Python :: with_gil ( |py| self . tx . send ( py , self . locals . clone_ref ( py ) , item) )
15251511 }
15261512 pub fn close ( & mut self ) -> PyResult < ( ) > {
15271513 self . tx . close ( )
0 commit comments