Skip to content

Commit ee6a974

Browse files
save
1 parent 508897d commit ee6a974

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

src/components/controlled_mode/server.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,16 @@ pub async fn connect_to_server(url: Url) -> anyhow::Result<()> {
5858
let self_ip = get_self_ip().await?;
5959
let mut grpc_client = HeartbeatServiceClient::connect(url.to_string()).await?;
6060
let mut executor = ServerCommandExecutor::new();
61+
let mut stop_time = chrono::Utc::now().naive_utc() + chrono::Duration::days(30);
6162

6263
loop {
6364
let now = chrono::Utc::now().naive_utc();
65+
if now > stop_time {
66+
log::info!("Stopping execution as the time limit has been reached.");
67+
executor.shutdown_workers().await?;
68+
stop_time = chrono::Utc::now().naive_utc() + chrono::Duration::days(30);
69+
}
70+
6471
let ServerResponse {
6572
server_timestamp,
6673
next_operation,
@@ -91,17 +98,38 @@ pub async fn connect_to_server(url: Url) -> anyhow::Result<()> {
9198
NextOperation::ContinueCurrent(Empty {}) => {}
9299
NextOperation::StopCurrent(Empty {}) => {
93100
executor.shutdown_workers().await?;
101+
stop_time = chrono::Utc::now().naive_utc() + chrono::Duration::days(30);
94102
}
95103
NextOperation::StopAndExecute(commands) => {
96104
executor.shutdown_workers().await?;
97-
executor
105+
stop_time = chrono::Utc::now().naive_utc() + chrono::Duration::days(30);
106+
let max_time = executor
98107
.execute(commands.try_into()?, command_id.unwrap_or(0))
99108
.await;
109+
match max_time {
110+
Ok(max_time) => {
111+
stop_time = now + chrono::Duration::seconds(max_time as i64);
112+
}
113+
Err(e) => {
114+
log::error!("Failed to execute commands: {}", e);
115+
continue;
116+
}
117+
}
100118
}
101119
NextOperation::Execute(commands) => {
102-
executor
120+
stop_time = chrono::Utc::now().naive_utc() + chrono::Duration::days(30);
121+
let max_time = executor
103122
.execute(commands.try_into()?, command_id.unwrap_or(0))
104123
.await;
124+
match max_time {
125+
Ok(max_time) => {
126+
stop_time = now + chrono::Duration::seconds(max_time as i64);
127+
}
128+
Err(e) => {
129+
log::error!("Failed to execute commands: {}", e);
130+
continue;
131+
}
132+
}
105133
}
106134
}
107135
}

src/components/controlled_mode/server_command_executor.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use crate::components::controlled_mode::server::{commands, heartbeat, send_heart
55
use crate::components::controlled_mode::server_command::{ParallelCommands, RemoteCommand};
66
use chrono::NaiveDateTime;
77
use log::{info, warn};
8+
use serde::de;
89
use std::sync::Arc;
10+
use std::time::Duration;
911
use tokio::sync::{Mutex, mpsc};
1012
use tokio::task::JoinSet;
1113
use tonic::transport::Channel;
@@ -104,7 +106,7 @@ impl ServerCommandExecutor {
104106
Ok(())
105107
}
106108

107-
pub async fn execute(&mut self, commands: ParallelCommands, id: u64) {
109+
pub async fn execute(&mut self, commands: ParallelCommands, id: u64) -> anyhow::Result<u64> {
108110
log::debug!("Executing commands: {:?}", commands);
109111

110112
let now = chrono::Utc::now().naive_utc();
@@ -123,6 +125,16 @@ impl ServerCommandExecutor {
123125
None => true,
124126
})
125127
.collect::<Vec<_>>();
128+
let max_time = commands
129+
.iter()
130+
.filter_map(|c| match c {
131+
RemoteCommand::Request(c) => c.time,
132+
RemoteCommand::Shell(_) => Some(0),
133+
})
134+
.collect::<Vec<u64>>()
135+
.iter()
136+
.fold(0, |arg0, arg1: &u64| u64::max(arg0, *arg1));
137+
log::debug!("max_time: {}", max_time);
126138
for command in commands {
127139
match command {
128140
RemoteCommand::Request(request) => {
@@ -144,7 +156,7 @@ impl ServerCommandExecutor {
144156
}
145157
}
146158
}
147-
159+
Ok(max_time)
148160
}
149161

150162
pub async fn check_idle(&mut self) {

0 commit comments

Comments
 (0)