Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions src/bendpy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ fn create_embedded_config(

// Query configuration
conf.query.tenant_id = Tenant::new_literal("python_binding");
conf.query.embedded_mode = true;
conf.query.cluster_id = "".to_string();
conf.query.warehouse_id = "".to_string();
conf.query.common.embedded_mode = true;
conf.query.common.cluster_id = "".to_string();
conf.query.common.warehouse_id = "".to_string();
conf.query.node_id = "embedded_node".to_string();

// Logging configuration
Expand Down Expand Up @@ -136,8 +136,8 @@ fn create_embedded_config(
let total_memory = system.total_memory();

// Set max server memory usage to 80% of system memory
conf.query.max_server_memory_usage = (total_memory as f64 * 0.8) as u64;
conf.query.max_memory_limit_enabled = true;
conf.query.common.max_server_memory_usage = (total_memory as f64 * 0.8) as u64;
conf.query.common.max_memory_limit_enabled = true;

// Enable spill when memory usage exceeds 60% of system memory
conf.spill.global_bytes_limit = (total_memory as f64 * 0.6) as u64;
Expand Down Expand Up @@ -211,8 +211,8 @@ fn create_python_binding_telemetry_payload(config: &InnerConfig) -> serde_json::
"disk_cache_max_bytes": config.cache.disk_cache_config.max_bytes
},
"memory_management": {
"max_server_memory_usage": config.query.max_server_memory_usage,
"max_memory_limit_enabled": config.query.max_memory_limit_enabled,
"max_server_memory_usage": config.query.common.max_server_memory_usage,
"max_memory_limit_enabled": config.query.common.max_memory_limit_enabled,
"spill_enabled": config.spill.global_bytes_limit > 0,
"spill_threshold_bytes": config.spill.global_bytes_limit,
"spill_threshold_percent": 60.0,
Expand Down
62 changes: 33 additions & 29 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ pub async fn init_services(conf: &InnerConfig, ee_mode: bool) -> Result<(), Main
async fn precheck_services(conf: &InnerConfig) -> Result<(), MainError> {
let make_error = || "failed to precheck";

if conf.query.max_memory_limit_enabled {
let size = conf.query.max_server_memory_usage as i64;
if conf.query.common.max_memory_limit_enabled {
let size = conf.query.common.max_server_memory_usage as i64;
info!("Set memory limit: {}", size);
GLOBAL_MEM_STAT.set_limit(size, false);
}
Expand Down Expand Up @@ -136,13 +136,13 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
.with_context(make_error)?;
info!(
"Databend query has been registered:{:?}/{:?} to metasrv:{:?}.",
conf.query.warehouse_id, conf.query.cluster_id, conf.meta.endpoints
conf.query.common.warehouse_id, conf.query.common.cluster_id, conf.meta.endpoints
);
}

// RPC API service.
{
let address = conf.query.flight_api_address.clone();
let address = conf.query.common.flight_api_address.clone();
let mut srv = FlightService::create(conf.clone()).with_context(make_error)?;
let listening = srv
.start(address.parse().with_context(make_error)?)
Expand All @@ -154,12 +154,12 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

// MySQL handler.
{
let hostname = conf.query.mysql_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.mysql_handler_port);
let tcp_keepalive_timeout_secs = conf.query.mysql_handler_tcp_keepalive_timeout_secs;
let hostname = conf.query.common.mysql_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.common.mysql_handler_port);
let tcp_keepalive_timeout_secs = conf.query.common.mysql_handler_tcp_keepalive_timeout_secs;
let tls_config = MySQLTlsConfig::new(
conf.query.mysql_tls_server_cert.clone(),
conf.query.mysql_tls_server_key.clone(),
conf.query.common.mysql_tls_server_cert.clone(),
conf.query.common.mysql_tls_server_key.clone(),
);

let mut handler = MySQLHandler::create(tcp_keepalive_timeout_secs, tls_config)
Expand All @@ -180,8 +180,11 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

// ClickHouse HTTP handler.
{
let hostname = conf.query.clickhouse_http_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.clickhouse_http_handler_port);
let hostname = conf.query.common.clickhouse_http_handler_host.clone();
let listening = format!(
"{}:{}",
hostname, conf.query.common.clickhouse_http_handler_port
);

let mut srv = HttpHandler::create(HttpHandlerKind::Clickhouse);
let listening = srv
Expand All @@ -199,8 +202,8 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

// Databend HTTP handler.
{
let hostname = conf.query.http_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.http_handler_port);
let hostname = conf.query.common.http_handler_host.clone();
let listening = format!("{}:{}", hostname, conf.query.common.http_handler_port);

let mut srv = HttpHandler::create(HttpHandlerKind::Query);
let listening = srv
Expand All @@ -219,7 +222,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
// Metric API service.
{
set_system_version("query", DATABEND_GIT_SEMVER, VERGEN_GIT_SHA);
let address = conf.query.metric_api_address.clone();
let address = conf.query.common.metric_api_address.clone();
let mut srv = MetricService::create();
let listening = srv
.start(address.parse().with_context(make_error)?)
Expand All @@ -231,7 +234,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

// Admin HTTP API service.
{
let address = conf.query.admin_api_address.clone();
let address = conf.query.common.admin_api_address.clone();
let mut srv = AdminService::create(conf);
let listening = srv
.start(address.parse().with_context(make_error)?)
Expand All @@ -245,7 +248,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
{
let address = format!(
"{}:{}",
conf.query.flight_sql_handler_host, conf.query.flight_sql_handler_port
conf.query.common.flight_sql_handler_host, conf.query.common.flight_sql_handler_port
);
let mut srv =
FlightSQLServer::create(conf.clone(), &BUILD_INFO).with_context(make_error)?;
Expand Down Expand Up @@ -307,10 +310,10 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
println!();
println!("Memory:");
println!(" limit: {}", {
if conf.query.max_memory_limit_enabled {
if conf.query.common.max_memory_limit_enabled {
format!(
"Memory: server memory limit to {} (bytes)",
conf.query.max_server_memory_usage
conf.query.common.max_server_memory_usage
)
} else {
"unlimited".to_string()
Expand All @@ -336,10 +339,10 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
println!();
println!("Storage: {}", conf.storage.params);
println!("Disk cache:");
println!(" storage: {}", conf.cache.data_cache_storage);
println!(" storage: {:?}", conf.cache.data_cache_storage);
println!(" path: {:?}", conf.cache.disk_cache_config);
println!(
" reload policy: {}",
" reload policy: {:?}",
conf.cache.data_cache_key_reload_policy
);

Expand Down Expand Up @@ -368,38 +371,38 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

println!();
println!("Admin");
println!(" listened at {}", conf.query.admin_api_address);
println!(" listened at {}", conf.query.common.admin_api_address);
println!("MySQL");
println!(
" listened at {}:{}",
conf.query.mysql_handler_host, conf.query.mysql_handler_port
conf.query.common.mysql_handler_host, conf.query.common.mysql_handler_port
);
println!(
" connect via: mysql -u${{USER}} -p${{PASSWORD}} -h{} -P{}",
conf.query.mysql_handler_host, conf.query.mysql_handler_port
conf.query.common.mysql_handler_host, conf.query.common.mysql_handler_port
);
println!("Databend");
println!(
" listened at {}:{}",
conf.query.http_handler_host, conf.query.http_handler_port
conf.query.common.http_handler_host, conf.query.common.http_handler_port
);

println!(
" usage with args: bendsql -u ${{USER}} -p ${{PASSWORD}} -h {} -P {}",
conf.query.http_handler_host, conf.query.http_handler_port
conf.query.common.http_handler_host, conf.query.common.http_handler_port
);

println!(
" usage with dsn: bendsql --dsn \"databend://${{USER}}:${{PASSWORD}}@{}:{}?sslmode=disable\"",
conf.query.http_handler_host, conf.query.http_handler_port
conf.query.common.http_handler_host, conf.query.common.http_handler_port
);

println!(
" http: {}",
HttpHandlerKind::Query.usage(
format!(
"{}:{}",
conf.query.http_handler_host, conf.query.http_handler_port
conf.query.common.http_handler_host, conf.query.common.http_handler_port
)
.parse()
.with_context(make_error)?
Expand All @@ -421,8 +424,9 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
start_time.elapsed().as_secs_f32()
);

let graceful_shutdown_timeout =
Some(Duration::from_millis(conf.query.shutdown_wait_timeout_ms));
let graceful_shutdown_timeout = Some(Duration::from_millis(
conf.query.common.shutdown_wait_timeout_ms,
));
shutdown_handle
.wait_for_termination_request(graceful_shutdown_timeout)
.await;
Expand Down
11 changes: 9 additions & 2 deletions src/query/catalog/src/catalog/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use chrono::Utc;
use databend_common_base::base::BuildInfoRef;
use databend_common_base::base::GlobalInstance;
use databend_common_config::CatalogConfig;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -107,11 +106,19 @@ impl CatalogManager {
// init external catalogs.
let mut external_catalogs = HashMap::default();
for (name, ctl_cfg) in conf.catalogs.iter() {
let CatalogConfig::Hive(hive_ctl_cfg) = ctl_cfg;
let creator = catalog_creators.get(&CatalogType::Hive).ok_or_else(|| {
ErrorCode::BadArguments(format!("unknown catalog type: {:?}", CatalogType::Hive))
})?;

if ctl_cfg.ty.as_str() != "hive" {
return Err(ErrorCode::CatalogNotSupported(format!(
"got unsupported catalog type in config: {}",
ctl_cfg.ty
)));
}

let hive_ctl_cfg = &ctl_cfg.hive;

let ctl_info = CatalogInfo {
id: CatalogIdIdent::new(&tenant, 0).into(),
name_ident: CatalogNameIdent::new(tenant.clone(), name).into(),
Expand Down
1 change: 1 addition & 0 deletions src/query/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serfig = { workspace = true }
toml = { workspace = true }

[dev-dependencies]
temp-env = { workspace = true }

[lints]
workspace = true
Loading
Loading