11use super :: { ErrorReport , Event , ParallelReporter , ProgressReport , Reporter , Size } ;
2- use pipe_trait :: Pipe ;
2+ use progress_report_state :: ProgressReportState ;
33use std:: {
44 any:: Any ,
5- sync:: { Arc , RwLock } ,
5+ marker:: PhantomData ,
6+ ops:: ControlFlow ,
7+ sync:: { atomic:: Ordering :: Relaxed , Arc } ,
68 thread:: { sleep, spawn, JoinHandle } ,
79 time:: Duration ,
810} ;
@@ -15,19 +17,23 @@ pub struct ProgressAndErrorReporter<Data, ReportError>
1517where
1618 Data : Size + Send + Sync ,
1719 ReportError : Fn ( ErrorReport ) + Sync ,
20+ u64 : Into < Data > ,
1821{
1922 /// Progress information.
20- progress : Arc < RwLock < Option < ProgressReport < Data > > > > ,
23+ progress : Arc < ProgressReportState > ,
2124 /// Report encountered error.
2225 report_error : ReportError ,
2326 /// Join handle of progress reporting thread.
2427 progress_reporter_handle : JoinHandle < ( ) > ,
28+ /// Keep generic parameters.
29+ _phantom : PhantomData < Data > ,
2530}
2631
2732impl < Data , ReportError > ProgressAndErrorReporter < Data , ReportError >
2833where
2934 Data : Size + Send + Sync ,
3035 ReportError : Fn ( ErrorReport ) + Sync ,
36+ u64 : Into < Data > ,
3137{
3238 /// Create a new [`ProgressAndErrorReporter`] from a report function.
3339 pub fn new < ReportProgress > (
@@ -39,41 +45,36 @@ where
3945 ProgressReport < Data > : Default + ' static ,
4046 ReportProgress : Fn ( ProgressReport < Data > ) + Send + Sync + ' static ,
4147 {
42- let progress = ProgressReport :: default ( )
43- . pipe ( Some )
44- . pipe ( RwLock :: new)
45- . pipe ( Arc :: new) ;
48+ let progress = Arc :: new ( ProgressReportState :: default ( ) ) ;
4649 let progress_thread = progress. clone ( ) ;
4750 let progress_reporter_handle = spawn ( move || loop {
4851 sleep ( progress_report_interval) ;
49- if let Ok ( progress) = progress_thread. read ( ) . as_deref ( ) {
50- if let Some ( progress) = * progress {
51- report_progress ( progress) ;
52- } else {
53- break ;
54- }
55- }
52+ match progress_thread. to_progress_report ( ) {
53+ ControlFlow :: Continue ( progress) => report_progress ( progress) ,
54+ ControlFlow :: Break ( ( ) ) => break ,
55+ } ;
5656 } ) ;
5757 ProgressAndErrorReporter {
5858 progress,
5959 report_error,
6060 progress_reporter_handle,
61+ _phantom : PhantomData ,
6162 }
6263 }
6364
6465 /// Stop the thread that reports progress.
6566 ///
6667 /// This function would be automatically invoked once the value is [dropped](Drop).
6768 pub fn stop_progress_reporter ( & self ) {
68- let mut progress = self . progress . write ( ) . expect ( "lock progress to stop" ) ;
69- * progress = None ;
69+ self . progress . stopped . store ( true , Relaxed ) ;
7070 }
7171}
7272
7373impl < Data , ReportError > Reporter < Data > for ProgressAndErrorReporter < Data , ReportError >
7474where
75- Data : Size + Send + Sync ,
75+ Data : Size + Into < u64 > + Send + Sync ,
7676 ReportError : Fn ( ErrorReport ) + Sync ,
77+ u64 : Into < Data > ,
7778{
7879 fn report ( & self , event : Event < Data > ) {
7980 use Event :: * ;
@@ -82,38 +83,35 @@ where
8283 report_error,
8384 ..
8485 } = self ;
85- macro_rules! handle_field {
86- ( $( $field: ident $operator: tt $addend: expr; ) +) => {
87- if let Some ( progress) = progress. write( ) . ok( ) . as_mut( ) . and_then( |x| x. as_mut( ) ) {
88- $( progress. $field $operator $addend; ) +
89- }
86+ macro_rules! bump {
87+ ( $field: ident += $delta: expr) => {
88+ progress. $field. fetch_add( $delta, Relaxed )
9089 } ;
9190 }
9291 match event {
9392 ReceiveData ( data) => {
94- handle_field ! {
95- items += 1 ;
96- total += data;
97- }
93+ bump ! ( items += 1 ) ;
94+ bump ! ( total += data. into( ) ) ;
9895 }
9996 EncounterError ( error_report) => {
10097 report_error ( error_report) ;
101- handle_field ! {
102- errors += 1 ;
103- }
98+ bump ! ( errors += 1 ) ;
10499 }
105100 }
106101 }
107102}
108103
109104impl < Data , ReportError > ParallelReporter < Data > for ProgressAndErrorReporter < Data , ReportError >
110105where
111- Data : Size + Send + Sync ,
106+ Data : Size + Into < u64 > + Send + Sync ,
112107 ReportError : Fn ( ErrorReport ) + Sync ,
108+ u64 : Into < Data > ,
113109{
114110 type DestructionError = Box < dyn Any + Send + ' static > ;
115111 fn destroy ( self ) -> Result < ( ) , Self :: DestructionError > {
116112 self . stop_progress_reporter ( ) ;
117113 self . progress_reporter_handle . join ( )
118114 }
119115}
116+
117+ mod progress_report_state;
0 commit comments