Skip to content

Commit 567d3e6

Browse files
authored
feat: integrate repartition procedure into DdlManager (#7548)
* feat: add repartition procedure factory support to DdlManager - Introduce RepartitionProcedureFactory trait for creating and registering repartition procedures - Implement DefaultRepartitionProcedureFactory for metasrv with full support - Implement StandaloneRepartitionProcedureFactory for standalone (unsupported) - Add procedure loader registration for RepartitionProcedure and RepartitionGroupProcedure - Add helper methods to TableMetadataAllocator for allocator access - Add error types for repartition procedure operations - Update DdlManager to accept and use RepartitionProcedureFactoryRef Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: integrate repartition procedure into DdlManager - Add submit_repartition_task() to handle repartition from alter table - Route Repartition operations in submit_alter_table_task() to repartition factory - Refactor: rename submit_procedure() to execute_procedure_and_wait() - Make all DDL operations wait for completion by default - Add submit_procedure() for fire-and-forget submissions - Add CreateRepartitionProcedure error type - Add placeholder Repartition handling in grpc-expr (unsupported) - Update greptime-proto dependency Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: implement ALTER TABLE REPARTITION procedure submission Signed-off-by: WenyXu <wenymedia@gmail.com> * refactor(repartition): handle central region in apply staging manifest - Introduce ApplyStagingManifestInstructions struct to organize instructions - Add special handling for central region when applying staging manifests - Transition state from UpdateMetadata to RepartitionEnd after applying staging manifests - Remove next_state() method in RepartitionStart and inline state transitions - Improve logging and expression serialization in DDL statement executor - Move repartition tests from standalone to distributed test suite Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: update proto Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
1 parent 63284a5 commit 567d3e6

File tree

25 files changed

+832
-153
lines changed

25 files changed

+832
-153
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
151151
fst = "0.4.7"
152152
futures = "0.3"
153153
futures-util = "0.3"
154-
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" }
154+
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" }
155155
hex = "0.4"
156156
http = "1"
157157
humantime = "2.1"

src/cmd/src/standalone.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ use plugins::frontend::context::{
6464
use plugins::standalone::context::DdlManagerConfigureContext;
6565
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
6666
use snafu::ResultExt;
67-
use standalone::StandaloneInformationExtension;
6867
use standalone::options::StandaloneOptions;
68+
use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
6969
use tracing_appender::non_blocking::WorkerGuard;
7070

7171
use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
@@ -509,8 +509,13 @@ impl StartCommand {
509509
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
510510
};
511511

512-
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
513-
.context(error::InitDdlManagerSnafu)?;
512+
let ddl_manager = DdlManager::try_new(
513+
ddl_context,
514+
procedure_manager.clone(),
515+
Arc::new(StandaloneRepartitionProcedureFactory),
516+
true,
517+
)
518+
.context(error::InitDdlManagerSnafu)?;
514519

515520
let ddl_manager = if let Some(configurator) =
516521
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()

src/common/grpc-expr/src/alter.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use table::requests::{
3434
};
3535

3636
use crate::error::{
37-
ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
37+
self, ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
3838
InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu,
3939
InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu,
4040
MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu,
@@ -251,6 +251,10 @@ pub fn alter_expr_to_request(
251251
.collect::<Result<Vec<_>>>()?;
252252
AlterKind::SetDefaults { defaults }
253253
}
254+
Kind::Repartition(_) => error::UnexpectedSnafu {
255+
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
256+
}
257+
.fail()?,
254258
};
255259

256260
let request = AlterTableRequest {

src/common/grpc-expr/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,13 @@ pub enum Error {
161161
#[snafu(implicit)]
162162
location: Location,
163163
},
164+
165+
#[snafu(display("Unexpected: {err_msg}"))]
166+
Unexpected {
167+
err_msg: String,
168+
#[snafu(implicit)]
169+
location: Location,
170+
},
164171
}
165172

166173
pub type Result<T> = std::result::Result<T, Error>;
@@ -188,6 +195,7 @@ impl ErrorExt for Error {
188195
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
189196
Error::SqlCommon { source, .. } => source.status_code(),
190197
Error::MissingTableMeta { .. } => StatusCode::Unexpected,
198+
Error::Unexpected { .. } => StatusCode::Unexpected,
191199
}
192200
}
193201

src/common/meta/src/ddl/alter_table/region_request.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use snafu::OptionExt;
2222
use table::metadata::RawTableInfo;
2323

2424
use crate::ddl::alter_table::AlterTableProcedure;
25-
use crate::error::{InvalidProtoMsgSnafu, Result};
25+
use crate::error::{self, InvalidProtoMsgSnafu, Result};
2626

2727
impl AlterTableProcedure {
2828
/// Makes alter kind proto that all regions can reuse.
@@ -112,6 +112,10 @@ fn create_proto_alter_kind(
112112
Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
113113
Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
114114
Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
115+
Kind::Repartition(_) => error::UnexpectedSnafu {
116+
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
117+
}
118+
.fail()?,
115119
}
116120
}
117121

src/common/meta/src/ddl/table_meta.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,18 @@ impl TableMetadataAllocator {
161161
})
162162
}
163163

164+
/// Returns the table id allocator.
164165
pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
165166
self.table_id_allocator.clone()
166167
}
168+
169+
/// Returns the wal options allocator.
170+
pub fn wal_options_allocator(&self) -> WalOptionsAllocatorRef {
171+
self.wal_options_allocator.clone()
172+
}
173+
174+
/// Returns the region routes allocator.
175+
pub fn region_routes_allocator(&self) -> RegionRoutesAllocatorRef {
176+
self.region_routes_allocator.clone()
177+
}
167178
}

0 commit comments

Comments
 (0)