Skip to content

Commit be3e99e

Browse files
committed
fix(up): Fix anytime solver that missed intermediate solutions.
CPU bound computations were performed on the tokio green thread which led to some message being delayed.
1 parent 61d0649 commit be3e99e

File tree

1 file changed

+44
-20
lines changed

1 file changed

+44
-20
lines changed

planning/grpc/server/src/bin/server.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ struct SolveArgs {
6161
conf: SolverConfiguration,
6262
}
6363

64-
#[derive(Debug, Args)]
64+
#[derive(Debug, Args, Clone)]
6565
pub struct SolverConfiguration {
6666
/// If true, the solver should look for optimal solutions
6767
#[clap(long)]
@@ -110,11 +110,25 @@ impl SolverConfiguration {
110110
}
111111
}
112112

113+
async fn solve(
114+
problem: Arc<up::Problem>,
115+
on_new_sol: impl Fn(up::Plan) + Clone + Send + 'static,
116+
conf: Arc<SolverConfiguration>,
117+
) -> Result<up::PlanGenerationResult, Error> {
118+
let (tx, rx) = tokio::sync::oneshot::channel();
119+
// run CPU-bound computation on a separate OS Thread
120+
std::thread::spawn(move || {
121+
tx.send(solve_blocking(problem, on_new_sol, conf)).unwrap();
122+
});
123+
rx.await.unwrap()
124+
}
125+
113126
/// Solves the given problem, giving any intermediate solution to the callback.
114-
pub fn solve(
115-
problem: &up::Problem,
127+
/// NOTE: This function is CPU-Bound and should not be used in an async context
128+
fn solve_blocking(
129+
problem: Arc<up::Problem>,
116130
on_new_sol: impl Fn(up::Plan) + Clone,
117-
conf: &SolverConfiguration,
131+
conf: Arc<SolverConfiguration>,
118132
) -> Result<up::PlanGenerationResult, Error> {
119133
let reception_time = Instant::now();
120134
let deadline = conf
@@ -138,7 +152,7 @@ pub fn solve(
138152
None
139153
};
140154

141-
let base_problem = problem_to_chronicles(problem)
155+
let base_problem = problem_to_chronicles(&problem)
142156
.with_context(|| format!("In problem {}/{}", &problem.domain_name, &problem.problem_name))?;
143157
let bounded = htn_mode && hierarchical_is_non_recursive(&base_problem) || base_problem.templates.is_empty();
144158

@@ -151,7 +165,7 @@ pub fn solve(
151165

152166
// callback that will be invoked each time an intermediate solution is found
153167
let on_new_solution = |pb: &FiniteProblem, ass: Arc<SavedAssignment>| {
154-
let plan = serialize_plan(problem, pb, &ass);
168+
let plan = serialize_plan(&problem, pb, &ass);
155169
match plan {
156170
Ok(plan) => on_new_sol(plan),
157171
Err(err) => eprintln!("Error when serializing intermediate plan: {err}"),
@@ -179,7 +193,7 @@ pub fn solve(
179193
} else {
180194
up::plan_generation_result::Status::SolvedSatisficing
181195
};
182-
let plan = serialize_plan(problem, &finite_problem, &plan)?;
196+
let plan = serialize_plan(&problem, &finite_problem, &plan)?;
183197
Ok(up::PlanGenerationResult {
184198
status: status as i32,
185199
plan: Some(plan),
@@ -202,7 +216,7 @@ pub fn solve(
202216
println!("************* TIMEOUT **************");
203217
let opt_plan = if let Some((finite_problem, plan)) = opt_plan {
204218
println!("\n{}", solver::format_plan(&finite_problem, &plan, htn_mode)?);
205-
Some(serialize_plan(problem, &finite_problem, &plan)?)
219+
Some(serialize_plan(&problem, &finite_problem, &plan)?)
206220
} else {
207221
None
208222
};
@@ -232,7 +246,10 @@ impl UnifiedPlanning for UnifiedPlanningService {
232246

233247
async fn plan_anytime(&self, request: Request<PlanRequest>) -> Result<Response<Self::planAnytimeStream>, Status> {
234248
let reception_time = Instant::now();
235-
let (tx, rx) = mpsc::channel(32);
249+
// Channel to send the stream of results
250+
// Channel is given a large capacity, as we do not want the solver to block when submitting
251+
// intermediate solutions
252+
let (tx, rx) = mpsc::channel(1024);
236253
let plan_request = request.into_inner();
237254

238255
let problem = plan_request
@@ -248,6 +265,9 @@ impl UnifiedPlanning for UnifiedPlanningService {
248265
conf.optimal = true;
249266

250267
let tx2 = tx.clone();
268+
269+
// Callback that will be called by the solver on each plan found.
270+
// Note that this is called outside of tokio and should no rely on async
251271
let on_new_sol = move |plan: up::Plan| {
252272
let mut answer = up::PlanGenerationResult {
253273
status: up::plan_generation_result::Status::Intermediate as i32,
@@ -258,18 +278,17 @@ impl UnifiedPlanning for UnifiedPlanningService {
258278
};
259279
add_engine_time(&mut answer.metrics, &reception_time);
260280

261-
// start a new green thread in charge for sending the result
262-
let tx2 = tx2.clone();
263-
tokio::spawn(async move {
264-
if tx2.send(Ok(answer)).await.is_err() {
265-
eprintln!("Could not send intermediate solution through the gRPC channel.");
266-
}
267-
});
281+
// send results synchronously (queue is sized to avoid blocking in practice)
282+
if tx2.blocking_send(Ok(answer)).is_err() {
283+
eprintln!("Could not send intermediate solution through the gRPC channel.");
284+
}
268285
};
269286

270-
// run a new green thread in which the solver will run
287+
let conf = Arc::new(conf);
288+
let problem = Arc::new(problem);
289+
271290
tokio::spawn(async move {
272-
let result = solve(&problem, on_new_sol, &conf);
291+
let result = solve(problem.clone(), on_new_sol, conf.clone()).await;
273292
match result {
274293
Ok(mut answer) => {
275294
add_engine_time(&mut answer.metrics, &reception_time);
@@ -311,7 +330,10 @@ impl UnifiedPlanning for UnifiedPlanningService {
311330
conf.timeout = Some(plan_request.timeout)
312331
}
313332

314-
let result = solve(&problem, |_| {}, &conf);
333+
let conf = Arc::new(conf);
334+
let problem = Arc::new(problem);
335+
336+
let result = solve(problem, |_| {}, conf).await;
315337
let mut answer = match result {
316338
Ok(answer) => answer,
317339
Err(e) => {
@@ -420,8 +442,10 @@ async fn main() -> Result<(), Error> {
420442
Command::Solve(solve_args) => {
421443
let problem = std::fs::read(&solve_args.problem_file)?;
422444
let problem = Problem::decode(problem.as_slice())?;
445+
let problem = Arc::new(problem);
446+
let conf = Arc::new(solve_args.conf.clone());
423447

424-
let answer = solve(&problem, |_| {}, &solve_args.conf);
448+
let answer = solve(problem, |_| {}, conf).await;
425449

426450
println!("{answer:?}");
427451
}

0 commit comments

Comments
 (0)