@@ -9,27 +9,24 @@ use tokio_util::sync::CancellationToken;
99use tracing:: Instrument ;
1010
1111#[ derive( Debug , Clone ) ]
12- pub ( crate ) struct Restarter {
12+ pub ( crate ) struct Restarter < Restart = KubeRestarter > {
1313 duration : Duration ,
1414 waiting : bool ,
1515 tx : Sender < usize > ,
1616 rx : Arc < Mutex < Receiver < usize > > > ,
17+ restart : Restart ,
1718}
1819
19- impl Restarter {
20- pub ( crate ) fn new ( duration : Duration ) -> Self {
21- let ( tx, rx) = mpsc:: channel ( 1 ) ;
20+ #[ derive( Debug , Clone ) ]
21+ pub ( crate ) struct KubeRestarter ;
2222
23- Self {
24- duration,
25- waiting : false ,
26- tx,
27- rx : Arc :: new ( Mutex :: new ( rx) ) ,
28- }
29- }
23+ pub ( crate ) trait Restart : Send + Sync {
24+ fn restart ( & self ) -> impl Future < Output = Result < ( ) > > + Send ;
25+ }
3026
27+ impl Restart for KubeRestarter {
3128 #[ tracing:: instrument]
32- pub ( crate ) async fn restart ( ) -> Result < ( ) > {
29+ async fn restart ( & self ) -> Result < ( ) > {
3330 let config = match Config :: incluster ( ) {
3431 Ok ( config) => config,
3532 Err ( _) => {
@@ -46,11 +43,26 @@ impl Restarter {
4643
4744 Ok ( ( ) )
4845 }
46+ }
47+
48+ impl < Restart : self :: Restart + Clone + Send + Sync + ' static > Restarter < Restart > {
49+ pub ( crate ) fn new ( duration : Duration , restart : Restart ) -> Self {
50+ let ( tx, rx) = mpsc:: channel ( 1 ) ;
51+
52+ Self {
53+ duration,
54+ waiting : false ,
55+ tx,
56+ rx : Arc :: new ( Mutex :: new ( rx) ) ,
57+ restart,
58+ }
59+ }
4960
5061 #[ tracing:: instrument( skip_all) ]
5162 pub ( crate ) async fn wait ( & self ) {
5263 let duration = self . duration ;
5364 let rx = self . rx . clone ( ) ;
65+ let restart = self . restart . clone ( ) ;
5466 let mut waiting = self . waiting ;
5567 let cancellation_token = CancellationToken :: new ( ) ;
5668
@@ -59,9 +71,13 @@ impl Restarter {
5971
6072 while let Some ( connection_count) = rx. recv ( ) . await {
6173 match ( connection_count, waiting) {
74+ ( 0 , true ) => {
75+ tracing:: error!( "unexpected connection count {connection_count} and waiting {waiting}" ) ;
76+ } ,
6277 ( 0 , false ) => {
6378 waiting = true ;
6479 let cancellation_token = cancellation_token. clone ( ) ;
80+ let restart = restart. clone ( ) ;
6581
6682 tracing:: info!( "statefulsets/voicevox is going to restart in {} secs" , duration. as_secs( ) ) ;
6783
@@ -71,7 +87,7 @@ impl Restarter {
7187 tracing:: info!( "cancelled restarting statefulsets/voicevox" ) ;
7288 } ,
7389 _ = tokio:: time:: sleep( duration) => {
74- if let Err ( err) = Self :: restart( ) . await {
90+ if let Err ( err) = restart . restart( ) . await {
7591 tracing:: error!( "failed to restart statefulsets/voicevox\n Error: {err:?}" ) ;
7692 }
7793 } ,
@@ -82,9 +98,7 @@ impl Restarter {
8298 waiting = false ;
8399 cancellation_token. cancel ( ) ;
84100 } ,
85- ( connection_count, waiting) => {
86- tracing:: error!( "unexpected connection count {connection_count} and waiting {waiting}" ) ;
87- } ,
101+ ( 1 .., false ) => ( ) ,
88102 }
89103 }
90104 } . in_current_span ( ) ) ;
0 commit comments