diff --git a/Cargo.lock b/Cargo.lock index f86c79aa2cc6..d32a4832b63d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5325,7 +5325,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a250938d7106b77da0ae915eb0c531411c28cfe3#a250938d7106b77da0ae915eb0c531411c28cfe3" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d000eedd3739c003bb139aa42cefe05521a60f7d#d000eedd3739c003bb139aa42cefe05521a60f7d" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 40ba49a73453..a9fafd232cd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a250938d7106b77da0ae915eb0c531411c28cfe3" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d000eedd3739c003bb139aa42cefe05521a60f7d" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/function/src/admin/flush_compact_table.rs b/src/common/function/src/admin/flush_compact_table.rs index 378b4181cd68..e45f2f0f84d5 100644 --- a/src/common/function/src/admin/flush_compact_table.rs +++ b/src/common/function/src/admin/flush_compact_table.rs @@ -37,6 +37,8 @@ const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window"; /// Compact type: strict window (short name). const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs"; +const DEFAULT_COMPACTION_PARALLELISM: u32 = 1; + #[admin_fn( name = FlushTableFunction, display_name = flush_table, @@ -95,7 +97,7 @@ pub(crate) async fn compact_table( query_ctx: &QueryContextRef, params: &[ValueRef<'_>], ) -> Result { - let request = parse_compact_params(params, query_ctx)?; + let request = parse_compact_request(params, query_ctx)?; info!("Compact table request: {:?}", request); let affected_rows = table_mutation_handler @@ -117,37 +119,46 @@ fn compact_signature() -> Signature { /// - `[]`: only tables name provided, using default compaction type: regular /// - `[, ]`: specify table name and compaction type. The compaction options will be default. /// - `[, , ]`: provides both type and type-specific options. -fn parse_compact_params( +/// - For `twcs`, it accepts `parallelism=[N]` where N is an unsigned 32 bits number +/// - For `swcs`, it accepts two numeric parameter: `parallelism` and `window`. +fn parse_compact_request( params: &[ValueRef<'_>], query_ctx: &QueryContextRef, ) -> Result { ensure!( - !params.is_empty(), + !params.is_empty() && params.len() <= 3, InvalidFuncArgsSnafu { - err_msg: "Args cannot be empty", + err_msg: format!( + "The length of the args is not correct, expect 1-4, have: {}", + params.len() + ), } ); - let (table_name, compact_type) = match params { + let (table_name, compact_type, parallelism) = match params { + // 1. Only table name, strategy defaults to twcs and default parallelism. [ValueRef::String(table_name)] => ( table_name, compact_request::Options::Regular(Default::default()), + DEFAULT_COMPACTION_PARALLELISM, ), + // 2. Both table name and strategy are provided. [ ValueRef::String(table_name), ValueRef::String(compact_ty_str), ] => { - let compact_type = parse_compact_type(compact_ty_str, None)?; - (table_name, compact_type) + let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?; + (table_name, compact_type, parallelism) } - + // 3. Table name, strategy and strategy specific options [ ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str), ] => { - let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?; - (table_name, compact_type) + let (compact_type, parallelism) = + parse_compact_options(compact_ty_str, Some(options_str))?; + (table_name, compact_type, parallelism) } _ => { return UnsupportedInputDataTypeSnafu { @@ -167,35 +178,126 @@ fn parse_compact_params( schema_name, table_name, compact_options: compact_type, + parallelism, }) } -/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose, +/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chosen, /// otherwise choose regular (TWCS) compaction. -fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result { +fn parse_compact_options( + type_str: &str, + option: Option<&str>, +) -> Result<(compact_request::Options, u32)> { if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW) | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT) { - let window_seconds = option - .map(|v| { - i64::from_str(v).map_err(|_| { - InvalidFuncArgsSnafu { - err_msg: format!( - "Compact window is expected to be a valid number, provided: {}", - v - ), + let Some(option_str) = option else { + return Ok(( + compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }), + DEFAULT_COMPACTION_PARALLELISM, + )); + }; + + // For compatibility, accepts single number as window size. + if let Ok(window_seconds) = i64::from_str(option_str) { + return Ok(( + compact_request::Options::StrictWindow(StrictWindow { window_seconds }), + DEFAULT_COMPACTION_PARALLELISM, + )); + }; + + // Parse keyword arguments in forms: `key1=value1,key2=value2` + let mut window_seconds = 0i64; + let mut parallelism = DEFAULT_COMPACTION_PARALLELISM; + + let pairs: Vec<&str> = option_str.split(',').collect(); + for pair in pairs { + let kv: Vec<&str> = pair.trim().split('=').collect(); + if kv.len() != 2 { + return InvalidFuncArgsSnafu { + err_msg: format!("Invalid key-value pair: {}", pair.trim()), + } + .fail(); + } + + let key = kv[0].trim(); + let value = kv[1].trim(); + + match key { + "window" | "window_seconds" => { + window_seconds = i64::from_str(value).map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!("Invalid value for window: {}", value), + } + .build() + })?; + } + "parallelism" => { + parallelism = value.parse::().map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!("Invalid value for parallelism: {}", value), + } + .build() + })?; + } + _ => { + return InvalidFuncArgsSnafu { + err_msg: format!("Unknown parameter: {}", key), } - .build() - }) - }) - .transpose()? - .unwrap_or(0); - - Ok(compact_request::Options::StrictWindow(StrictWindow { - window_seconds, - })) + .fail(); + } + } + } + + Ok(( + compact_request::Options::StrictWindow(StrictWindow { window_seconds }), + parallelism, + )) } else { - Ok(compact_request::Options::Regular(Default::default())) + // TWCS strategy + let Some(option_str) = option else { + return Ok(( + compact_request::Options::Regular(Default::default()), + DEFAULT_COMPACTION_PARALLELISM, + )); + }; + + let mut parallelism = DEFAULT_COMPACTION_PARALLELISM; + let pairs: Vec<&str> = option_str.split(',').collect(); + for pair in pairs { + let kv: Vec<&str> = pair.trim().split('=').collect(); + if kv.len() != 2 { + return InvalidFuncArgsSnafu { + err_msg: format!("Invalid key-value pair: {}", pair.trim()), + } + .fail(); + } + + let key = kv[0].trim(); + let value = kv[1].trim(); + + match key { + "parallelism" => { + parallelism = value.parse::().map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!("Invalid value for parallelism: {}", value), + } + .build() + })?; + } + _ => { + return InvalidFuncArgsSnafu { + err_msg: format!("Unknown parameter: {}", key), + } + .fail(); + } + } + } + + Ok(( + compact_request::Options::Regular(Default::default()), + parallelism, + )) } } @@ -301,7 +403,7 @@ mod tests { assert_eq!( expected, - &parse_compact_params(¶ms, &QueryContext::arc()).unwrap() + &parse_compact_request(¶ms, &QueryContext::arc()).unwrap() ); } } @@ -316,6 +418,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: 1, }, ), ( @@ -325,6 +428,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: 1, }, ), ( @@ -337,6 +441,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: 1, }, ), ( @@ -346,6 +451,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: 1, }, ), ( @@ -355,6 +461,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }), + parallelism: 1, }, ), ( @@ -366,32 +473,94 @@ mod tests { compact_options: Options::StrictWindow(StrictWindow { window_seconds: 3600, }), + parallelism: 1, }, ), ( - &["table", "regular", "abcd"], + &["table", "swcs", "120"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 120, + }), + parallelism: 1, + }, + ), + // Test with parallelism parameter + ( + &["table", "regular", "parallelism=4"], CompactTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: 4, }, ), ( - &["table", "swcs", "120"], + &["table", "strict_window", "window=3600,parallelism=2"], CompactTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::StrictWindow(StrictWindow { - window_seconds: 120, + window_seconds: 3600, + }), + parallelism: 2, + }, + ), + ( + &["table", "strict_window", "window=3600"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 3600, + }), + parallelism: 1, + }, + ), + ( + &["table", "strict_window", "window_seconds=7200"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 7200, + }), + parallelism: 1, + }, + ), + ( + &["table", "strict_window", "window=1800"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 1800, }), + parallelism: 1, + }, + ), + ( + &["table", "regular", "parallelism=8"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + parallelism: 8, }, ), ]); assert!( - parse_compact_params( + parse_compact_request( &["table", "strict_window", "abc"] .into_iter() .map(ValueRef::String) @@ -402,7 +571,7 @@ mod tests { ); assert!( - parse_compact_params( + parse_compact_request( &["a.b.table", "strict_window", "abc"] .into_iter() .map(ValueRef::String) @@ -411,5 +580,88 @@ mod tests { ) .is_err() ); + + // Test invalid parallelism + assert!( + parse_compact_request( + &["table", "regular", "options", "invalid"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test too many parameters + assert!( + parse_compact_request( + &["table", "regular", "options", "4", "extra"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid keyword argument format + assert!( + parse_compact_request( + &["table", "strict_window", "window"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid keyword + assert!( + parse_compact_request( + &["table", "strict_window", "invalid_key=123"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + assert!( + parse_compact_request( + &["table", "regular", "abcd"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid window value + assert!( + parse_compact_request( + &["table", "strict_window", "window=abc"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid parallelism in options string + assert!( + parse_compact_request( + &["table", "strict_window", "parallelism=abc"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); } } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index b9aa7baa38ea..ffa0aa4468d4 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -35,6 +35,7 @@ impl RegionWorkerLoop { return; }; COMPACTION_REQUEST_COUNT.inc(); + let parallelism = req.parallelism.unwrap_or(1) as usize; if let Err(e) = self .compaction_scheduler .schedule_compaction( @@ -45,8 +46,7 @@ impl RegionWorkerLoop { sender, ®ion.manifest_ctx, self.schema_metadata_manager.clone(), - // TODO(yingwen): expose this to frontend - 1, + parallelism, ) .await { @@ -116,7 +116,7 @@ impl RegionWorkerLoop { OptionOutputTx::none(), ®ion.manifest_ctx, self.schema_metadata_manager.clone(), - 1, + 1, // Default for automatic compaction ) .await { diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 36f368a68181..1bca4618429f 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -109,6 +109,7 @@ impl Requester { .map(|partition| { RegionRequestBody::Compact(CompactRequest { region_id: partition.id.into(), + parallelism: request.parallelism, options: Some(request.compact_options), }) }) @@ -146,6 +147,7 @@ impl Requester { ) -> Result { let request = RegionRequestBody::Compact(CompactRequest { region_id: region_id.into(), + parallelism: 1, options: None, // todo(hl): maybe also support parameters in region compaction. }); diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 009cb4eeaff0..23927bdca944 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -338,9 +338,18 @@ fn make_region_compact(compact: CompactRequest) -> Result, } impl Default for RegionCompactRequest { @@ -1339,6 +1349,7 @@ impl Default for RegionCompactRequest { Self { // Default to regular compaction. options: compact_request::Options::Regular(Default::default()), + parallelism: None, } } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 2418c4a2c5a2..6cdf9454805e 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -401,6 +401,7 @@ pub struct CompactTableRequest { pub schema_name: String, pub table_name: String, pub compact_options: compact_request::Options, + pub parallelism: u32, } impl Default for CompactTableRequest { @@ -410,6 +411,7 @@ impl Default for CompactTableRequest { schema_name: Default::default(), table_name: Default::default(), compact_options: compact_request::Options::Regular(Default::default()), + parallelism: 1, } } }