Skip to content

Commit 69ef0b6

Browse files
authored
fix(cubestore): If a job has been scheduled to a non-existent node it'd hang around forever (#6242)
1 parent e2dfe8b commit 69ef0b6

File tree

3 files changed

+169
-0
lines changed

3 files changed

+169
-0
lines changed

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,7 @@ pub trait MetaStore: DIService + Send + Sync {
10091009
&self,
10101010
orphaned_timeout: Duration,
10111011
) -> Result<Vec<IdRow<Job>>, CubeError>;
1012+
async fn get_jobs_on_non_exists_nodes(&self) -> Result<Vec<IdRow<Job>>, CubeError>;
10121013
async fn delete_job(&self, job_id: u64) -> Result<IdRow<Job>, CubeError>;
10131014
async fn start_processing_job(
10141015
&self,
@@ -3580,6 +3581,33 @@ impl MetaStore for RocksMetaStore {
35803581
.await
35813582
}
35823583

3584+
async fn get_jobs_on_non_exists_nodes(&self) -> Result<Vec<IdRow<Job>>, CubeError> {
3585+
let workers = if self.store.config.select_workers().is_empty() {
3586+
vec![self.store.config.server_name().clone()]
3587+
} else {
3588+
self.store.config.select_workers().clone()
3589+
};
3590+
let nodes = workers
3591+
.iter()
3592+
.map(|s| s.to_string())
3593+
.collect::<HashSet<_>>();
3594+
self.read_operation_out_of_queue(move |db_ref| {
3595+
let jobs_table = JobRocksTable::new(db_ref);
3596+
let all_jobs = jobs_table
3597+
.all_rows()?
3598+
.into_iter()
3599+
.filter(|j| match j.get_row().status() {
3600+
JobStatus::Scheduled(node) | JobStatus::ProcessingBy(node) => {
3601+
!nodes.contains(node)
3602+
}
3603+
_ => false,
3604+
})
3605+
.collect::<Vec<_>>();
3606+
Ok(all_jobs)
3607+
})
3608+
.await
3609+
}
3610+
35833611
#[tracing::instrument(level = "trace", skip(self))]
35843612
async fn delete_job(&self, job_id: u64) -> Result<IdRow<Job>, CubeError> {
35853613
self.write_operation(move |db_ref, batch_pipe| {

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,9 @@ impl MetaStore for MetaStoreMock {
694694
async fn insert_chunks(&self, _chunks: Vec<Chunk>) -> Result<Vec<IdRow<Chunk>>, CubeError> {
695695
panic!("MetaStore mock!")
696696
}
697+
async fn get_jobs_on_non_exists_nodes(&self) -> Result<Vec<IdRow<Job>>, CubeError> {
698+
panic!("MetaStore mock!")
699+
}
697700
}
698701

699702
crate::di_service!(MetaStoreMock, [MetaStore]);

rust/cubestore/cubestore/src/scheduler/mod.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,15 @@ impl SchedulerImpl {
151151
}
152152

153153
pub async fn reconcile(&self) -> Result<(), CubeError> {
154+
if let Err(e) = warn_long_fut(
155+
"Removing jobs on non-existing nodes",
156+
Duration::from_millis(5000),
157+
self.remove_jobs_on_non_exists_nodes(),
158+
)
159+
.await
160+
{
161+
error!("Error removing orphaned jobs: {}", e);
162+
}
154163
if let Err(e) = warn_long_fut(
155164
"Removing orphaned jobs",
156165
Duration::from_millis(5000),
@@ -545,6 +554,15 @@ impl SchedulerImpl {
545554
Ok(())
546555
}
547556

557+
async fn remove_jobs_on_non_exists_nodes(&self) -> Result<(), CubeError> {
558+
let jobs_to_remove = self.meta_store.get_jobs_on_non_exists_nodes().await?;
559+
for job in jobs_to_remove.into_iter() {
560+
log::info!("Removing job {:?} on non-existing node", job);
561+
self.meta_store.delete_job(job.get_id()).await?;
562+
}
563+
Ok(())
564+
}
565+
548566
async fn remove_orphaned_jobs(&self) -> Result<(), CubeError> {
549567
let orphaned_jobs = self
550568
.meta_store
@@ -1319,3 +1337,123 @@ impl DataGCLoop {
13191337
}
13201338
}
13211339
}
1340+
1341+
#[cfg(test)]
1342+
mod tests {
1343+
use super::*;
1344+
use crate::config::Config;
1345+
use std::fs;
1346+
1347+
#[tokio::test]
1348+
async fn test_remove_jobs_on_non_exists_nodes() {
1349+
let config = Config::test("remove_jobs_on_non_exists_nodes");
1350+
1351+
let _ = fs::remove_dir_all(config.local_dir());
1352+
let _ = fs::remove_dir_all(config.remote_dir());
1353+
1354+
let services = config.configure().await;
1355+
services.start_processing_loops().await.unwrap();
1356+
let meta_store = services.meta_store.clone();
1357+
meta_store
1358+
.add_job(Job::new(
1359+
RowKey::Table(TableId::Partitions, 1),
1360+
JobType::PartitionCompaction,
1361+
"not_existis_node".to_string(),
1362+
))
1363+
.await
1364+
.unwrap();
1365+
let exists_job = meta_store
1366+
.add_job(Job::new(
1367+
RowKey::Table(TableId::Partitions, 2),
1368+
JobType::PartitionCompaction,
1369+
config.config_obj().server_name().to_string(),
1370+
))
1371+
.await
1372+
.unwrap()
1373+
.unwrap();
1374+
let all_jobs = meta_store.all_jobs().await.unwrap();
1375+
assert_eq!(all_jobs.len(), 2);
1376+
let scheduler = services.injector.get_service_typed::<SchedulerImpl>().await;
1377+
scheduler.remove_jobs_on_non_exists_nodes().await.unwrap();
1378+
let all_jobs = meta_store.all_jobs().await.unwrap();
1379+
assert_eq!(all_jobs.len(), 1);
1380+
assert_eq!(all_jobs[0].get_id(), exists_job.get_id());
1381+
services.stop_processing_loops().await.unwrap();
1382+
let _ = fs::remove_dir_all(config.local_dir());
1383+
let _ = fs::remove_dir_all(config.remote_dir());
1384+
}
1385+
1386+
#[tokio::test]
1387+
async fn test_remove_jobs_on_non_exists_nodes_several_workers() {
1388+
let config = Config::test("remove_jobs_on_non_exists_nodes_several_workers").update_config(
1389+
|mut config| {
1390+
config.select_workers = vec!["worker1".to_string(), "worker2".to_string()];
1391+
config
1392+
},
1393+
);
1394+
1395+
let _ = fs::remove_dir_all(config.local_dir());
1396+
let _ = fs::remove_dir_all(config.remote_dir());
1397+
1398+
let services = config.configure().await;
1399+
services.start_processing_loops().await.unwrap();
1400+
let meta_store = services.meta_store.clone();
1401+
let mut existing_ids = Vec::new();
1402+
existing_ids.push(
1403+
meta_store
1404+
.add_job(Job::new(
1405+
RowKey::Table(TableId::Partitions, 2),
1406+
JobType::PartitionCompaction,
1407+
"worker1".to_string(),
1408+
))
1409+
.await
1410+
.unwrap()
1411+
.unwrap()
1412+
.get_id(),
1413+
);
1414+
1415+
existing_ids.push(
1416+
meta_store
1417+
.add_job(Job::new(
1418+
RowKey::Table(TableId::Partitions, 3),
1419+
JobType::PartitionCompaction,
1420+
"worker2".to_string(),
1421+
))
1422+
.await
1423+
.unwrap()
1424+
.unwrap()
1425+
.get_id(),
1426+
);
1427+
1428+
meta_store
1429+
.add_job(Job::new(
1430+
RowKey::Table(TableId::Partitions, 1),
1431+
JobType::PartitionCompaction,
1432+
"not_existis_node".to_string(),
1433+
))
1434+
.await
1435+
.unwrap();
1436+
1437+
meta_store
1438+
.add_job(Job::new(
1439+
RowKey::Table(TableId::Partitions, 4),
1440+
JobType::PartitionCompaction,
1441+
"not_existis_node2".to_string(),
1442+
))
1443+
.await
1444+
.unwrap();
1445+
existing_ids.sort();
1446+
let all_jobs = meta_store.all_jobs().await.unwrap();
1447+
assert_eq!(all_jobs.len(), 4);
1448+
let scheduler = services.injector.get_service_typed::<SchedulerImpl>().await;
1449+
scheduler.remove_jobs_on_non_exists_nodes().await.unwrap();
1450+
let all_jobs = meta_store.all_jobs().await.unwrap();
1451+
assert_eq!(all_jobs.len(), 2);
1452+
let mut job_ids = all_jobs.into_iter().map(|j| j.get_id()).collect::<Vec<_>>();
1453+
job_ids.sort();
1454+
assert_eq!(job_ids, existing_ids);
1455+
services.stop_processing_loops().await.unwrap();
1456+
let _ = fs::remove_dir_all(config.local_dir());
1457+
let _ = fs::remove_dir_all(config.remote_dir());
1458+
}
1459+
}

0 commit comments

Comments (0)