@@ -30,7 +30,9 @@ use std::{
3030 sync:: { atomic:: AtomicU64 , Arc } ,
3131 time:: Instant ,
3232} ;
33- use tokio:: time:: sleep;
33+ use tokio:: time:: { sleep, sleep_until} ;
34+
35+ const ALLOWED_EARLY : Duration = Duration :: from_micros ( 500 ) ;
3436
3537pub struct SubmissionWorker {
3638 pub ( crate ) accounts : Vec < Arc < LocalAccount > > ,
@@ -82,31 +84,18 @@ impl SubmissionWorker {
8284 pub ( crate ) async fn run ( mut self , start_instant : Instant ) -> Vec < LocalAccount > {
8385 let mut wait_until = start_instant + self . start_sleep_duration ;
8486
85- let now = Instant :: now ( ) ;
86- if wait_until > now {
87- self . sleep_check_done ( wait_until - now) . await ;
88- }
87+ self . sleep_check_done ( wait_until) . await ;
8988 let wait_duration = Duration :: from_millis ( self . params . wait_millis ) ;
9089
9190 while !self . stop . load ( Ordering :: Relaxed ) {
92- let stats_clone = self . stats . clone ( ) ;
93- let loop_stats = stats_clone. get_cur ( ) ;
94-
9591 let loop_start_time = Instant :: now ( ) ;
96- if wait_duration. as_secs ( ) > 0
97- && loop_start_time. duration_since ( wait_until) > Duration :: from_secs ( 5 )
98- {
99- sample ! (
100- SampleRate :: Duration ( Duration :: from_secs( 5 ) ) ,
101- error!(
102- "[{:?}] txn_emitter worker drifted out of sync too much: {}s. Is expiration too short, or 5s buffer on top of it?" ,
103- self . client( ) . path_prefix_string( ) ,
104- loop_start_time. duration_since( wait_until) . as_secs( )
105- )
106- ) ;
92+
93+ if wait_duration. as_secs ( ) > 0 {
94+ self . verify_loop_start_drift ( loop_start_time, wait_until) ;
10795 }
108- // always add expected cycle duration, to not drift from expected pace.
109- wait_until += wait_duration;
96+
97+ let stats_clone = self . stats . clone ( ) ;
98+ let loop_stats = stats_clone. get_cur ( ) ;
11099
111100 let requests = self . gen_requests ( ) ;
112101 if !requests. is_empty ( ) {
@@ -175,9 +164,10 @@ impl SubmissionWorker {
175164 if self . skip_latency_stats {
176165 // we also don't want to be stuck waiting for txn_expiration_time_secs
177166 // after stop is called, so we sleep until time or stop is set.
178- self . sleep_check_done ( Duration :: from_secs (
179- self . params . txn_expiration_time_secs + 3 ,
180- ) )
167+ self . sleep_check_done (
168+ Instant :: now ( )
169+ + Duration :: from_secs ( self . params . txn_expiration_time_secs + 3 ) ,
170+ )
181171 . await
182172 }
183173
@@ -203,9 +193,11 @@ impl SubmissionWorker {
203193 . await ;
204194 }
205195
206- let now = Instant :: now ( ) ;
207- if wait_until > now {
208- self . sleep_check_done ( wait_until - now) . await ;
196+ if wait_duration. as_secs ( ) > 0 {
197+ // always add expected cycle duration, to not drift from expected pace,
198+ // irrespectively of how long our iteration lasted.
199+ wait_until += wait_duration;
200+ self . sleep_check_done ( wait_until) . await ;
209201 }
210202 }
211203
@@ -216,16 +208,63 @@ impl SubmissionWorker {
216208 }
217209
218210 // returns true if it returned early
219- async fn sleep_check_done ( & self , duration : Duration ) {
220- let start_time = Instant :: now ( ) ;
211+ async fn sleep_check_done ( & self , sleep_until_time : Instant ) {
212+ // sleep has millisecond granularity - so round the sleep
213+ let sleep_poll_interval = Duration :: from_secs ( 1 ) ;
221214 loop {
222- sleep ( Duration :: from_secs ( 1 ) ) . await ;
223215 if self . stop . load ( Ordering :: Relaxed ) {
224216 return ;
225217 }
226- if start_time. elapsed ( ) >= duration {
218+
219+ let now = Instant :: now ( ) ;
220+ if now + ALLOWED_EARLY > sleep_until_time {
227221 return ;
228222 }
223+
224+ if sleep_until_time > now + sleep_poll_interval {
225+ sleep ( sleep_poll_interval) . await ;
226+ } else {
227+ sleep_until ( sleep_until_time. into ( ) ) . await ;
228+ }
229+ }
230+ }
231+
232+ fn verify_loop_start_drift ( & self , loop_start_time : Instant , wait_until : Instant ) {
233+ if loop_start_time > wait_until {
234+ let delay_s = loop_start_time
235+ . saturating_duration_since ( wait_until)
236+ . as_secs_f32 ( ) ;
237+ if delay_s > 5.0 {
238+ sample ! (
239+ SampleRate :: Duration ( Duration :: from_secs( 2 ) ) ,
240+ error!(
241+ "[{:?}] txn_emitter worker drifted out of sync too much: {:.3}s. Is machine underprovisioned? Is expiration too short, or 5s buffer on top of it?" ,
242+ self . client( ) . path_prefix_string( ) ,
243+ delay_s,
244+ )
245+ ) ;
246+ } else if delay_s > 0.3 {
247+ sample ! (
248+ SampleRate :: Duration ( Duration :: from_secs( 5 ) ) ,
249+ error!(
250+ "[{:?}] txn_emitter worker called a bit out of sync: {:.3}s. Is machine underprovisioned? Is expiration too short, or 5s buffer on top of it?" ,
251+ self . client( ) . path_prefix_string( ) ,
252+ delay_s,
253+ )
254+ ) ;
255+ }
256+ } else {
257+ let early_s = wait_until. saturating_duration_since ( loop_start_time) ;
258+ if early_s > ALLOWED_EARLY {
259+ sample ! (
260+ SampleRate :: Duration ( Duration :: from_secs( 5 ) ) ,
261+ error!(
262+ "[{:?}] txn_emitter worker called too early: {:.3}s. There is some bug in waiting." ,
263+ self . client( ) . path_prefix_string( ) ,
264+ early_s. as_secs_f32( ) ,
265+ )
266+ ) ;
267+ }
229268 }
230269 }
231270
0 commit comments