@@ -10,6 +10,7 @@ use crate::server::{self, ServerSettings};
1010use crate :: settings:: Settings ;
1111use crate :: setup;
1212use pyo3:: { exceptions:: PyException , prelude:: * } ;
13+ use pyo3_async_runtimes:: tokio:: future_into_py;
1314use std:: collections:: btree_map;
1415
1516mod convert;
@@ -82,29 +83,51 @@ impl IndexUpdateInfo {
8283pub struct Flow ( pub Arc < FlowContext > ) ;
8384
8485#[ pyclass]
85- pub struct FlowSynchronizer ( pub Arc < tokio:: sync:: RwLock < execution:: FlowSynchronizer > > ) ;
86+ pub struct FlowLiveUpdater ( pub Arc < tokio:: sync:: RwLock < execution:: FlowLiveUpdater > > ) ;
8687
8788#[ pymethods]
88- impl FlowSynchronizer {
89- pub fn join < ' py > ( & self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
90- let synchronizer = self . 0 . clone ( ) ;
91- pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
92- let mut synchronizer = synchronizer. write ( ) . await ;
93- synchronizer. join ( ) . await . into_py_result ( )
89+ impl FlowLiveUpdater {
90+ #[ new]
91+ pub fn new (
92+ py : Python < ' _ > ,
93+ flow : & Flow ,
94+ options : Pythonized < execution:: FlowLiveUpdaterOptions > ,
95+ ) -> PyResult < Self > {
96+ py. allow_threads ( || {
97+ let live_updater = get_runtime ( )
98+ . block_on ( async {
99+ let live_updater = execution:: FlowLiveUpdater :: start (
100+ flow. 0 . clone ( ) ,
101+ & get_lib_context ( ) ?. pool ,
102+ options. into_inner ( ) ,
103+ )
104+ . await ?;
105+ anyhow:: Ok ( live_updater)
106+ } )
107+ . into_py_result ( ) ?;
108+ Ok ( Self ( Arc :: new ( tokio:: sync:: RwLock :: new ( live_updater) ) ) )
109+ } )
110+ }
111+
112+ pub fn wait < ' py > ( & self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
113+ let live_updater = self . 0 . clone ( ) ;
114+ future_into_py ( py, async move {
115+ let mut live_updater = live_updater. write ( ) . await ;
116+ live_updater. wait ( ) . await . into_py_result ( )
94117 } )
95118 }
96119
97120 pub fn abort ( & self , py : Python < ' _ > ) {
98121 py. allow_threads ( || {
99- let mut synchronizer = self . 0 . blocking_write ( ) ;
100- synchronizer . abort ( ) ;
122+ let mut live_updater = self . 0 . blocking_write ( ) ;
123+ live_updater . abort ( ) ;
101124 } )
102125 }
103126
104127 pub fn index_update_info ( & self , py : Python < ' _ > ) -> IndexUpdateInfo {
105128 py. allow_threads ( || {
106- let synchronizer = self . 0 . blocking_read ( ) ;
107- IndexUpdateInfo ( synchronizer . index_update_info ( ) )
129+ let live_updater = self . 0 . blocking_read ( ) ;
130+ IndexUpdateInfo ( live_updater . index_update_info ( ) )
108131 } )
109132 }
110133}
@@ -123,47 +146,6 @@ impl Flow {
123146 & self . 0 . flow . flow_instance . name
124147 }
125148
126- pub fn update < ' py > ( & self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
127- let flow_ctx = self . 0 . clone ( ) ;
128- pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
129- let update_info = {
130- let mut synchronizer = execution:: FlowSynchronizer :: start (
131- flow_ctx,
132- & get_lib_context ( ) . into_py_result ( ) ?. pool ,
133- & execution:: FlowSynchronizerOptions {
134- keep_refreshed : false ,
135- } ,
136- )
137- . await
138- . into_py_result ( ) ?;
139- synchronizer. join ( ) . await . into_py_result ( ) ?;
140- synchronizer. index_update_info ( )
141- } ;
142- Ok ( IndexUpdateInfo ( update_info) )
143- } )
144- }
145-
146- pub fn keep_in_sync ( & self , py : Python < ' _ > ) -> PyResult < FlowSynchronizer > {
147- py. allow_threads ( || {
148- let synchronizer = get_runtime ( )
149- . block_on ( async {
150- let synchronizer = execution:: FlowSynchronizer :: start (
151- self . 0 . clone ( ) ,
152- & get_lib_context ( ) ?. pool ,
153- & execution:: FlowSynchronizerOptions {
154- keep_refreshed : false ,
155- } ,
156- )
157- . await ?;
158- anyhow:: Ok ( synchronizer)
159- } )
160- . into_py_result ( ) ?;
161- Ok ( FlowSynchronizer ( Arc :: new ( tokio:: sync:: RwLock :: new (
162- synchronizer,
163- ) ) ) )
164- } )
165- }
166-
167149 pub fn evaluate_and_dump (
168150 & self ,
169151 py : Python < ' _ > ,
@@ -340,7 +322,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
340322 m. add_class :: < builder:: flow_builder:: DataSlice > ( ) ?;
341323 m. add_class :: < builder:: flow_builder:: DataScopeRef > ( ) ?;
342324 m. add_class :: < Flow > ( ) ?;
343- m. add_class :: < FlowSynchronizer > ( ) ?;
325+ m. add_class :: < FlowLiveUpdater > ( ) ?;
344326 m. add_class :: < TransientFlow > ( ) ?;
345327 m. add_class :: < IndexUpdateInfo > ( ) ?;
346328 m. add_class :: < SimpleSemanticsQueryHandler > ( ) ?;
0 commit comments