
Commit 9b41910

chore(cubestore): Upgrade DF 46: Add migration testing form of cubesql test cases
1 parent edc57ad commit 9b41910

File tree

12 files changed: +489 -89 lines changed


rust/cubestore/Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default.

rust/cubestore/cubestore-sql-tests/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -28,6 +28,11 @@ name = "cluster"
 path = "tests/cluster.rs"
 harness = false

+[[test]]
+name = "migration"
+path = "tests/migration.rs"
+harness = false
+
 [target.'cfg(not(target_os = "windows"))'.dependencies]
 ipc-channel = { version = "0.18.0" }
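
Note: `harness = false` means `tests/migration.rs` must provide its own `main` rather than using the built-in libtest harness, matching how the existing `cluster` test target is declared above. The contents of `tests/migration.rs` are not included in this excerpt; a minimal, hypothetical sketch of such a harness-less entry point looks like this, with the actual test wiring left as a placeholder:

// tests/migration.rs -- hypothetical sketch; the real file is not shown in this diff.
fn main() {
    // With `harness = false`, this binary drives the tests itself, typically by
    // handing the process arguments to the crate's own runner (run_sql_tests in
    // cubestore-sql-tests/src/lib.rs). The exact arguments used by the migration
    // variant are not shown in this commit, so the call is omitted here.
}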

rust/cubestore/cubestore-sql-tests/src/benches.rs

Lines changed: 2 additions & 20 deletions
@@ -1,18 +1,15 @@
-use crate::{to_rows, SqlClient};
+use crate::files::download_and_unzip;
+use crate::to_rows;
 use async_trait::async_trait;
 use cubestore::cluster::Cluster;
 use cubestore::config::{env_parse, Config, CubeServices};
 use cubestore::metastore::{Column, ColumnType};
 use cubestore::table::TableValue;
 use cubestore::util::strings::path_to_string;
 use cubestore::CubeError;
-use flate2::read::GzDecoder;
 use std::any::Any;
-use std::io::Cursor;
-use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
-use tar::Archive;
 use tokio::time::timeout;

 pub type BenchState = dyn Any + Send + Sync;
@@ -243,21 +240,6 @@ impl Bench for crate::benches::QueueListBench {
     }
 }

-async fn download_and_unzip(url: &str, dataset: &str) -> Result<Box<Path>, CubeError> {
-    let root = std::env::current_dir()?.join("data");
-    let dataset_path = root.join(dataset);
-    if !dataset_path.exists() {
-        println!("Downloading {}", dataset);
-        let response = reqwest::get(url).await?;
-        let content = Cursor::new(response.bytes().await?);
-        let tarfile = GzDecoder::new(content);
-        let mut archive = Archive::new(tarfile);
-        archive.unpack(root)?;
-    }
-    assert!(dataset_path.exists());
-    Ok(dataset_path.into_boxed_path())
-}
-
 async fn wait_for_all_jobs(services: &CubeServices) -> Result<(), CubeError> {
     let wait_for = services
         .meta_store
rust/cubestore/cubestore-sql-tests/src/files.rs

Lines changed: 44 additions & 0 deletions
@@ -1,9 +1,53 @@
 use cubestore::CubeError;
 use std::io::Write;
 use tempfile::NamedTempFile;
+use flate2::read::GzDecoder;
+use std::io::Cursor;
+use std::path::Path;
+use tar::Archive;

 pub fn write_tmp_file(text: &str) -> Result<NamedTempFile, CubeError> {
     let mut file = NamedTempFile::new()?;
     file.write_all(text.as_bytes())?;
     return Ok(file);
 }
+
+pub async fn download_and_unzip(url: &str, dataset: &str) -> Result<Box<Path>, CubeError> {
+    let root = std::env::current_dir()?.join("data");
+    let dataset_path = root.join(dataset);
+    if !dataset_path.exists() {
+        println!("Downloading {}", dataset);
+        let response = reqwest::get(url).await?;
+        let content = Cursor::new(response.bytes().await?);
+        let tarfile = GzDecoder::new(content);
+        let mut archive = Archive::new(tarfile);
+        archive.unpack(root)?;
+    }
+    assert!(dataset_path.exists());
+    Ok(dataset_path.into_boxed_path())
+}
+
+/// Recursively copies files and directories from `from` to `to`, which must not exist yet. Errors
+/// if anything other than a file or directory is found.
+///
+/// We don't use a lib because the first that was tried was broken.
+pub fn recursive_copy_directory(from: &Path, to: &Path) -> Result<(), CubeError> {
+    let mut dir = std::fs::read_dir(from)?;
+
+    // This errors if the destination already exists, and that's what we want.
+    std::fs::create_dir(to)?;
+
+    while let Some(entry) = dir.next() {
+        let entry = entry?;
+        let file_type = entry.file_type()?;
+        if file_type.is_dir() {
+            recursive_copy_directory(&entry.path(), &to.join(entry.file_name()))?;
+        } else if file_type.is_file() {
+            let _file_size = std::fs::copy(entry.path(), to.join(entry.file_name()))?;
+        } else {
+            return Err(CubeError::corrupt_data(format!("cannot copy file of type {:?} at location {:?}", file_type, entry.path())));
+        }
+    }
+
+    Ok(())
+}
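
A hedged usage sketch for the two helpers added above (not part of the commit; the URL and directory names are placeholders):

use std::path::Path;

use cubestore::CubeError;
use cubestore_sql_tests::files::{download_and_unzip, recursive_copy_directory};

async fn prepare_fixture() -> Result<(), CubeError> {
    // Downloads and unpacks a .tar.gz archive into ./data/<dataset>, skipping the
    // download if the directory already exists.
    let dataset = download_and_unzip("https://example.com/dataset.tar.gz", "dataset").await?;

    // Copies the unpacked tree to a fresh location; the destination must not exist yet.
    recursive_copy_directory(&dataset, Path::new("data/dataset-copy"))?;
    Ok(())
}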

rust/cubestore/cubestore-sql-tests/src/lib.rs

Lines changed: 38 additions & 5 deletions
@@ -16,7 +16,7 @@ use test::{ShouldPanic, TestDesc, TestDescAndFn, TestName, TestType};
 use tests::sql_tests;

 mod benches;
-mod files;
+pub mod files;
 #[cfg(not(target_os = "windows"))]
 pub mod multiproc;
 #[allow(unused_parens, non_snake_case)]
@@ -32,6 +32,29 @@ pub trait SqlClient: Send + Sync {
         query: &str,
     ) -> Result<Arc<DataFrame>, CubeError>;
     async fn plan_query(&self, query: &str) -> Result<QueryPlans, CubeError>;
+    fn prefix(&self) -> &str;
+    /// Used by FilterWritesSqlClient in migration tests, ignored for others.
+    fn migration_run_next_query(&self) { }
+    /// Used by FilterWritesSqlClient in migration tests, ignored for others.
+    fn migration_hardcode_next_query(&self, _next_result: Result<Arc<DataFrame>, CubeError>) { }
+}
+
+impl dyn SqlClient {
+    /// Use this instead of prefix() so that other uses of prefix() are easily searchable and
+    /// enumerable.
+    fn is_migration(&self) -> bool {
+        self.prefix() == "migration"
+    }
+
+    /// Doesn't do anything but is a searchable token for later test management.
+    fn note_non_idempotent_migration_test(&self) { }
+
+    /// We tolerate the next query but we want to revisit later because maybe it should be a rule in
+    /// the FilterWritesSqlClient's recognized queries list.
+    fn tolerate_next_query_revisit(&self) { self.migration_run_next_query() }
+
+    /// Hardcodes an error return value, for when the presence of an error but not the message is asserted.
+    fn migration_hardcode_generic_err(&self) { self.migration_hardcode_next_query(Err(CubeError::user(String::new())));}
 }

 pub fn run_sql_tests(
@@ -73,21 +96,31 @@ pub fn run_sql_tests(
     );
 }

+pub struct BasicSqlClient {
+    /// Used rarely in some test cases, or maybe frequently for the "migration" prefix.
+    pub prefix: &'static str,
+    pub service: Arc<dyn SqlService>,
+}
+
 #[async_trait]
-impl SqlClient for Arc<dyn SqlService> {
+impl SqlClient for BasicSqlClient {
     async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError> {
-        self.as_ref().exec_query(query).await
+        self.service.as_ref().exec_query(query).await
     }

     async fn exec_query_with_context(
         &self,
         context: SqlQueryContext,
         query: &str,
     ) -> Result<Arc<DataFrame>, CubeError> {
-        self.as_ref().exec_query_with_context(context, query).await
+        self.service.as_ref().exec_query_with_context(context, query).await
     }

     async fn plan_query(&self, query: &str) -> Result<QueryPlans, CubeError> {
-        self.as_ref().plan_query(query).await
+        self.service.as_ref().plan_query(query).await
+    }
+
+    fn prefix(&self) -> &str {
+        &self.prefix
     }
 }
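
Two consequences of the lib.rs changes, sketched below under stated assumptions: callers that previously used `Arc<dyn SqlService>` directly as a `SqlClient` now wrap the service in `BasicSqlClient` and pick a prefix, and migration-aware test cases can branch on `is_migration()` to drive the hooks that a write-filtering client (the `FilterWritesSqlClient` referenced in the doc comments, which is not part of this excerpt) is expected to implement. The prefix value `""` for non-migration clients is an assumption, as is the example query.

// Hypothetical wiring inside cubestore-sql-tests; the types are the ones from src/lib.rs.
fn make_client(service: Arc<dyn SqlService>) -> Arc<dyn SqlClient> {
    Arc::new(BasicSqlClient {
        prefix: "",  // assumption: ordinary tests; the migration runner would use "migration"
        service,
    })
}

// A migration-aware test case can branch on the client kind before issuing writes.
async fn example_case(service: Arc<dyn SqlClient>) -> Result<(), CubeError> {
    if service.is_migration() {
        // Let the write-filtering client execute the next statement instead of skipping it.
        service.migration_run_next_query();
    }
    service.exec_query("CREATE SCHEMA IF NOT EXISTS foo").await?;
    Ok(())
}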
