Skip to content

Commit e107030

Browse files
authored
chore: add more fields to DdlManagerConfigureContext (GreptimeTeam#7310)
* feat: add more context for configurator * move the flow grpc configure context to plugins crate * move context to plugins crate * add more fields * fix: cargo check * refactor: some * refactor some * adjust context * fix: cargo check * fix: ut
1 parent 18875ee commit e107030

File tree

12 files changed

+101
-42
lines changed

12 files changed

+101
-42
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cmd/src/flownode.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,19 @@ use std::sync::Arc;
1818
use std::time::Duration;
1919

2020
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21-
use catalog::CatalogManagerRef;
2221
use catalog::information_extension::DistributedInformationExtension;
2322
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
2423
use clap::Parser;
2524
use client::client_manager::NodeClients;
2625
use common_base::Plugins;
2726
use common_config::{Configurable, DEFAULT_DATA_HOME};
2827
use common_grpc::channel_manager::ChannelConfig;
29-
use common_meta::FlownodeId;
3028
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
3129
use common_meta::heartbeat::handler::HandlerGroupExecutor;
3230
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
3331
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
3432
use common_meta::key::TableMetadataManager;
3533
use common_meta::key::flow::FlowMetadataManager;
36-
use common_meta::kv_backend::KvBackendRef;
3734
use common_stat::ResourceStatImpl;
3835
use common_telemetry::info;
3936
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
@@ -43,6 +40,7 @@ use flow::{
4340
get_flow_auth_options,
4441
};
4542
use meta_client::{MetaClientOptions, MetaClientType};
43+
use plugins::flownode::context::GrpcConfigureContext;
4644
use servers::configurator::GrpcBuilderConfiguratorRef;
4745
use snafu::{OptionExt, ResultExt, ensure};
4846
use tracing_appender::non_blocking::WorkerGuard;
@@ -435,11 +433,3 @@ impl StartCommand {
435433
Ok(Instance::new(flownode, guard))
436434
}
437435
}
438-
439-
/// The context for [`GrpcBuilderConfiguratorRef`] in flownode.
440-
pub struct GrpcConfigureContext {
441-
pub kv_backend: KvBackendRef,
442-
pub fe_client: Arc<FrontendClient>,
443-
pub flownode_id: FlownodeId,
444-
pub catalog_manager: CatalogManagerRef,
445-
}

