
Commit 523c0c2

fix(cubestore): Wrong triggering of table deactivation in some cases (#6119)
1 parent 2727911 commit 523c0c2

7 files changed: +87 -16 lines changed

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 40 additions & 8 deletions
@@ -484,19 +484,29 @@ impl Cluster for ClusterImpl {
         let node_name = self.node_name_by_partition(&partition);
         let mut futures = Vec::new();
         if let Some(name) = partition.get_row().get_full_name(partition.get_id()) {
-            futures.push(self.warmup_download(&node_name, name, partition.get_row().file_size()));
+            futures.push(self.warmup_download_with_corruption_check(
+                &node_name,
+                name,
+                partition.get_row().file_size(),
+                &partition,
+                None,
+            ));
         }
         for chunk in chunks.iter() {
             let name = chunk.get_row().get_full_name(chunk.get_id());
-            futures.push(self.warmup_download(&node_name, name, chunk.get_row().file_size()));
+            futures.push(self.warmup_download_with_corruption_check(
+                &node_name,
+                name,
+                chunk.get_row().file_size(),
+                &partition,
+                Some(chunk.get_id()),
+            ));
         }
         let res = join_all(futures)
             .await
             .into_iter()
             .collect::<Result<Vec<_>, _>>();

-        deactivate_table_on_corrupt_data(self.meta_store.clone(), &res, &partition).await;
-
         res?;
         Ok(())
     }
@@ -1404,14 +1414,20 @@ impl ClusterImpl {
         let to_download = plan_node.files_to_download();
         let file_futures = to_download
             .iter()
-            .map(|(partition, remote, file_size)| {
+            .map(|(partition, remote, file_size, chunk_id)| {
                 let meta_store = self.meta_store.clone();
                 async move {
                     let res = self
                         .remote_fs
                         .download_file(remote, file_size.clone())
                         .await;
-                    deactivate_table_on_corrupt_data(meta_store, &res, &partition).await;
+                    deactivate_table_on_corrupt_data(
+                        meta_store,
+                        &res,
+                        &partition,
+                        chunk_id.clone(),
+                    )
+                    .await;
                     res
                 }
             })
@@ -1428,12 +1444,27 @@ impl ClusterImpl {
             .collect::<Result<Vec<_>, _>>()?
             .into_iter(),
         )
-        .map(|((_, remote_path, _), path)| (remote_path, path))
+        .map(|((_, remote_path, _, _), path)| (remote_path, path))
         .collect::<HashMap<_, _>>();

         Ok(remote_to_local_names)
     }

+    async fn warmup_download_with_corruption_check(
+        &self,
+        node_name: &str,
+        remote_path: String,
+        expected_file_size: Option<u64>,
+        partition: &IdRow<Partition>,
+        chunk_id: Option<u64>,
+    ) -> Result<(), CubeError> {
+        let res = self
+            .warmup_download(&node_name, remote_path, expected_file_size)
+            .await;
+        deactivate_table_on_corrupt_data(self.meta_store.clone(), &res, partition, chunk_id).await;
+        res
+    }
+
     pub async fn try_to_connect(&mut self) -> Result<(), CubeError> {
         let streams = self
             .server_addresses
@@ -1669,7 +1700,8 @@ impl ClusterImpl {
                 c.get_row().file_size(),
             )
             .await;
-            deactivate_table_on_corrupt_data(self.meta_store.clone(), &result, &p).await;
+            // TODO: propagate 'not found' and log in debug mode. Compaction might remove files,
+            // so they are not errors most of the time.
             ack_error!(result);
         }
     }
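
The warm-up path now routes every per-file download through warmup_download_with_corruption_check, so each file's outcome is checked together with the chunk id it belongs to, rather than running a single deactivation check after join_all. Below is a minimal, synchronous sketch of that wrapper pattern in isolation; the CorruptDataError type and the download/deactivate_on_corrupt_data functions are simplified stand-ins for illustration, not cubestore APIs.

#[derive(Debug)]
struct CorruptDataError(String);

fn download(remote_path: &str) -> Result<(), CorruptDataError> {
    // Pretend anything under "corrupt/" fails its integrity check.
    if remote_path.starts_with("corrupt/") {
        Err(CorruptDataError(remote_path.to_string()))
    } else {
        Ok(())
    }
}

fn deactivate_on_corrupt_data(res: &Result<(), CorruptDataError>, chunk_id: Option<u64>) {
    // Stand-in for deactivate_table_on_corrupt_data: it only observes the result.
    if let Err(CorruptDataError(path)) = res {
        println!("would deactivate table for '{}' (chunk {:?})", path, chunk_id);
    }
}

fn download_with_corruption_check(
    remote_path: &str,
    chunk_id: Option<u64>,
) -> Result<(), CorruptDataError> {
    let res = download(remote_path);
    // Run the check on the outcome, then hand the original result back unchanged.
    deactivate_on_corrupt_data(&res, chunk_id);
    res
}

fn main() {
    assert!(download_with_corruption_check("ok/1.parquet", None).is_ok());
    assert!(download_with_corruption_check("corrupt/2.parquet", Some(2)).is_err());
}

The key point is that the corruption check only inspects the download result; the original Result is still returned to the caller, so error propagation is unchanged.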

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 29 additions & 1 deletion
@@ -4179,8 +4179,11 @@ pub async fn deactivate_table_on_corrupt_data<'a, T: 'static>(
     meta_store: Arc<dyn MetaStore>,
     e: &'a Result<T, CubeError>,
     partition: &'a IdRow<Partition>,
+    chunk_id: Option<u64>,
 ) {
-    if let Err(e) = deactivate_table_on_corrupt_data_res::<T>(meta_store, e, partition).await {
+    if let Err(e) =
+        deactivate_table_on_corrupt_data_res::<T>(meta_store, e, partition, chunk_id).await
+    {
         log::error!("Error during deactivation of table on corrupt data: {}", e);
     }
 }
@@ -4189,9 +4192,34 @@ pub async fn deactivate_table_on_corrupt_data_res<'a, T: 'static>(
     meta_store: Arc<dyn MetaStore>,
     result: &'a Result<T, CubeError>,
     partition: &'a IdRow<Partition>,
+    chunk_id: Option<u64>,
 ) -> Result<(), CubeError> {
     if let Err(e) = &result {
         if e.is_corrupt_data() {
+            // First check that the chunk or partition still exists in the metastore; it could have been deleted by compaction or similar operations.
+            if let Some(chunk_id) = chunk_id {
+                match meta_store.get_chunk(chunk_id).await {
+                    Ok(_) => {}
+                    Err(_) => {
+                        log::info!(
+                            "Chunk {} is no longer in metastore so deactivation is not required",
+                            chunk_id
+                        );
+                        return Ok(());
+                    }
+                };
+            } else {
+                match meta_store.get_partition(partition.get_id()).await {
+                    Ok(_) => {}
+                    Err(_) => {
+                        log::info!(
+                            "Partition {} is no longer in metastore so deactivation is not required",
+                            partition.get_id()
+                        );
+                        return Ok(());
+                    }
+                };
+            }
             let table_id = meta_store
                 .get_index(partition.get_row().get_index_id())
                 .await?
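
The core of the fix is the guard at the top of deactivate_table_on_corrupt_data_res: a corrupt-data error only triggers deactivation if the chunk (when a chunk id is known) or otherwise the partition is still present in the metastore, because compaction may have already replaced the file that failed to download. The sketch below isolates that decision with a synchronous, in-memory stand-in for the metastore; the InMemoryMetaStore type and should_deactivate function are illustrative assumptions, not part of cubestore.

use std::collections::HashSet;

// In-memory stand-in for the metastore lookups used by the guard.
struct InMemoryMetaStore {
    chunks: HashSet<u64>,
    partitions: HashSet<u64>,
}

impl InMemoryMetaStore {
    fn chunk_exists(&self, id: u64) -> bool {
        self.chunks.contains(&id)
    }
    fn partition_exists(&self, id: u64) -> bool {
        self.partitions.contains(&id)
    }
}

/// Returns true when deactivation should proceed: the chunk (if a chunk id is
/// known) or otherwise the partition is still registered in the metastore.
/// If compaction already removed it, the corrupt-data error is stale and ignored.
fn should_deactivate(
    meta_store: &InMemoryMetaStore,
    partition_id: u64,
    chunk_id: Option<u64>,
) -> bool {
    match chunk_id {
        Some(id) => meta_store.chunk_exists(id),
        None => meta_store.partition_exists(partition_id),
    }
}

fn main() {
    let meta_store = InMemoryMetaStore {
        chunks: HashSet::from([10]),
        partitions: HashSet::from([1]),
    };
    // Chunk 10 still exists: the corrupt file is live data, so deactivate.
    assert!(should_deactivate(&meta_store, 1, Some(10)));
    // Chunk 99 was already compacted away: skip deactivation.
    assert!(!should_deactivate(&meta_store, 1, Some(99)));
    // No chunk id and partition 2 is gone: skip deactivation.
    assert!(!should_deactivate(&meta_store, 2, None));
}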

rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs

Lines changed: 5 additions & 2 deletions
@@ -1083,7 +1083,7 @@ impl SerializedPlan {
         &self.schema_snapshot.index_snapshots
     }

-    pub fn files_to_download(&self) -> Vec<(IdRow<Partition>, String, Option<u64>)> {
+    pub fn files_to_download(&self) -> Vec<(IdRow<Partition>, String, Option<u64>, Option<u64>)> {
         self.list_files_to_download(|id| {
             self.partition_ids_to_execute
                 .binary_search_by_key(&id, |(id, _)| *id)
@@ -1092,7 +1092,7 @@ impl SerializedPlan {
     }

     /// Note: avoid during normal execution, workers must filter the partitions they execute.
-    pub fn all_required_files(&self) -> Vec<(IdRow<Partition>, String, Option<u64>)> {
+    pub fn all_required_files(&self) -> Vec<(IdRow<Partition>, String, Option<u64>, Option<u64>)> {
         self.list_files_to_download(|_| true)
     }

@@ -1103,6 +1103,7 @@
         IdRow<Partition>,
         /* file_name */ String,
         /* size */ Option<u64>,
+        /* chunk_id */ Option<u64>,
     )> {
         let indexes = self.index_snapshots();

@@ -1122,6 +1123,7 @@
                     partition.partition.clone(),
                     file,
                     partition.partition.get_row().file_size(),
+                    None,
                 ));
             }

@@ -1131,6 +1133,7 @@
                     partition.partition.clone(),
                     chunk.get_row().get_full_name(chunk.get_id()),
                     chunk.get_row().file_size(),
+                    Some(chunk.get_id()),
                 ))
             }
         }

rust/cubestore/cubestore/src/scheduler/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -1101,7 +1101,7 @@ impl SchedulerImpl {
             .warmup_download(&node_name, path, p.get_row().file_size())
             .await;

-        deactivate_table_on_corrupt_data(self.meta_store.clone(), &result, p).await;
+        deactivate_table_on_corrupt_data(self.meta_store.clone(), &result, p, None).await;

         result
     }

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -613,7 +613,7 @@ impl SqlServiceImpl {
         tokio::fs::create_dir(&data_dir).await?;
         log::debug!("Dumping data files to {:?}", data_dir);
         // TODO: download in parallel.
-        for (_, f, size) in p.all_required_files() {
+        for (_, f, size, _) in p.all_required_files() {
             let f = self.remote_fs.download_file(&f, size).await?;
             let name = Path::new(&f).file_name().ok_or_else(|| {
                 CubeError::internal(format!("Could not get filename of '{}'", f))
@@ -1307,7 +1307,7 @@ impl SqlService for SqlServiceImpl {
             context.inline_tables.into_iter().map(|i| i.id).collect(),
         );
         let mut mocked_names = HashMap::new();
-        for (_, f, _) in worker_plan.files_to_download() {
+        for (_, f, _, _) in worker_plan.files_to_download() {
             let name = self.remote_fs.local_file(&f).await?;
             mocked_names.insert(f, name);
         }

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 2 additions & 1 deletion
@@ -455,7 +455,8 @@ impl CompactionService for CompactionServiceImpl {
                 .remote_fs
                 .download_file(&f, partition.get_row().file_size())
                 .await;
-            deactivate_table_on_corrupt_data(self.meta_store.clone(), &result, &partition).await;
+            deactivate_table_on_corrupt_data(self.meta_store.clone(), &result, &partition, None)
+                .await;
             Some(result?)
         } else {
             None

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 8 additions & 1 deletion
@@ -701,10 +701,17 @@ impl ChunkStore {
             )));
         }
         let file_size = chunk.get_row().file_size();
+        let chunk_id = chunk.get_id();
         let remote_path = ChunkStore::chunk_file_name(chunk);
         let result = self.remote_fs.download_file(&remote_path, file_size).await;

-        deactivate_table_on_corrupt_data(self.meta_store.clone(), &result, &partition).await;
+        deactivate_table_on_corrupt_data(
+            self.meta_store.clone(),
+            &result,
+            &partition,
+            Some(chunk_id),
+        )
+        .await;

         Ok((
             self.remote_fs.local_file(&remote_path).await?,
