From f65f768b9223892e85b7d9d9062b927e60906d44 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Oct 2025 10:58:13 +0800 Subject: [PATCH 1/4] feat/manual-compaction-parallelism: ### Add Parallelism Support to Compaction Requests - **`Cargo.lock` & `Cargo.toml`**: Updated `greptime-proto` dependency to a new revision. - **`flush_compact_table.rs`**: Enhanced `parse_compact_params` to support a new `parallelism` parameter, allowing users to specify the level of parallelism for table compaction. - **`handle_compaction.rs`**: Integrated `parallelism` into the compaction scheduling process, defaulting to 1 if not specified. - **`request.rs` & `region_request.rs`**: Modified `CompactRequest` to include `parallelism`, with logic to handle unspecifie values. - **`requests.rs`**: Updated `CompactTableRequest` structure to include an optional `parallelism` field. Signed-off-by: Lei, HUANG --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../function/src/admin/flush_compact_table.rs | 90 +++++++++++++++++-- src/mito2/src/worker/handle_compaction.rs | 6 +- src/operator/src/request.rs | 2 + src/store-api/src/region_request.rs | 13 ++- src/table/src/requests.rs | 2 + 7 files changed, 106 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f86c79aa2cc6..b3c408b34510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5325,7 +5325,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a250938d7106b77da0ae915eb0c531411c28cfe3#a250938d7106b77da0ae915eb0c531411c28cfe3" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=31a2ade77f83c1d95481208daeaba718e2c0d45f#31a2ade77f83c1d95481208daeaba718e2c0d45f" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 40ba49a73453..09cf76bcc509 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a250938d7106b77da0ae915eb0c531411c28cfe3" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "31a2ade77f83c1d95481208daeaba718e2c0d45f" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/function/src/admin/flush_compact_table.rs b/src/common/function/src/admin/flush_compact_table.rs index 378b4181cd68..9d5fbef2ba39 100644 --- a/src/common/function/src/admin/flush_compact_table.rs +++ b/src/common/function/src/admin/flush_compact_table.rs @@ -117,28 +117,33 @@ fn compact_signature() -> Signature { /// - `[]`: only tables name provided, using default compaction type: regular /// - `[, ]`: specify table name and compaction type. The compaction options will be default. /// - `[, , ]`: provides both type and type-specific options. +/// - `[, , , ]`: provides type, type-specific options, and parallelism. fn parse_compact_params( params: &[ValueRef<'_>], query_ctx: &QueryContextRef, ) -> Result { ensure!( - !params.is_empty(), + !params.is_empty() && params.len() <= 4, InvalidFuncArgsSnafu { - err_msg: "Args cannot be empty", + err_msg: format!( + "The length of the args is not correct, expect 1-4, have: {}", + params.len() + ), } ); - let (table_name, compact_type) = match params { + let (table_name, compact_type, parallelism) = match params { [ValueRef::String(table_name)] => ( table_name, compact_request::Options::Regular(Default::default()), + None, ), [ ValueRef::String(table_name), ValueRef::String(compact_ty_str), ] => { let compact_type = parse_compact_type(compact_ty_str, None)?; - (table_name, compact_type) + (table_name, compact_type, None) } [ @@ -147,7 +152,26 @@ fn parse_compact_params( ValueRef::String(options_str), ] => { let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?; - (table_name, compact_type) + (table_name, compact_type, None) + } + + [ + ValueRef::String(table_name), + ValueRef::String(compact_ty_str), + ValueRef::String(options_str), + ValueRef::String(parallelism_str), + ] => { + let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?; + let parallelism = Some(u32::from_str(parallelism_str).map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!( + "Parallelism is expected to be a valid number, provided: {}", + parallelism_str + ), + } + .build() + })?); + (table_name, compact_type, parallelism) } _ => { return UnsupportedInputDataTypeSnafu { @@ -167,6 +191,7 @@ fn parse_compact_params( schema_name, table_name, compact_options: compact_type, + parallelism, }) } @@ -316,6 +341,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: None, }, ), ( @@ -325,6 +351,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: None, }, ), ( @@ -337,6 +364,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: None, }, ), ( @@ -346,6 +374,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: None, }, ), ( @@ -355,6 +384,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }), + parallelism: None, }, ), ( @@ -366,6 +396,7 @@ mod tests { compact_options: Options::StrictWindow(StrictWindow { window_seconds: 3600, }), + parallelism: None, }, ), ( @@ -375,6 +406,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), + parallelism: None, }, ), ( @@ -386,6 +418,30 @@ mod tests { compact_options: Options::StrictWindow(StrictWindow { window_seconds: 120, }), + parallelism: None, + }, + ), + // Test with parallelism parameter + ( + &["table", "regular", "options", "4"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + parallelism: Some(4), + }, + ), + ( + &["table", "strict_window", "3600", "2"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 3600, + }), + parallelism: Some(2), }, ), ]); @@ -411,5 +467,29 @@ mod tests { ) .is_err() ); + + // Test invalid parallelism + assert!( + parse_compact_params( + &["table", "regular", "options", "invalid"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test too many parameters + assert!( + parse_compact_params( + &["table", "regular", "options", "4", "extra"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); } } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index b9aa7baa38ea..ffa0aa4468d4 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -35,6 +35,7 @@ impl RegionWorkerLoop { return; }; COMPACTION_REQUEST_COUNT.inc(); + let parallelism = req.parallelism.unwrap_or(1) as usize; if let Err(e) = self .compaction_scheduler .schedule_compaction( @@ -45,8 +46,7 @@ impl RegionWorkerLoop { sender, ®ion.manifest_ctx, self.schema_metadata_manager.clone(), - // TODO(yingwen): expose this to frontend - 1, + parallelism, ) .await { @@ -116,7 +116,7 @@ impl RegionWorkerLoop { OptionOutputTx::none(), ®ion.manifest_ctx, self.schema_metadata_manager.clone(), - 1, + 1, // Default for automatic compaction ) .await { diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 36f368a68181..edbd1ba9aeb1 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -109,6 +109,7 @@ impl Requester { .map(|partition| { RegionRequestBody::Compact(CompactRequest { region_id: partition.id.into(), + parallelism: request.parallelism.unwrap_or(1), options: Some(request.compact_options), }) }) @@ -146,6 +147,7 @@ impl Requester { ) -> Result { let request = RegionRequestBody::Compact(CompactRequest { region_id: region_id.into(), + parallelism: 1, options: None, // todo(hl): maybe also support parameters in region compaction. }); diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 009cb4eeaff0..23927bdca944 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -338,9 +338,18 @@ fn make_region_compact(compact: CompactRequest) -> Result, } impl Default for RegionCompactRequest { @@ -1339,6 +1349,7 @@ impl Default for RegionCompactRequest { Self { // Default to regular compaction. options: compact_request::Options::Regular(Default::default()), + parallelism: None, } } } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 2418c4a2c5a2..521aadd4eb0f 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -401,6 +401,7 @@ pub struct CompactTableRequest { pub schema_name: String, pub table_name: String, pub compact_options: compact_request::Options, + pub parallelism: Option, } impl Default for CompactTableRequest { @@ -410,6 +411,7 @@ impl Default for CompactTableRequest { schema_name: Default::default(), table_name: Default::default(), compact_options: compact_request::Options::Regular(Default::default()), + parallelism: None, } } } From 397ac3fb0cab98274c7047b5214df413cf178dce Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Oct 2025 16:39:39 +0800 Subject: [PATCH 2/4] feat/manual-compaction-parallelism: ### Commit Message Enhance Compaction Request Handling - **`flush_compact_table.rs`**: - Renamed `parse_compact_params` to `parse_compact_request`. - Introduced `DEFAULT_COMPACTION_PARALLELISM` constant. - Updated parsing logic to handle keyword arguments for `strict_window` and `regular` compaction types, including `parallelism` and `window`. - Modified tests to reflect changes in parsing logic and default parallelism handling. - **`request.rs`**: - Updated `parallelism` handling in `RegionRequestBody::Compact` to use the new default value. - **`requests.rs`**: - Changed `CompactTableRequest` to use a non-optional `parallelism` field with a default value of 1. Signed-off-by: Lei, HUANG --- .../function/src/admin/flush_compact_table.rs | 312 ++++++++++++++---- src/operator/src/request.rs | 2 +- src/table/src/requests.rs | 4 +- 3 files changed, 245 insertions(+), 73 deletions(-) diff --git a/src/common/function/src/admin/flush_compact_table.rs b/src/common/function/src/admin/flush_compact_table.rs index 9d5fbef2ba39..5ff1586917b5 100644 --- a/src/common/function/src/admin/flush_compact_table.rs +++ b/src/common/function/src/admin/flush_compact_table.rs @@ -37,6 +37,8 @@ const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window"; /// Compact type: strict window (short name). const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs"; +const DEFAULT_COMPACTION_PARALLELISM: u32 = 1; + #[admin_fn( name = FlushTableFunction, display_name = flush_table, @@ -95,7 +97,7 @@ pub(crate) async fn compact_table( query_ctx: &QueryContextRef, params: &[ValueRef<'_>], ) -> Result { - let request = parse_compact_params(params, query_ctx)?; + let request = parse_compact_request(params, query_ctx)?; info!("Compact table request: {:?}", request); let affected_rows = table_mutation_handler @@ -117,8 +119,9 @@ fn compact_signature() -> Signature { /// - `[]`: only tables name provided, using default compaction type: regular /// - `[, ]`: specify table name and compaction type. The compaction options will be default. /// - `[, , ]`: provides both type and type-specific options. -/// - `[, , , ]`: provides type, type-specific options, and parallelism. -fn parse_compact_params( +/// - For `twcs`, it accepts `parallelism=[N]` where N is an unsigned 32 bits number +/// - For `swcs`, it accepts two numeric parameter: `parallelism` and `window`. +fn parse_compact_request( params: &[ValueRef<'_>], query_ctx: &QueryContextRef, ) -> Result { @@ -133,44 +136,28 @@ fn parse_compact_params( ); let (table_name, compact_type, parallelism) = match params { + // 1. Only table name, strategy defaults to twcs and default parallelism. [ValueRef::String(table_name)] => ( table_name, compact_request::Options::Regular(Default::default()), - None, + DEFAULT_COMPACTION_PARALLELISM, ), + // 2. Both table name and strategy are provided. [ ValueRef::String(table_name), ValueRef::String(compact_ty_str), ] => { - let compact_type = parse_compact_type(compact_ty_str, None)?; - (table_name, compact_type, None) - } - - [ - ValueRef::String(table_name), - ValueRef::String(compact_ty_str), - ValueRef::String(options_str), - ] => { - let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?; - (table_name, compact_type, None) + let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?; + (table_name, compact_type, parallelism) } - + // 3. Table name, strategy and strategy specific options [ ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str), - ValueRef::String(parallelism_str), ] => { - let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?; - let parallelism = Some(u32::from_str(parallelism_str).map_err(|_| { - InvalidFuncArgsSnafu { - err_msg: format!( - "Parallelism is expected to be a valid number, provided: {}", - parallelism_str - ), - } - .build() - })?); + let (compact_type, parallelism) = + parse_compact_options(compact_ty_str, Some(options_str))?; (table_name, compact_type, parallelism) } _ => { @@ -195,32 +182,122 @@ fn parse_compact_params( }) } -/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose, +/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chosen, /// otherwise choose regular (TWCS) compaction. -fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result { +fn parse_compact_options( + type_str: &str, + option: Option<&str>, +) -> Result<(compact_request::Options, u32)> { if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW) | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT) { - let window_seconds = option - .map(|v| { - i64::from_str(v).map_err(|_| { - InvalidFuncArgsSnafu { - err_msg: format!( - "Compact window is expected to be a valid number, provided: {}", - v - ), + let Some(option_str) = option else { + return Ok(( + compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }), + DEFAULT_COMPACTION_PARALLELISM, + )); + }; + + // For compatibility, accepts single number as window size. + if let Ok(window_seconds) = i64::from_str(option_str) { + return Ok(( + compact_request::Options::StrictWindow(StrictWindow { window_seconds }), + DEFAULT_COMPACTION_PARALLELISM, + )); + }; + + // Parse keyword arguments in forms: `key1=value1,key2=value2` + let mut window_seconds = 0i64; + let mut parallelism = DEFAULT_COMPACTION_PARALLELISM; + + let pairs: Vec<&str> = option_str.split(',').collect(); + for pair in pairs { + let kv: Vec<&str> = pair.trim().split('=').collect(); + if kv.len() != 2 { + return InvalidFuncArgsSnafu { + err_msg: format!("Invalid key-value pair: {}", pair.trim()), + } + .fail(); + } + + let key = kv[0].trim(); + let value = kv[1].trim(); + + match key { + "window" | "window_seconds" => { + window_seconds = i64::from_str(value).map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!("Invalid value for window: {}", value), + } + .build() + })?; + } + "parallelism" => { + parallelism = value.parse::().map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!("Invalid value for parallelism: {}", value), + } + .build() + })?; + } + _ => { + return InvalidFuncArgsSnafu { + err_msg: format!("Unknown parameter: {}", key), } - .build() - }) - }) - .transpose()? - .unwrap_or(0); - - Ok(compact_request::Options::StrictWindow(StrictWindow { - window_seconds, - })) + .fail(); + } + } + } + + Ok(( + compact_request::Options::StrictWindow(StrictWindow { window_seconds }), + parallelism, + )) } else { - Ok(compact_request::Options::Regular(Default::default())) + // TWCS strategy + let Some(option_str) = option else { + return Ok(( + compact_request::Options::Regular(Default::default()), + DEFAULT_COMPACTION_PARALLELISM, + )); + }; + + let mut parallelism = DEFAULT_COMPACTION_PARALLELISM; + let pairs: Vec<&str> = option_str.split(',').collect(); + for pair in pairs { + let kv: Vec<&str> = pair.trim().split('=').collect(); + if kv.len() != 2 { + return InvalidFuncArgsSnafu { + err_msg: format!("Invalid key-value pair: {}", pair.trim()), + } + .fail(); + } + + let key = kv[0].trim(); + let value = kv[1].trim(); + + match key { + "parallelism" => { + parallelism = value.parse::().map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!("Invalid value for parallelism: {}", value), + } + .build() + })?; + } + _ => { + return InvalidFuncArgsSnafu { + err_msg: format!("Unknown parameter: {}", key), + } + .fail(); + } + } + } + + Ok(( + compact_request::Options::Regular(Default::default()), + parallelism, + )) } } @@ -326,7 +403,7 @@ mod tests { assert_eq!( expected, - &parse_compact_params(¶ms, &QueryContext::arc()).unwrap() + &parse_compact_request(¶ms, &QueryContext::arc()).unwrap() ); } } @@ -341,7 +418,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), - parallelism: None, + parallelism: 1, }, ), ( @@ -351,7 +428,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), - parallelism: None, + parallelism: 1, }, ), ( @@ -364,7 +441,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), - parallelism: None, + parallelism: 1, }, ), ( @@ -374,7 +451,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), - parallelism: None, + parallelism: 1, }, ), ( @@ -384,7 +461,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }), - parallelism: None, + parallelism: 1, }, ), ( @@ -396,58 +473,94 @@ mod tests { compact_options: Options::StrictWindow(StrictWindow { window_seconds: 3600, }), - parallelism: None, + parallelism: 1, }, ), ( - &["table", "regular", "abcd"], + &["table", "swcs", "120"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 120, + }), + parallelism: 1, + }, + ), + // Test with parallelism parameter + ( + &["table", "regular", "parallelism=4"], CompactTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::Regular(Default::default()), - parallelism: None, + parallelism: 4, }, ), ( - &["table", "swcs", "120"], + &["table", "strict_window", "window=3600,parallelism=2"], CompactTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::StrictWindow(StrictWindow { - window_seconds: 120, + window_seconds: 3600, }), - parallelism: None, + parallelism: 2, }, ), - // Test with parallelism parameter ( - &["table", "regular", "options", "4"], + &["table", "strict_window", "window=3600"], CompactTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), - compact_options: Options::Regular(Default::default()), - parallelism: Some(4), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 3600, + }), + parallelism: 1, }, ), ( - &["table", "strict_window", "3600", "2"], + &["table", "strict_window", "window_seconds=7200"], CompactTableRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table".to_string(), compact_options: Options::StrictWindow(StrictWindow { - window_seconds: 3600, + window_seconds: 7200, + }), + parallelism: 1, + }, + ), + ( + &["table", "strict_window", "window=1800"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 1800, }), - parallelism: Some(2), + parallelism: 1, + }, + ), + ( + &["table", "regular", "parallelism=8"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + parallelism: 8, }, ), ]); assert!( - parse_compact_params( + parse_compact_request( &["table", "strict_window", "abc"] .into_iter() .map(ValueRef::String) @@ -458,7 +571,7 @@ mod tests { ); assert!( - parse_compact_params( + parse_compact_request( &["a.b.table", "strict_window", "abc"] .into_iter() .map(ValueRef::String) @@ -470,7 +583,7 @@ mod tests { // Test invalid parallelism assert!( - parse_compact_params( + parse_compact_request( &["table", "regular", "options", "invalid"] .into_iter() .map(ValueRef::String) @@ -482,7 +595,7 @@ mod tests { // Test too many parameters assert!( - parse_compact_params( + parse_compact_request( &["table", "regular", "options", "4", "extra"] .into_iter() .map(ValueRef::String) @@ -491,5 +604,64 @@ mod tests { ) .is_err() ); + + // Test invalid keyword argument format + assert!( + parse_compact_request( + &["table", "strict_window", "window"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid keyword + assert!( + parse_compact_request( + &["table", "strict_window", "invalid_key=123"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + assert!( + parse_compact_request( + &["table", "regular", "abcd"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid window value + assert!( + parse_compact_request( + &["table", "strict_window", "window=abc"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); + + // Test invalid parallelism in options string + assert!( + parse_compact_request( + &["table", "strict_window", "parallelism=abc"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err() + ); } } diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index edbd1ba9aeb1..1bca4618429f 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -109,7 +109,7 @@ impl Requester { .map(|partition| { RegionRequestBody::Compact(CompactRequest { region_id: partition.id.into(), - parallelism: request.parallelism.unwrap_or(1), + parallelism: request.parallelism, options: Some(request.compact_options), }) }) diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 521aadd4eb0f..6cdf9454805e 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -401,7 +401,7 @@ pub struct CompactTableRequest { pub schema_name: String, pub table_name: String, pub compact_options: compact_request::Options, - pub parallelism: Option, + pub parallelism: u32, } impl Default for CompactTableRequest { @@ -411,7 +411,7 @@ impl Default for CompactTableRequest { schema_name: Default::default(), table_name: Default::default(), compact_options: compact_request::Options::Regular(Default::default()), - parallelism: None, + parallelism: 1, } } } From 05eeb359198b6a81206028fc3136dda29f684a2f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Oct 2025 16:56:19 +0800 Subject: [PATCH 3/4] feat/manual-compaction-parallelism: ### Update `flush_compact_table.rs` Parameter Validation - Modified parameter validation in `flush_compact_table.rs` to restrict the maximum number of parameters from 4 to 3 in the `parse_compact_request` function. Signed-off-by: Lei, HUANG --- src/common/function/src/admin/flush_compact_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/function/src/admin/flush_compact_table.rs b/src/common/function/src/admin/flush_compact_table.rs index 5ff1586917b5..e45f2f0f84d5 100644 --- a/src/common/function/src/admin/flush_compact_table.rs +++ b/src/common/function/src/admin/flush_compact_table.rs @@ -126,7 +126,7 @@ fn parse_compact_request( query_ctx: &QueryContextRef, ) -> Result { ensure!( - !params.is_empty() && params.len() <= 4, + !params.is_empty() && params.len() <= 3, InvalidFuncArgsSnafu { err_msg: format!( "The length of the args is not correct, expect 1-4, have: {}", From dfb4ae8667f39b023c0e06d1ffb975bed64b2bd8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 16 Oct 2025 11:09:13 +0800 Subject: [PATCH 4/4] feat/manual-compaction-parallelism: Update `greptime-proto` dependency - Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`. Signed-off-by: Lei, HUANG --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b3c408b34510..d32a4832b63d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5325,7 +5325,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=31a2ade77f83c1d95481208daeaba718e2c0d45f#31a2ade77f83c1d95481208daeaba718e2c0d45f" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d000eedd3739c003bb139aa42cefe05521a60f7d#d000eedd3739c003bb139aa42cefe05521a60f7d" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 09cf76bcc509..a9fafd232cd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "31a2ade77f83c1d95481208daeaba718e2c0d45f" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d000eedd3739c003bb139aa42cefe05521a60f7d" } hex = "0.4" http = "1" humantime = "2.1"