src/cmd/src/frontend.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ use frontend::frontend::Frontend;
4545
use frontend::heartbeat::HeartbeatTask;
4646
use frontend::instance::builder::FrontendBuilder;
4747
use frontend::server::Services;
48-
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
48+
use meta_client::{MetaClientOptions, MetaClientType};
49+
use plugins::frontend::context::{
50+
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
51+
};
4952
use servers::addrs;
5053
use servers::grpc::GrpcOptions;
5154
use servers::tls::{TlsMode, TlsOption};
@@ -423,9 +426,11 @@ impl StartCommand {
423426
let builder = if let Some(configurator) =
424427
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
425428
{
426-
let ctx = CatalogManagerConfigureContext {
429+
let ctx = DistributedCatalogManagerConfigureContext {
427430
meta_client: meta_client.clone(),
428431
};
432+
let ctx = CatalogManagerConfigureContext::Distributed(ctx);
433+
429434
configurator
430435
.configure(builder, ctx)
431436
.await
@@ -482,11 +487,6 @@ impl StartCommand {
482487
}
483488
}
484489

485-
/// The context for [`CatalogManagerConfigratorRef`] in frontend.
486-
pub struct CatalogManagerConfigureContext {
487-
pub meta_client: MetaClientRef,
488-
}
489-
490490
#[cfg(test)]
491491
mod tests {
492492
use std::io::Write;

src/cmd/src/standalone.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
3232
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
3333
use common_meta::ddl::table_meta::TableMetadataAllocator;
3434
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
35-
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
35+
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
3636
use common_meta::key::flow::FlowMetadataManager;
3737
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
3838
use common_meta::kv_backend::KvBackendRef;
@@ -58,6 +58,10 @@ use frontend::instance::StandaloneDatanodeManager;
5858
use frontend::instance::builder::FrontendBuilder;
5959
use frontend::server::Services;
6060
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
61+
use plugins::frontend::context::{
62+
CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
63+
};
64+
use plugins::standalone::context::DdlManagerConfigureContext;
6165
use servers::tls::{TlsMode, TlsOption};
6266
use snafu::ResultExt;
6367
use standalone::StandaloneInformationExtension;
@@ -414,9 +418,10 @@ impl StartCommand {
414418
let builder = if let Some(configurator) =
415419
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
416420
{
417-
let ctx = CatalogManagerConfigureContext {
421+
let ctx = StandaloneCatalogManagerConfigureContext {
418422
fe_client: frontend_client.clone(),
419423
};
424+
let ctx = CatalogManagerConfigureContext::Standalone(ctx);
420425
configurator
421426
.configure(builder, ctx)
422427
.await
@@ -506,9 +511,13 @@ impl StartCommand {
506511
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
507512
.context(error::InitDdlManagerSnafu)?;
508513

509-
let ddl_manager = if let Some(configurator) = plugins.get::<DdlManagerConfiguratorRef>() {
514+
let ddl_manager = if let Some(configurator) =
515+
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
516+
{
510517
let ctx = DdlManagerConfigureContext {
511518
kv_backend: kv_backend.clone(),
519+
fe_client: frontend_client.clone(),
520+
catalog_manager: catalog_manager.clone(),
512521
};
513522
configurator
514523
.configure(ddl_manager, ctx)
@@ -595,11 +604,6 @@ impl StartCommand {
595604
}
596605
}
597606

598-
/// The context for [`CatalogManagerConfigratorRef`] in standalone.
599-
pub struct CatalogManagerConfigureContext {
600-
pub fe_client: Arc<FrontendClient>,
601-
}
602-
603607
#[cfg(test)]
604608
mod tests {
605609
use std::default::Default;

src/common/base/src/plugins.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ impl Plugins {
3232

3333
pub fn insert<T: 'static + Send + Sync>(&self, value: T) {
3434
let last = self.write().insert(value);
35-
assert!(last.is_none(), "each type of plugins must be one and only");
35+
if last.is_some() {
36+
panic!(
37+
"Plugin of type {} already exists",
38+
std::any::type_name::<T>()
39+
);
40+
}
3641
}
3742

3843
pub fn get<T: 'static + Send + Sync + Clone>(&self) -> Option<T> {
@@ -140,7 +145,7 @@ mod tests {
140145
}
141146

142147
#[test]
143-
#[should_panic(expected = "each type of plugins must be one and only")]
148+
#[should_panic(expected = "Plugin of type i32 already exists")]
144149
fn test_plugin_uniqueness() {
145150
let plugins = Plugins::new();
146151
plugins.insert(1i32);

src/common/meta/src/ddl_manager.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use crate::error::{
4646
use crate::key::table_info::TableInfoValue;
4747
use crate::key::table_name::TableNameKey;
4848
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
49-
use crate::kv_backend::KvBackendRef;
5049
use crate::procedure_executor::ExecutorContext;
5150
#[cfg(feature = "enterprise")]
5251
use crate::rpc::ddl::DdlTask::CreateTrigger;
@@ -70,20 +69,16 @@ use crate::rpc::router::RegionRoute;
7069

7170
/// A configurator that customizes or enhances a [`DdlManager`].
7271
#[async_trait::async_trait]
73-
pub trait DdlManagerConfigurator: Send + Sync {
72+
pub trait DdlManagerConfigurator<C>: Send + Sync {
7473
/// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`].
7574
async fn configure(
7675
&self,
7776
ddl_manager: DdlManager,
78-
ctx: DdlManagerConfigureContext,
77+
ctx: C,
7978
) -> std::result::Result<DdlManager, BoxedError>;
8079
}
8180

82-
pub type DdlManagerConfiguratorRef = Arc<dyn DdlManagerConfigurator>;
83-
84-
pub struct DdlManagerConfigureContext {
85-
pub kv_backend: KvBackendRef,
86-
}
81+
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
8782

8883
pub type DdlManagerRef = Arc<DdlManager>;
8984

src/meta-srv/src/metasrv/builder.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato
2828
use common_meta::ddl::{
2929
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
3030
};
31-
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
31+
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
3232
use common_meta::distributed_time_constants::{self};
3333
use common_meta::key::TableMetadataManager;
3434
use common_meta::key::flow::FlowMetadataManager;
@@ -405,10 +405,11 @@ impl MetasrvBuilder {
405405

406406
let ddl_manager = if let Some(configurator) = plugins
407407
.as_ref()
408-
.and_then(|p| p.get::<DdlManagerConfiguratorRef>())
408+
.and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
409409
{
410410
let ctx = DdlManagerConfigureContext {
411411
kv_backend: kv_backend.clone(),
412+
meta_peer_client: meta_peer_client.clone(),
412413
};
413414
configurator
414415
.configure(ddl_manager, ctx)
@@ -637,3 +638,9 @@ impl Default for MetasrvBuilder {
637638
Self::new()
638639
}
639640
}
641+
642+
/// The context for [`DdlManagerConfiguratorRef`].
643+
pub struct DdlManagerConfigureContext {
644+
pub kv_backend: KvBackendRef,
645+
pub meta_peer_client: MetaPeerClientRef,
646+
}

src/plugins/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ workspace = true
99

1010
[dependencies]
1111
auth.workspace = true
12+
catalog.workspace = true
1213
clap.workspace = true
1314
cli.workspace = true
1415
common-base.workspace = true
@@ -17,6 +18,7 @@ common-meta.workspace = true
1718
datanode.workspace = true
1819
flow.workspace = true
1920
frontend.workspace = true
21+
meta-client.workspace = true
2022
meta-srv.workspace = true
2123
serde.workspace = true
2224
snafu.workspace = true

src/plugins/src/flownode.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,20 @@ pub async fn setup_flownode_plugins(
3030
pub async fn start_flownode_plugins(_plugins: Plugins) -> Result<()> {
3131
Ok(())
3232
}
33+
34+
pub mod context {
35+
use std::sync::Arc;
36+
37+
use catalog::CatalogManagerRef;
38+
use common_meta::FlownodeId;
39+
use common_meta::kv_backend::KvBackendRef;
40+
use flow::FrontendClient;
41+
42+
/// The context for `GrpcBuilderConfiguratorRef` in flownode.
43+
pub struct GrpcConfigureContext {
44+
pub kv_backend: KvBackendRef,
45+
pub fe_client: Arc<FrontendClient>,
46+
pub flownode_id: FlownodeId,
47+
pub catalog_manager: CatalogManagerRef,
48+
}
49+
}

src/plugins/src/frontend.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,25 @@ pub async fn setup_frontend_plugins(
4040
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
4141
Ok(())
4242
}
43+
44+
pub mod context {
45+
use std::sync::Arc;
46+
47+
use flow::FrontendClient;
48+
use meta_client::MetaClientRef;
49+
50+
/// The context for [`catalog::kvbackend::CatalogManagerConfiguratorRef`] in standalone or
51+
/// distributed.
52+
pub enum CatalogManagerConfigureContext {
53+
Distributed(DistributedCatalogManagerConfigureContext),
54+
Standalone(StandaloneCatalogManagerConfigureContext),
55+
}
56+
57+
pub struct DistributedCatalogManagerConfigureContext {
58+
pub meta_client: MetaClientRef,
59+
}
60+
61+
pub struct StandaloneCatalogManagerConfigureContext {
62+
pub fe_client: Arc<FrontendClient>,
63+
}
64+
}

0 commit comments

Comments
 (0)