@@ -18,7 +18,6 @@ use std::process;
18
18
use std:: process:: Command ;
19
19
use std:: time:: Duration ;
20
20
use thiserror:: Error ;
21
- use tokio:: runtime:: Handle ;
22
21
use tokio_cron_scheduler:: { Job , JobScheduler } ;
23
22
24
23
#[ allow( dead_code) ]
@@ -59,7 +58,7 @@ async fn main() -> Result<(), anyhow::Error> {
59
58
60
59
env_logger:: Builder :: from_env ( Env :: default ( ) . default_filter_or ( "info" ) ) . init ( ) ;
61
60
let host_dir = env:: var ( "HOST_DIR" ) . unwrap_or_else ( |_| DEFAULT_BASE_DIR . to_string ( ) ) ;
62
- let core_dir = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
61
+ let core_dir_command = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
63
62
let suid = env:: var ( "SUID_DUMPABLE" ) . unwrap_or_else ( |_| DEFAULT_SUID_DUMPABLE . to_string ( ) ) ;
64
63
let deploy_crio_config = env:: var ( "DEPLOY_CRIO_CONFIG" )
65
64
. unwrap_or_else ( |_| "false" . to_string ( ) )
@@ -94,9 +93,8 @@ async fn main() -> Result<(), anyhow::Error> {
94
93
info ! ( "Uploading {}" , file) ;
95
94
process_file ( p, & bucket) . await ;
96
95
} else {
97
- let core_store = core_dir. clone ( ) ;
98
- info ! ( "Uploading all content in {}" , core_store) ;
99
- run_polling_agent ( core_store. as_str ( ) ) . await ;
96
+ info ! ( "Uploading all content in {}" , core_dir_command) ;
97
+ run_polling_agent ( ) . await ;
100
98
}
101
99
process:: exit ( 0 ) ;
102
100
}
@@ -119,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
119
117
format ! ( "{}/core_pattern.bak" , host_location) . as_str ( ) ,
120
118
format ! (
121
119
"|{}/{} -c=%c -e=%e -p=%p -s=%s -t=%t -d={} -h=%h -E=%E" ,
122
- host_location, CDC_NAME , core_dir
120
+ host_location, CDC_NAME , core_dir_command
123
121
)
124
122
. as_str ( ) ,
125
123
) ?;
@@ -135,8 +133,6 @@ async fn main() -> Result<(), anyhow::Error> {
135
133
& suid,
136
134
) ?;
137
135
138
- let core_location = core_dir. clone ( ) ;
139
-
140
136
create_env_file ( host_location) ?;
141
137
// Run polling agent on startup to clean up files.
142
138
@@ -155,7 +151,7 @@ async fn main() -> Result<(), anyhow::Error> {
155
151
std:: thread:: sleep ( Duration :: from_millis ( 1000 ) ) ;
156
152
}
157
153
} else {
158
- run_polling_agent ( core_location . as_str ( ) ) . await ;
154
+ run_polling_agent ( ) . await ;
159
155
}
160
156
161
157
if !interval. is_empty ( ) && !schedule. is_empty ( ) {
@@ -180,7 +176,6 @@ async fn main() -> Result<(), anyhow::Error> {
180
176
}
181
177
}
182
178
183
- let notify_location = core_location. clone ( ) ;
184
179
if !schedule. is_empty ( ) {
185
180
info ! ( "Schedule Initialising with: {}" , schedule) ;
186
181
let sched = match JobScheduler :: new ( ) . await {
@@ -190,12 +185,11 @@ async fn main() -> Result<(), anyhow::Error> {
190
185
panic ! ( "Schedule Creation Failed with {}" , e)
191
186
}
192
187
} ;
193
- let s_job = match Job :: new ( schedule. as_str ( ) , move |_uuid, _l| {
194
- let handle = Handle :: current ( ) ;
195
- let core_str = core_location. clone ( ) ;
196
- handle. spawn ( async move {
197
- run_polling_agent ( & core_str) . await ;
198
- } ) ;
188
+
189
+ let s_job = match Job :: new_async ( schedule. as_str ( ) , move |_uuid, _l| {
190
+ Box :: pin ( async move {
191
+ run_polling_agent ( ) . await ;
192
+ } )
199
193
} ) {
200
194
Ok ( v) => v,
201
195
Err ( e) => {
@@ -231,14 +225,14 @@ async fn main() -> Result<(), anyhow::Error> {
231
225
}
232
226
} ;
233
227
info ! ( "INotify Initialised..." ) ;
234
- match inotify. add_watch ( & notify_location , WatchMask :: CLOSE ) {
228
+ match inotify. add_watch ( & core_dir_command , WatchMask :: CLOSE ) {
235
229
Ok ( _) => { }
236
230
Err ( e) => {
237
231
error ! ( "Add watch failed: {}" , e) ;
238
232
panic ! ( "Add watch failed: {}" , e)
239
233
}
240
234
} ;
241
- info ! ( "INotify watching : {}" , notify_location ) ;
235
+ info ! ( "INotify watching : {}" , core_dir_command ) ;
242
236
let mut buffer = [ 0 ; 4096 ] ;
243
237
loop {
244
238
let events = match inotify. read_events_blocking ( & mut buffer) {
@@ -264,7 +258,7 @@ async fn main() -> Result<(), anyhow::Error> {
264
258
Some ( s) => {
265
259
let file = format ! (
266
260
"{}/{}" ,
267
- notify_location ,
261
+ core_dir_command ,
268
262
s. to_str( ) . unwrap_or_default( )
269
263
) ;
270
264
let p = Path :: new ( & file) ;
@@ -389,7 +383,8 @@ fn get_bucket() -> Result<Bucket, anyhow::Error> {
389
383
Ok ( Bucket :: new ( & s3. bucket , s3. region , s3. credentials ) ?. with_path_style ( ) )
390
384
}
391
385
392
- async fn run_polling_agent ( core_location : & str ) {
386
+ async fn run_polling_agent ( ) {
387
+ let core_location = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
393
388
info ! ( "Executing Agent with location : {}" , core_location) ;
394
389
395
390
let bucket = match get_bucket ( ) {
0 commit comments