1- use crate :: prelude:: * ;
1+ use crate :: { execution :: stats :: UpdateStats , prelude:: * } ;
22
33use super :: stats;
44use futures:: future:: try_join_all;
@@ -159,12 +159,18 @@ impl SourceUpdateTask {
159159 break ;
160160 } ;
161161
162+ let update_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
162163 let ack_fn = {
163164 let status_tx = status_tx. clone ( ) ;
165+ let update_stats = update_stats. clone ( ) ;
166+ let change_stream_stats = change_stream_stats. clone ( ) ;
164167 async move || {
165- status_tx. send_modify ( |update| {
166- update. source_updates_num [ source_idx] += 1 ;
167- } ) ;
168+ if update_stats. has_any_change ( ) {
169+ status_tx. send_modify ( |update| {
170+ update. source_updates_num [ source_idx] += 1 ;
171+ } ) ;
172+ change_stream_stats. merge ( & update_stats) ;
173+ }
168174 if let Some ( ack_fn) = change_msg. ack_fn {
169175 ack_fn ( ) . await
170176 } else {
@@ -185,7 +191,7 @@ impl SourceUpdateTask {
185191 tokio:: spawn ( source_context. clone ( ) . process_source_key (
186192 change. key ,
187193 change. data ,
188- change_stream_stats . clone ( ) ,
194+ update_stats . clone ( ) ,
189195 concur_permit,
190196 Some ( move || async move {
191197 SharedAckFn :: ack ( & shared_ack_fn) . await
@@ -203,7 +209,8 @@ impl SourceUpdateTask {
203209 futs. push (
204210 async move {
205211 let mut interval = tokio:: time:: interval ( REPORT_INTERVAL ) ;
206- let mut last_change_stream_stats = change_stream_stats. as_ref ( ) . clone ( ) ;
212+ let mut last_change_stream_stats: UpdateStats =
213+ change_stream_stats. as_ref ( ) . clone ( ) ;
207214 interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
208215 interval. tick ( ) . await ;
209216 loop {
@@ -229,9 +236,11 @@ impl SourceUpdateTask {
229236 async move {
230237 let update_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
231238 source_context. update ( & pool, & update_stats) . await ?;
232- status_tx. send_modify ( |update| {
233- update. source_updates_num [ source_idx] += 1 ;
234- } ) ;
239+ if update_stats. has_any_change ( ) {
240+ status_tx. send_modify ( |update| {
241+ update. source_updates_num [ source_idx] += 1 ;
242+ } ) ;
243+ }
235244 report_stats ( & update_stats, "batch update" ) ;
236245
237246 if let ( true , Some ( refresh_interval) ) =
@@ -245,9 +254,11 @@ impl SourceUpdateTask {
245254
246255 let update_stats = Arc :: new ( stats:: UpdateStats :: default ( ) ) ;
247256 source_context. update ( & pool, & update_stats) . await ?;
248- status_tx. send_modify ( |update| {
249- update. source_updates_num [ source_idx] += 1 ;
250- } ) ;
257+ if update_stats. has_any_change ( ) {
258+ status_tx. send_modify ( |update| {
259+ update. source_updates_num [ source_idx] += 1 ;
260+ } ) ;
261+ }
251262 report_stats ( & update_stats, "interval refresh" ) ;
252263 }
253264 }
0 commit comments