@@ -93,47 +93,29 @@ impl IndexUpdateInfo {
9393pub struct Flow ( pub Arc < FlowContext > ) ;
9494
9595#[ pyclass]
96- pub struct FlowSynchronizer ( pub async_lock :: RwLock < execution:: FlowSynchronizer > ) ;
96+ pub struct FlowSynchronizer ( pub Arc < tokio :: sync :: RwLock < execution:: FlowSynchronizer > > ) ;
9797
9898#[ pymethods]
9999impl FlowSynchronizer {
100- pub fn join ( & self , py : Python < ' _ > ) -> PyResult < ( ) > {
101- py. allow_threads ( || {
102- let lib_context = get_lib_context ( )
103- . ok_or_else ( || PyException :: new_err ( "cocoindex library not initialized" ) ) ?;
104- lib_context
105- . runtime
106- . block_on ( async {
107- let mut synchronizer = self . 0 . write ( ) . await ;
108- synchronizer. join ( ) . await
109- } )
110- . into_py_result ( )
100+ pub fn join < ' py > ( & self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
101+ let synchronizer = self . 0 . clone ( ) ;
102+ pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
103+ let mut synchronizer = synchronizer. write ( ) . await ;
104+ synchronizer. join ( ) . await . into_py_result ( )
111105 } )
112106 }
113107
114- pub fn abort ( & self , py : Python < ' _ > ) -> PyResult < ( ) > {
108+ pub fn abort ( & self , py : Python < ' _ > ) {
115109 py. allow_threads ( || {
116- let lib_context = get_lib_context ( )
117- . ok_or_else ( || PyException :: new_err ( "cocoindex library not initialized" ) ) ?;
118- lib_context. runtime . block_on ( async {
119- let mut synchronizer = self . 0 . write ( ) . await ;
120- synchronizer. abort ( ) ;
121- } ) ;
122- Ok ( ( ) )
110+ let mut synchronizer = self . 0 . blocking_write ( ) ;
111+ synchronizer. abort ( ) ;
123112 } )
124113 }
125114
126- pub fn index_update_info ( & self , py : Python < ' _ > ) -> PyResult < IndexUpdateInfo > {
115+ pub fn index_update_info ( & self , py : Python < ' _ > ) -> IndexUpdateInfo {
127116 py. allow_threads ( || {
128- let lib_context = get_lib_context ( )
129- . ok_or_else ( || PyException :: new_err ( "cocoindex library not initialized" ) ) ?;
130- lib_context
131- . runtime
132- . block_on ( async {
133- let synchronizer = self . 0 . read ( ) . await ;
134- anyhow:: Ok ( IndexUpdateInfo ( synchronizer. index_update_info ( ) ) )
135- } )
136- . into_py_result ( )
117+ let synchronizer = self . 0 . blocking_read ( ) ;
118+ IndexUpdateInfo ( synchronizer. index_update_info ( ) )
137119 } )
138120 }
139121}
@@ -152,25 +134,24 @@ impl Flow {
152134 & self . 0 . flow . flow_instance . name
153135 }
154136
155- pub fn update ( & self , py : Python < ' _ > ) -> PyResult < IndexUpdateInfo > {
156- py. allow_threads ( || {
137+ pub fn update < ' py > ( & self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
138+ let flow_ctx = self . 0 . clone ( ) ;
139+ pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
157140 let lib_context = get_lib_context ( )
158141 . ok_or_else ( || PyException :: new_err ( "cocoindex library not initialized" ) ) ?;
159- let update_info = lib_context
160- . runtime
161- . block_on ( async {
162- let mut synchronizer = execution:: FlowSynchronizer :: start (
163- self . 0 . clone ( ) ,
164- & lib_context. pool ,
165- & execution:: FlowSynchronizerOptions {
166- keep_refreshed : false ,
167- } ,
168- )
169- . await ?;
170- synchronizer. join ( ) . await ?;
171- anyhow:: Ok ( synchronizer. index_update_info ( ) )
172- } )
142+ let update_info = {
143+ let mut synchronizer = execution:: FlowSynchronizer :: start (
144+ flow_ctx,
145+ & lib_context. pool ,
146+ & execution:: FlowSynchronizerOptions {
147+ keep_refreshed : false ,
148+ } ,
149+ )
150+ . await
173151 . into_py_result ( ) ?;
152+ synchronizer. join ( ) . await . into_py_result ( ) ?;
153+ synchronizer. index_update_info ( )
154+ } ;
174155 Ok ( IndexUpdateInfo ( update_info) )
175156 } )
176157 }
@@ -193,7 +174,9 @@ impl Flow {
193174 anyhow:: Ok ( synchronizer)
194175 } )
195176 . into_py_result ( ) ?;
196- Ok ( FlowSynchronizer ( async_lock:: RwLock :: new ( synchronizer) ) )
177+ Ok ( FlowSynchronizer ( Arc :: new ( tokio:: sync:: RwLock :: new (
178+ synchronizer,
179+ ) ) ) )
197180 } )
198181 }
199182
0 commit comments