@@ -3,7 +3,6 @@ use crate::prelude::*;
33use super :: stats;
44use futures:: future:: try_join_all;
55use sqlx:: PgPool ;
6- use std:: time:: Instant ;
76use tokio:: { task:: JoinSet , time:: MissedTickBehavior } ;
87
98pub struct FlowLiveUpdater {
@@ -22,12 +21,6 @@ pub struct FlowLiveUpdaterOptions {
2221 pub print_stats : bool ,
2322}
2423
25- struct StatsReportState {
26- last_report_time : Option < Instant > ,
27- last_stats : stats:: UpdateStats ,
28- }
29-
30- const MIN_REPORT_INTERVAL : std:: time:: Duration = std:: time:: Duration :: from_secs ( 5 ) ;
3124const REPORT_INTERVAL : std:: time:: Duration = std:: time:: Duration :: from_secs ( 10 ) ;
3225
3326struct SharedAckFn {
@@ -74,32 +67,18 @@ async fn update_source(
7467
7568 let import_op = & plan. import_ops [ source_idx] ;
7669
77- let stats_report_state = Mutex :: new ( StatsReportState {
78- last_report_time : None ,
79- last_stats : source_update_stats. as_ref ( ) . clone ( ) ,
80- } ) ;
81- let report_stats = || {
82- let new_stats = source_update_stats. as_ref ( ) . clone ( ) ;
83- let now = Instant :: now ( ) ;
84- let delta = {
85- let mut state = stats_report_state. lock ( ) . unwrap ( ) ;
86- if let Some ( last_report_time) = state. last_report_time {
87- if now. duration_since ( last_report_time) < MIN_REPORT_INTERVAL {
88- return ;
89- }
90- }
91- let delta = new_stats. delta ( & state. last_stats ) ;
92- if delta. is_zero ( ) {
93- return ;
94- }
95- state. last_stats = new_stats;
96- state. last_report_time = Some ( now) ;
97- delta
98- } ;
70+ let report_stats = |stats : & stats:: UpdateStats , kind : & str | {
71+ source_update_stats. merge ( stats) ;
9972 if options. print_stats {
100- println ! ( "{}.{}: {}" , flow. flow_instance. name, import_op. name, delta) ;
73+ println ! (
74+ "{}.{} ({kind}): {}" ,
75+ flow. flow_instance. name, import_op. name, stats
76+ ) ;
10177 } else {
102- trace ! ( "{}.{}: {}" , flow. flow_instance. name, import_op. name, delta) ;
78+ trace ! (
79+ "{}.{} ({kind}): {}" ,
80+ flow. flow_instance. name, import_op. name, stats
81+ ) ;
10382 }
10483 } ;
10584
@@ -108,68 +87,78 @@ async fn update_source(
10887 // Deal with change streams.
10988 if options. live_mode {
11089 if let Some ( change_stream) = import_op. executor . change_stream ( ) . await ? {
111- let pool = pool. clone ( ) ;
112- let source_update_stats = source_update_stats. clone ( ) ;
90+ let change_stream_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
11391 futs. push (
114- async move {
115- let mut change_stream = change_stream;
116- let retry_options = retryable:: RetryOptions {
117- max_retries : None ,
118- initial_backoff : std:: time:: Duration :: from_secs ( 5 ) ,
119- max_backoff : std:: time:: Duration :: from_secs ( 60 ) ,
120- } ;
121- loop {
122- // Workaround as AsyncFnMut isn't mature yet.
123- // Should be changed to use AsyncFnMut once it is.
124- let change_stream = tokio:: sync:: Mutex :: new ( & mut change_stream) ;
125- let change_msg = retryable:: run (
126- || async {
127- let mut change_stream = change_stream. lock ( ) . await ;
128- change_stream
129- . next ( )
130- . await
131- . transpose ( )
132- . map_err ( retryable:: Error :: always_retryable)
133- } ,
134- & retry_options,
135- )
136- . await ?;
137- let change_msg = if let Some ( change_msg) = change_msg {
138- change_msg
139- } else {
140- break ;
92+ {
93+ let change_stream_stats = change_stream_stats. clone ( ) ;
94+ let pool = pool. clone ( ) ;
95+ async move {
96+ let mut change_stream = change_stream;
97+ let retry_options = retryable:: RetryOptions {
98+ max_retries : None ,
99+ initial_backoff : std:: time:: Duration :: from_secs ( 5 ) ,
100+ max_backoff : std:: time:: Duration :: from_secs ( 60 ) ,
141101 } ;
142- let ack_fn = change_msg. ack_fn . map ( |ack_fn| {
143- Arc :: new ( Mutex :: new ( SharedAckFn :: new (
144- change_msg. changes . iter ( ) . len ( ) ,
145- ack_fn,
146- ) ) )
147- } ) ;
148- for change in change_msg. changes {
149- let ack_fn = ack_fn. clone ( ) ;
150- tokio:: spawn ( source_context. clone ( ) . process_source_key (
151- change. key ,
152- change. data ,
153- source_update_stats. clone ( ) ,
154- ack_fn. map ( |ack_fn| {
155- move || async move { SharedAckFn :: ack ( & ack_fn) . await }
156- } ) ,
157- pool. clone ( ) ,
158- ) ) ;
102+ loop {
103+ // Workaround as AsyncFnMut isn't mature yet.
104+ // Should be changed to use AsyncFnMut once it is.
105+ let change_stream = tokio:: sync:: Mutex :: new ( & mut change_stream) ;
106+ let change_msg = retryable:: run (
107+ || async {
108+ let mut change_stream = change_stream. lock ( ) . await ;
109+ change_stream
110+ . next ( )
111+ . await
112+ . transpose ( )
113+ . map_err ( retryable:: Error :: always_retryable)
114+ } ,
115+ & retry_options,
116+ )
117+ . await ?;
118+ let change_msg = if let Some ( change_msg) = change_msg {
119+ change_msg
120+ } else {
121+ break ;
122+ } ;
123+ let ack_fn = change_msg. ack_fn . map ( |ack_fn| {
124+ Arc :: new ( Mutex :: new ( SharedAckFn :: new (
125+ change_msg. changes . iter ( ) . len ( ) ,
126+ ack_fn,
127+ ) ) )
128+ } ) ;
129+ for change in change_msg. changes {
130+ let ack_fn = ack_fn. clone ( ) ;
131+ tokio:: spawn ( source_context. clone ( ) . process_source_key (
132+ change. key ,
133+ change. data ,
134+ change_stream_stats. clone ( ) ,
135+ ack_fn. map ( |ack_fn| {
136+ move || async move { SharedAckFn :: ack ( & ack_fn) . await }
137+ } ) ,
138+ pool. clone ( ) ,
139+ ) ) ;
140+ }
159141 }
142+ Ok ( ( ) )
160143 }
161- Ok ( ( ) )
162144 }
163145 . boxed ( ) ,
164146 ) ;
147+
165148 futs. push (
166149 async move {
167150 let mut interval = tokio:: time:: interval ( REPORT_INTERVAL ) ;
151+ let mut last_change_stream_stats = change_stream_stats. as_ref ( ) . clone ( ) ;
168152 interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
169153 interval. tick ( ) . await ;
170154 loop {
171155 interval. tick ( ) . await ;
172- report_stats ( ) ;
156+ let curr_change_stream_stats = change_stream_stats. as_ref ( ) . clone ( ) ;
157+ let delta = curr_change_stream_stats. delta ( & last_change_stream_stats) ;
158+ if !delta. has_any_change ( ) {
159+ report_stats ( & delta, "change stream" ) ;
160+ last_change_stream_stats = curr_change_stream_stats;
161+ }
173162 }
174163 }
175164 . boxed ( ) ,
@@ -178,11 +167,11 @@ async fn update_source(
178167 }
179168
180169 // The main update loop.
181- let source_update_stats = source_update_stats. clone ( ) ;
182170 futs. push (
183171 async move {
184- source_context. update ( & pool, & source_update_stats) . await ?;
185- report_stats ( ) ;
172+ let update_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
173+ source_context. update ( & pool, & update_stats) . await ?;
174+ report_stats ( & update_stats, "batch update" ) ;
186175
187176 if let ( true , Some ( refresh_interval) ) = (
188177 options. live_mode ,
@@ -193,8 +182,10 @@ async fn update_source(
193182 interval. tick ( ) . await ;
194183 loop {
195184 interval. tick ( ) . await ;
196- source_context. update ( & pool, & source_update_stats) . await ?;
197- report_stats ( ) ;
185+
186+ let update_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
187+ source_context. update ( & pool, & update_stats) . await ?;
188+ report_stats ( & update_stats, "interval refresh" ) ;
198189 }
199190 }
200191 Ok ( ( ) )
0 commit comments