Skip to content

Commit 516a516

Browse files
committed
feat(repartition): implement validation logic for repartition_table
Signed-off-by: WenyXu <[email protected]>
1 parent fce1687 commit 516a516

File tree

4 files changed

+364
-12
lines changed

4 files changed

+364
-12
lines changed

src/operator/src/error.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,14 @@ pub enum Error {
736736
location: Location,
737737
},
738738

739+
#[snafu(display("Failed to deserialize partition expression: {}", source))]
740+
DeserializePartitionExpr {
741+
#[snafu(source)]
742+
source: partition::error::Error,
743+
#[snafu(implicit)]
744+
location: Location,
745+
},
746+
739747
#[snafu(display("Invalid configuration value."))]
740748
InvalidConfigValue {
741749
source: session::session_config::Error,
@@ -973,7 +981,9 @@ impl ErrorExt for Error {
973981
Error::BuildDfLogicalPlan { .. }
974982
| Error::BuildTableMeta { .. }
975983
| Error::MissingInsertBody { .. } => StatusCode::Internal,
976-
Error::ExecuteAdminFunction { .. } | Error::EncodeJson { .. } => StatusCode::Unexpected,
984+
Error::ExecuteAdminFunction { .. }
985+
| Error::EncodeJson { .. }
986+
| Error::DeserializePartitionExpr { .. } => StatusCode::Unexpected,
977987
Error::ViewNotFound { .. }
978988
| Error::ViewInfoNotFound { .. }
979989
| Error::TableNotFound { .. } => StatusCode::TableNotFound,

src/operator/src/statement/ddl.rs

Lines changed: 169 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,14 @@ use table::table_name::TableName;
8383
use crate::error::{
8484
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
8585
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
86-
EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
87-
InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
88-
InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
89-
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
90-
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
86+
DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
87+
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
88+
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
89+
PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
90+
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
9191
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
9292
};
93-
use crate::expr_helper;
93+
use crate::expr_helper::{self, RepartitionRequest};
9494
use crate::statement::StatementExecutor;
9595
use crate::statement::show::create_partitions_stmt;
9696

@@ -1262,17 +1262,175 @@ impl StatementExecutor {
12621262
alter_table.alter_operation(),
12631263
AlterTableOperation::Repartition { .. }
12641264
) {
1265-
let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1266-
return NotSupportedSnafu {
1267-
feat: "ALTER TABLE REPARTITION",
1268-
}
1269-
.fail();
1265+
let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1266+
return self.repartition_table(request, &query_context).await;
12701267
}
12711268

12721269
let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
12731270
self.alter_table_inner(expr, query_context).await
12741271
}
12751272

1273+
#[tracing::instrument(skip_all)]
1274+
pub async fn repartition_table(
1275+
&self,
1276+
request: RepartitionRequest,
1277+
query_context: &QueryContextRef,
1278+
) -> Result<Output> {
1279+
// Check if the schema is read-only.
1280+
ensure!(
1281+
!is_readonly_schema(&request.schema_name),
1282+
SchemaReadOnlySnafu {
1283+
name: request.schema_name.clone()
1284+
}
1285+
);
1286+
1287+
let catalog_name = request.catalog_name;
1288+
let schema_name = request.schema_name;
1289+
let table_name = request.table_name;
1290+
// Get the table from the catalog.
1291+
let table = self
1292+
.catalog_manager
1293+
.table(
1294+
&catalog_name,
1295+
&schema_name,
1296+
&table_name,
1297+
Some(query_context),
1298+
)
1299+
.await
1300+
.context(CatalogSnafu)?
1301+
.with_context(|| TableNotFoundSnafu {
1302+
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1303+
})?;
1304+
let table_id = table.table_info().ident.table_id;
1305+
// Get existing partition expressions from the table route.
1306+
let (physical_table_id, physical_table_route) = self
1307+
.table_metadata_manager
1308+
.table_route_manager()
1309+
.get_physical_table_route(table_id)
1310+
.await
1311+
.context(TableMetadataManagerSnafu)?;
1312+
1313+
ensure!(
1314+
physical_table_id == table_id,
1315+
NotSupportedSnafu {
1316+
feat: "REPARTITION on logical tables"
1317+
}
1318+
);
1319+
1320+
let table_info = table.table_info();
1321+
// Get partition column names from the table metadata.
1322+
let existing_partition_columns: Vec<String> =
1323+
table_info.meta.partition_column_names().cloned().collect();
1324+
// Repartition requires the table to have partition columns.
1325+
ensure!(
1326+
!existing_partition_columns.is_empty(),
1327+
InvalidPartitionRuleSnafu {
1328+
reason: format!(
1329+
"table {}.{}.{} does not have partition columns, cannot repartition",
1330+
catalog_name, schema_name, table_name
1331+
)
1332+
}
1333+
);
1334+
1335+
// Build column name to type mapping for partition columns only.
1336+
let table_schema = &table_info.meta.schema;
1337+
let column_schemas = table_schema.column_schemas();
1338+
// Repartition operations involving columns outside the existing partition columns are not supported.
1339+
// This restriction ensures repartition only applies to current partition columns.
1340+
let column_name_and_type: HashMap<&String, ConcreteDataType> = existing_partition_columns
1341+
.iter()
1342+
.map(|pc| {
1343+
let column = column_schemas
1344+
.iter()
1345+
.find(|c| &c.name == pc)
1346+
// partition column must exist in schema
1347+
.unwrap();
1348+
(&column.name, column.data_type.clone())
1349+
})
1350+
.collect();
1351+
let timezone = query_context.timezone();
1352+
// Convert SQL Exprs to PartitionExprs.
1353+
let from_partition_exprs = request
1354+
.from_exprs
1355+
.iter()
1356+
.map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1357+
.collect::<Result<Vec<_>>>()?;
1358+
1359+
let into_partition_exprs = request
1360+
.into_exprs
1361+
.iter()
1362+
.map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1363+
.collect::<Result<Vec<_>>>()?;
1364+
1365+
// Parse existing partition expressions from region routes.
1366+
let mut existing_partition_exprs =
1367+
Vec::with_capacity(physical_table_route.region_routes.len());
1368+
for route in &physical_table_route.region_routes {
1369+
let expr_json = route.region.partition_expr();
1370+
if !expr_json.is_empty() {
1371+
match PartitionExpr::from_json_str(&expr_json) {
1372+
Ok(Some(expr)) => existing_partition_exprs.push(expr),
1373+
Ok(None) => {
1374+
// Empty
1375+
}
1376+
Err(e) => {
1377+
return Err(e).context(DeserializePartitionExprSnafu);
1378+
}
1379+
}
1380+
}
1381+
}
1382+
1383+
// Validate that from_partition_exprs are a subset of existing partition exprs.
1384+
// We compare PartitionExpr directly since it implements Eq.
1385+
for from_expr in &from_partition_exprs {
1386+
ensure!(
1387+
existing_partition_exprs.contains(from_expr),
1388+
InvalidPartitionRuleSnafu {
1389+
reason: format!(
1390+
"partition expression '{}' does not exist in table {}.{}.{}",
1391+
from_expr, catalog_name, schema_name, table_name
1392+
)
1393+
}
1394+
);
1395+
}
1396+
1397+
// Build the new partition expressions:
1398+
// new_exprs = existing_exprs - from_exprs + into_exprs
1399+
let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
1400+
.into_iter()
1401+
.filter(|expr| !from_partition_exprs.contains(expr))
1402+
.chain(into_partition_exprs.clone().into_iter())
1403+
.collect();
1404+
let new_partition_exprs_len = new_partition_exprs.len();
1405+
1406+
// Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
1407+
let _ = MultiDimPartitionRule::try_new(
1408+
existing_partition_columns.clone(),
1409+
vec![],
1410+
new_partition_exprs,
1411+
true,
1412+
)
1413+
.context(InvalidPartitionSnafu)?;
1414+
1415+
info!(
1416+
"Repartition table {}.{}.{} (table_id={}) from {:?} to {:?}, new partition count: {}",
1417+
catalog_name,
1418+
schema_name,
1419+
table_name,
1420+
table_id,
1421+
from_partition_exprs,
1422+
into_partition_exprs,
1423+
new_partition_exprs_len
1424+
);
1425+
1426+
// TODO(weny): Implement the repartition procedure submission.
1427+
// The repartition procedure infrastructure is not yet fully integrated with the DDL task system.
1428+
NotSupportedSnafu {
1429+
feat: "ALTER TABLE REPARTITION",
1430+
}
1431+
.fail()
1432+
}
1433+
12761434
#[tracing::instrument(skip_all)]
12771435
pub async fn alter_table_inner(
12781436
&self,
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
-- Test repartition error cases
2+
-- Setup: Create a physical table with partitions
3+
CREATE TABLE repartition_test_table(
4+
device_id INT,
5+
area STRING,
6+
ts TIMESTAMP TIME INDEX,
7+
PRIMARY KEY(device_id)
8+
) PARTITION ON COLUMNS (device_id) (
9+
device_id < 100,
10+
device_id >= 100 AND device_id < 200,
11+
device_id >= 200
12+
);
13+
14+
Affected Rows: 0
15+
16+
-- Setup: Create a physical metric table and a logical table on top of it (metric engine)
17+
CREATE TABLE physical_metric_table(
18+
ts TIMESTAMP TIME INDEX,
19+
val DOUBLE
20+
) ENGINE = metric WITH ("physical_metric_table" = "");
21+
22+
Affected Rows: 0
23+
24+
CREATE TABLE logical_metric_table(
25+
ts TIMESTAMP TIME INDEX,
26+
val DOUBLE,
27+
device_id STRING PRIMARY KEY
28+
) ENGINE = metric WITH ("on_physical_table" = "physical_metric_table");
29+
30+
Affected Rows: 0
31+
32+
-- Test 0: Logical table cannot be repartitioned
33+
ALTER TABLE logical_metric_table REPARTITION (
34+
device_id < '100'
35+
) INTO (
36+
device_id < '50',
37+
device_id >= '50' AND device_id < '100'
38+
);
39+
40+
Error: 1001(Unsupported), Not supported: REPARTITION on logical tables
41+
42+
-- Test 1: New partition rule contains non-partition column (ts is not a partition column)
43+
ALTER TABLE repartition_test_table REPARTITION (
44+
device_id < 100
45+
) INTO (
46+
device_id < 50,
47+
device_id >= 50 AND device_id < 100 AND ts < 1000
48+
);
49+
50+
Error: 1004(InvalidArguments), Cannot find column by name: ts
51+
52+
-- Test 2: From partition expr does not exist in existing partition exprs
53+
-- device_id < 50 is not in the existing partitions (which are < 100, >= 100 AND < 200, >= 200)
54+
ALTER TABLE repartition_test_table REPARTITION (
55+
device_id < 50
56+
) INTO (
57+
device_id < 25,
58+
device_id >= 25 AND device_id < 50
59+
);
60+
61+
Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table
62+
63+
-- Test 3: New partition rule is incomplete (cannot pass checker)
64+
-- This creates a gap: device_id < 50 and device_id >= 100, missing [50, 100)
65+
-- The existing partitions are: device_id < 100, device_id >= 100 AND device_id < 200, device_id >= 200
66+
-- After removing device_id < 100 and adding device_id < 50 and device_id >= 100, we get:
67+
-- device_id < 50, device_id >= 100, device_id >= 100 AND device_id < 200, device_id >= 200
68+
-- This leaves a gap [50, 100)
69+
ALTER TABLE repartition_test_table REPARTITION (
70+
device_id < 100
71+
) INTO (
72+
device_id < 50,
73+
device_id >= 100
74+
);
75+
76+
Error: 1004(InvalidArguments), Checkpoint `device_id=50` is not covered
77+
78+
-- Test 4: New partition rule has overlapping partitions
79+
-- This creates overlapping ranges: device_id < 100 and device_id >= 50 AND device_id < 150
80+
-- After removing device_id < 100, we have: device_id >= 100 AND device_id < 200, device_id >= 200
81+
-- Adding the new ones: device_id < 100, device_id >= 50 AND device_id < 150
82+
-- These overlap: (-inf, 100) and [50, 150) both cover [50, 100)
83+
ALTER TABLE repartition_test_table REPARTITION (
84+
device_id < 100
85+
) INTO (
86+
device_id < 100,
87+
device_id >= 50 AND device_id < 150
88+
);
89+
90+
Error: 1004(InvalidArguments), Checkpoint `device_id=50` is overlapped
91+
92+
-- Cleanup
93+
DROP TABLE repartition_test_table;
94+
95+
Affected Rows: 0
96+
97+
DROP TABLE logical_metric_table;
98+
99+
Affected Rows: 0
100+
101+
DROP TABLE physical_metric_table;
102+
103+
Affected Rows: 0
104+

0 commit comments

Comments
 (0)