@@ -14,7 +14,10 @@ extern crate regex;
1414extern crate rust_log_analyzer as rla;
1515extern crate serde_json;
1616
17+ use crate :: server:: QueueItem ;
1718use clap:: Parser ;
19+ use crossbeam:: channel:: Sender ;
20+ use rla:: index:: IndexStorage ;
1821use std:: process;
1922use std:: sync:: Arc ;
2023use std:: thread;
@@ -47,7 +50,7 @@ struct Cli {
4750 long = "index-file" ,
4851 help = "The index file to read / write. An existing index file is updated."
4952 ) ]
50- index_file : std :: path :: PathBuf ,
53+ index_file : IndexStorage ,
5154 #[ arg(
5255 long = "debug-post" ,
5356 help = "Post all comments to the given issue instead of the actual PR. Format: \" user/repo#id\" "
@@ -90,7 +93,10 @@ fn main() {
9093
9194 let ( queue_send, queue_recv) = crossbeam:: channel:: unbounded ( ) ;
9295
93- let service = Arc :: new ( server:: RlaService :: new ( args. webhook_verify , queue_send) ?) ;
96+ let service = Arc :: new ( server:: RlaService :: new (
97+ args. webhook_verify ,
98+ queue_send. clone ( ) ,
99+ ) ?) ;
94100
95101 let mut worker = server:: Worker :: new (
96102 args. index_file ,
@@ -102,7 +108,7 @@ fn main() {
102108 args. query_builds_from_primary_repo ,
103109 ) ?;
104110
105- thread:: spawn ( move || {
111+ let worker_thread = thread:: spawn ( move || {
106112 if let Err ( e) = worker. main ( ) {
107113 error ! ( "Worker failed, exiting: {}" , e) ;
108114 process:: exit ( 1 ) ;
@@ -125,9 +131,34 @@ fn main() {
125131 } ) )
126132 }
127133 } ) )
134+ . with_graceful_shutdown ( graceful_shutdown ( queue_send) )
128135 . await
129136 } ) ?;
130137
138+ worker_thread. join ( ) . expect ( "worker thread failed" ) ;
139+
131140 Ok ( ( ) )
132141 } ) ;
133142}
143+
144+ async fn graceful_shutdown ( sender : Sender < QueueItem > ) {
145+ let ctrl_c = tokio:: signal:: ctrl_c ( ) ;
146+
147+ // ECS uses SIGTERM to signal graceful shutdown must begin.
148+ #[ cfg( unix) ]
149+ let mut sigterm =
150+ tokio:: signal:: unix:: signal ( tokio:: signal:: unix:: SignalKind :: terminate ( ) ) . unwrap ( ) ;
151+ #[ cfg( unix) ]
152+ let sigterm = sigterm. recv ( ) ;
153+
154+ #[ cfg( not( unix) ) ]
155+ let sigterm = std:: future:: pending ( ) ; // Never resolves
156+
157+ tokio:: select! {
158+ _ = ctrl_c => { }
159+ _ = sigterm => { }
160+ } ;
161+
162+ info ! ( "graceful shutdown signal received" ) ;
163+ let _ = sender. send ( QueueItem :: GracefulShutdown ) ;
164+ }
0 commit comments