1
- use std:: { collections:: BTreeMap , sync:: Arc , time:: Duration } ;
1
+ use std:: {
2
+ collections:: BTreeMap ,
3
+ sync:: {
4
+ mpsc:: { sync_channel, SyncSender } ,
5
+ Arc ,
6
+ } ,
7
+ thread:: spawn,
8
+ time:: Duration ,
9
+ } ;
2
10
3
11
use anyhow:: { Context , Result } ;
4
12
use indicatif:: ProgressBar ;
@@ -10,20 +18,38 @@ use upon::{Engine, Value};
10
18
pub struct ProgressHandler {
11
19
engine : Engine < ' static > ,
12
20
template : String ,
13
- callback : Arc < Py < PyAny > > ,
14
21
rate : Duration ,
15
22
n_cores : usize ,
23
+ updates : SyncSender < String > ,
16
24
}
17
25
18
26
impl ProgressHandler {
19
27
pub fn new ( callback : Arc < Py < PyAny > > , rate : Duration , template : String , n_cores : usize ) -> Self {
20
28
let engine = Engine :: new ( ) ;
29
+
30
+ let ( update_tx, update_rx) = sync_channel ( 1 ) ;
31
+
32
+ spawn ( move || {
33
+ Python :: with_gil ( move |py| {
34
+ py. allow_threads ( move || {
35
+ let update = update_rx. recv ( ) ;
36
+ let Ok ( update) = update else {
37
+ return ;
38
+ } ;
39
+ let res = Python :: with_gil ( |py| callback. call1 ( py, ( update, ) ) ) ;
40
+ if let Err ( err) = res {
41
+ eprintln ! ( "Error in progress callback: {err}" ) ;
42
+ }
43
+ } ) ;
44
+ } ) ;
45
+ } ) ;
46
+
21
47
Self {
22
48
engine,
23
- callback,
24
49
rate,
25
50
template,
26
51
n_cores,
52
+ updates : update_tx,
27
53
}
28
54
}
29
55
@@ -50,7 +76,10 @@ impl ProgressHandler {
50
76
progress_to_value ( progress_update_count, self . n_cores , time_sampling, progress) ;
51
77
let rendered = template. render_from ( & self . engine , & progress) . to_string ( ) ;
52
78
let rendered = rendered. unwrap_or_else ( |err| format ! ( "{err}" ) ) ;
53
- let _ = Python :: with_gil ( |py| self . callback . call1 ( py, ( rendered, ) ) ) ;
79
+ if let Err ( e) = self . updates . send ( rendered) {
80
+ eprintln ! ( "Could not send progress update: {e}" ) ;
81
+ return ;
82
+ }
54
83
progress_update_count += 1 ;
55
84
} ;
56
85
0 commit comments