@@ -18,7 +18,6 @@ use std::process;
18
18
use std:: process:: Command ;
19
19
use std:: time:: Duration ;
20
20
use thiserror:: Error ;
21
- use tokio:: runtime:: Handle ;
22
21
use tokio_cron_scheduler:: { Job , JobScheduler } ;
23
22
24
23
#[ allow( dead_code) ]
@@ -59,7 +58,7 @@ async fn main() -> Result<(), anyhow::Error> {
59
58
60
59
env_logger:: Builder :: from_env ( Env :: default ( ) . default_filter_or ( "info" ) ) . init ( ) ;
61
60
let host_dir = env:: var ( "HOST_DIR" ) . unwrap_or_else ( |_| DEFAULT_BASE_DIR . to_string ( ) ) ;
62
- let core_dir = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
61
+ let core_dir_command = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
63
62
let suid = env:: var ( "SUID_DUMPABLE" ) . unwrap_or_else ( |_| DEFAULT_SUID_DUMPABLE . to_string ( ) ) ;
64
63
let deploy_crio_config = env:: var ( "DEPLOY_CRIO_CONFIG" )
65
64
. unwrap_or_else ( |_| "false" . to_string ( ) )
@@ -94,9 +93,8 @@ async fn main() -> Result<(), anyhow::Error> {
94
93
info ! ( "Uploading {}" , file) ;
95
94
process_file ( p, & bucket) . await ;
96
95
} else {
97
- let core_store = core_dir. clone ( ) ;
98
- info ! ( "Uploading all content in {}" , core_store) ;
99
- run_polling_agent ( core_store. as_str ( ) ) . await ;
96
+ info ! ( "Uploading all content in {}" , core_dir_command) ;
97
+ run_polling_agent ( ) . await ;
100
98
}
101
99
process:: exit ( 0 ) ;
102
100
}
@@ -119,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
119
117
format ! ( "{}/core_pattern.bak" , host_location) . as_str ( ) ,
120
118
format ! (
121
119
"|{}/{} -c=%c -e=%e -p=%p -s=%s -t=%t -d={} -h=%h -E=%E" ,
122
- host_location, CDC_NAME , core_dir
120
+ host_location, CDC_NAME , core_dir_command
123
121
)
124
122
. as_str ( ) ,
125
123
) ?;
@@ -135,8 +133,6 @@ async fn main() -> Result<(), anyhow::Error> {
135
133
& suid,
136
134
) ?;
137
135
138
- let core_location = core_dir. clone ( ) ;
139
-
140
136
create_env_file ( host_location) ?;
141
137
// Run polling agent on startup to clean up files.
142
138
@@ -155,7 +151,7 @@ async fn main() -> Result<(), anyhow::Error> {
155
151
std:: thread:: sleep ( Duration :: from_millis ( 1000 ) ) ;
156
152
}
157
153
} else {
158
- run_polling_agent ( core_location . as_str ( ) ) . await ;
154
+ run_polling_agent ( ) . await ;
159
155
}
160
156
161
157
if !interval. is_empty ( ) && !schedule. is_empty ( ) {
@@ -180,7 +176,6 @@ async fn main() -> Result<(), anyhow::Error> {
180
176
}
181
177
}
182
178
183
- let notify_location = core_location. clone ( ) ;
184
179
if !schedule. is_empty ( ) {
185
180
info ! ( "Schedule Initialising with: {}" , schedule) ;
186
181
let sched = match JobScheduler :: new ( ) . await {
@@ -190,12 +185,18 @@ async fn main() -> Result<(), anyhow::Error> {
190
185
panic ! ( "Schedule Creation Failed with {}" , e)
191
186
}
192
187
} ;
193
- let s_job = match Job :: new ( schedule. as_str ( ) , move |_uuid, _l| {
194
- let handle = Handle :: current ( ) ;
195
- let core_str = core_location. clone ( ) ;
196
- handle. spawn ( async move {
197
- run_polling_agent ( & core_str) . await ;
198
- } ) ;
188
+
189
+ let s_job = match Job :: new_async ( schedule. as_str ( ) , move |uuid, mut l| {
190
+ Box :: pin ( async move {
191
+ let next_tick = l. next_tick_for_job ( uuid) . await ;
192
+ match next_tick {
193
+ Ok ( Some ( ts) ) => {
194
+ info ! ( "Next scheduled run {:?}" , ts) ;
195
+ run_polling_agent ( ) . await ;
196
+ }
197
+ _ => warn ! ( "Could not get next tick for job" ) ,
198
+ }
199
+ } )
199
200
} ) {
200
201
Ok ( v) => v,
201
202
Err ( e) => {
@@ -218,6 +219,9 @@ async fn main() -> Result<(), anyhow::Error> {
218
219
panic ! ( "Schedule Start failed, {:#?}" , e) ;
219
220
}
220
221
} ;
222
+ loop {
223
+ std:: thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
224
+ }
221
225
}
222
226
223
227
if use_inotify == "true" {
@@ -231,14 +235,14 @@ async fn main() -> Result<(), anyhow::Error> {
231
235
}
232
236
} ;
233
237
info ! ( "INotify Initialised..." ) ;
234
- match inotify. add_watch ( & notify_location , WatchMask :: CLOSE ) {
238
+ match inotify. add_watch ( & core_dir_command , WatchMask :: CLOSE ) {
235
239
Ok ( _) => { }
236
240
Err ( e) => {
237
241
error ! ( "Add watch failed: {}" , e) ;
238
242
panic ! ( "Add watch failed: {}" , e)
239
243
}
240
244
} ;
241
- info ! ( "INotify watching : {}" , notify_location ) ;
245
+ info ! ( "INotify watching : {}" , core_dir_command ) ;
242
246
let mut buffer = [ 0 ; 4096 ] ;
243
247
loop {
244
248
let events = match inotify. read_events_blocking ( & mut buffer) {
@@ -264,7 +268,7 @@ async fn main() -> Result<(), anyhow::Error> {
264
268
Some ( s) => {
265
269
let file = format ! (
266
270
"{}/{}" ,
267
- notify_location ,
271
+ core_dir_command ,
268
272
s. to_str( ) . unwrap_or_default( )
269
273
) ;
270
274
let p = Path :: new ( & file) ;
@@ -389,7 +393,8 @@ fn get_bucket() -> Result<Bucket, anyhow::Error> {
389
393
Ok ( Bucket :: new ( & s3. bucket , s3. region , s3. credentials ) ?. with_path_style ( ) )
390
394
}
391
395
392
- async fn run_polling_agent ( core_location : & str ) {
396
+ async fn run_polling_agent ( ) {
397
+ let core_location = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
393
398
info ! ( "Executing Agent with location : {}" , core_location) ;
394
399
395
400
let bucket = match get_bucket ( ) {
0 commit comments