Skip to content

Commit 3cbba37

Browse files
authored
feat: init support collecting meta node logs into history table (#18551)
* feat: support logging meta node to history table * add test * taplo * clippy * change config name * minor fix
1 parent 05753d9 commit 3cbba37

File tree

7 files changed

+192
-17
lines changed

7 files changed

+192
-17
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/binaries/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ databend-common-meta-semaphore = { workspace = true }
3535
databend-common-meta-sled-store = { workspace = true }
3636
databend-common-meta-store = { workspace = true }
3737
databend-common-meta-types = { workspace = true }
38+
databend-common-storage = { workspace = true }
3839
databend-common-tracing = { workspace = true }
3940
databend-common-version = { workspace = true }
4041
databend-meta = { workspace = true }
@@ -48,6 +49,7 @@ fastrace = { workspace = true }
4849
futures = { workspace = true }
4950
log = { workspace = true }
5051
mlua = { workspace = true }
52+
num_cpus = { workspace = true }
5153
rand = { workspace = true }
5254
serde = { workspace = true }
5355
serde_json = { workspace = true }

src/meta/binaries/meta/entry.rs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::BTreeMap;
1615
use std::ops::Deref;
1716
use std::sync::Arc;
1817
use std::time::Duration;
1918

2019
use anyerror::AnyError;
20+
use databend_common_base::base::GlobalInstance;
2121
use databend_common_base::base::StopHandle;
2222
use databend_common_base::base::Stoppable;
23+
use databend_common_base::runtime::GlobalIORuntime;
2324
use databend_common_grpc::RpcClientConf;
2425
use databend_common_meta_raft_store::ondisk::OnDisk;
2526
use databend_common_meta_raft_store::ondisk::DATA_VERSION;
@@ -29,8 +30,9 @@ use databend_common_meta_types::node::Node;
2930
use databend_common_meta_types::Cmd;
3031
use databend_common_meta_types::LogEntry;
3132
use databend_common_meta_types::MetaAPIError;
32-
use databend_common_tracing::init_logging;
33+
use databend_common_storage::init_operator;
3334
use databend_common_tracing::set_panic_hook;
35+
use databend_common_tracing::GlobalLogger;
3436
use databend_common_version::DATABEND_COMMIT_VERSION;
3537
use databend_meta::api::GrpcServer;
3638
use databend_meta::api::HttpService;
@@ -61,18 +63,7 @@ pub async fn entry(conf: Config) -> anyhow::Result<()> {
6163

6264
set_panic_hook(binary_version);
6365

64-
// app name format: node_id@cluster_id
65-
let app_name_shuffle = format!(
66-
"databend-meta-{}@{}",
67-
conf.raft_config.id, conf.raft_config.cluster_name
68-
);
69-
let mut log_labels = BTreeMap::new();
70-
log_labels.insert(
71-
"cluster_name".to_string(),
72-
conf.raft_config.cluster_name.clone(),
73-
);
74-
let guards = init_logging(&app_name_shuffle, &conf.log, log_labels);
75-
Box::new(guards).leak();
66+
init_logging_system(&conf).await?;
7667

7768
info!("Databend Meta version: {}", METASRV_COMMIT_VERSION.as_str());
7869
info!(
@@ -137,6 +128,9 @@ pub async fn entry(conf: Config) -> anyhow::Result<()> {
137128
if conf.log.tracing.on {
138129
println!(" Tracing: {}", conf.log.tracing);
139130
}
131+
if conf.log.history.on {
132+
println!(" Storage: {}", conf.log.history.on);
133+
}
140134
let r = &conf.raft_config;
141135
println!("Raft Id: {}; Cluster: {}", r.id, r.cluster_name);
142136
println!(" Dir: {}", r.raft_dir);
@@ -380,6 +374,38 @@ async fn run_cmd(conf: &Config) -> bool {
380374
true
381375
}
382376

377+
async fn init_logging_system(conf: &Config) -> anyhow::Result<()> {
378+
let app_name = format!(
379+
"databend-meta-{}@{}",
380+
conf.raft_config.id, conf.raft_config.cluster_name
381+
);
382+
383+
let log_labels = [
384+
("cluster_name", conf.raft_config.cluster_name.as_str()),
385+
("node_id", &conf.raft_config.id.to_string()),
386+
("cluster_id", conf.raft_config.cluster_name.as_str()),
387+
]
388+
.into_iter()
389+
.map(|(k, v)| (k.to_string(), v.to_string()))
390+
.collect();
391+
392+
GlobalInstance::init_production();
393+
GlobalLogger::init(&app_name, &conf.log, log_labels);
394+
395+
if conf.log.history.on {
396+
GlobalIORuntime::init(num_cpus::get())?;
397+
398+
let params = conf.log.history.storage_params.as_ref().ok_or_else(|| {
399+
anyhow::anyhow!("Log history is enabled but storage_params is not set")
400+
})?;
401+
402+
let remote_log_op = init_operator(params).map_err(|e| anyhow::anyhow!(e))?;
403+
GlobalLogger::instance().set_operator(remote_log_op).await;
404+
}
405+
406+
Ok(())
407+
}
408+
383409
fn pretty<T>(v: &T) -> Result<String, serde_json::Error>
384410
where T: serde::Serialize {
385411
serde_json::to_string_pretty(v)

src/meta/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ async-trait = { workspace = true }
2323
backon = { workspace = true }
2424
clap = { workspace = true }
2525
databend-common-base = { workspace = true }
26+
databend-common-config = { workspace = true }
2627
databend-common-grpc = { workspace = true }
2728
databend-common-http = { workspace = true }
2829
databend-common-meta-api = { workspace = true }
@@ -33,6 +34,7 @@ databend-common-meta-sled-store = { workspace = true }
3334
databend-common-meta-stoerr = { workspace = true }
3435
databend-common-meta-types = { workspace = true }
3536
databend-common-metrics = { workspace = true }
37+
databend-common-storage = { workspace = true }
3638
databend-common-tracing = { workspace = true }
3739
databend-common-version = { workspace = true }
3840
deepsize = { workspace = true }

src/meta/service/src/configs/outer_v0.rs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ use std::env;
1717
use clap::ArgAction;
1818
use clap::Args;
1919
use clap::Parser;
20+
use databend_common_config::StorageConfig;
2021
use databend_common_meta_raft_store::config::get_default_raft_advertise_host;
2122
use databend_common_meta_raft_store::config::RaftConfig as InnerRaftConfig;
2223
use databend_common_meta_types::MetaStartupError;
24+
use databend_common_storage::StorageConfig as InnerStorageConfig;
2325
use databend_common_tracing::Config as InnerLogConfig;
2426
use databend_common_tracing::FileConfig as InnerFileLogConfig;
25-
use databend_common_tracing::HistoryConfig;
27+
use databend_common_tracing::HistoryConfig as InnerLogHistoryConfig;
2628
use databend_common_tracing::OTLPConfig;
2729
use databend_common_tracing::ProfileLogConfig;
2830
use databend_common_tracing::QueryLogConfig;
@@ -422,6 +424,7 @@ impl Into<Config> for ConfigViaEnv {
422424
stderr_level: self.metasrv_log_stderr_level,
423425
stderr_format: self.metasrv_log_stderr_format,
424426
},
427+
storage: StorageLogConfig::default(),
425428
};
426429

427430
Config {
@@ -671,6 +674,9 @@ pub struct LogConfig {
671674

672675
#[clap(flatten)]
673676
pub stderr: StderrLogConfig,
677+
678+
#[clap(flatten)]
679+
pub storage: StorageLogConfig,
674680
}
675681

676682
impl Default for LogConfig {
@@ -690,7 +696,7 @@ impl Into<InnerLogConfig> for LogConfig {
690696
profile: ProfileLogConfig::default(),
691697
structlog: StructLogConfig::default(),
692698
tracing: TracingConfig::default(),
693-
history: HistoryConfig::default(),
699+
history: self.storage.into(),
694700
}
695701
}
696702
}
@@ -700,6 +706,7 @@ impl From<InnerLogConfig> for LogConfig {
700706
Self {
701707
file: inner.file.into(),
702708
stderr: inner.stderr.into(),
709+
storage: inner.history.into(),
703710
}
704711
}
705712
}
@@ -825,3 +832,93 @@ impl From<InnerStderrLogConfig> for StderrLogConfig {
825832
}
826833
}
827834
}
835+
836+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)]
837+
#[serde(default)]
838+
pub struct StorageLogConfig {
839+
#[clap(
840+
long = "log-storage-on", value_name = "VALUE", default_value = "false", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "true"
841+
)]
842+
#[serde(rename = "on")]
843+
pub log_storage_on: bool,
844+
845+
/// Specifies the interval in seconds for how often the log is flushed
846+
#[clap(
847+
long = "log-storage-interval",
848+
value_name = "VALUE",
849+
default_value = "2"
850+
)]
851+
#[serde(rename = "interval")]
852+
pub log_storage_interval: usize,
853+
854+
/// Specifies the name of the staging area that temporarily holds log data before it is finally copied into the table
855+
///
856+
/// Note:
857+
/// The default value uses an uuid to avoid conflicts with existing stages
858+
#[clap(
859+
long = "log-storage-stage-name",
860+
value_name = "VALUE",
861+
default_value = "log_1f93b76af0bd4b1d8e018667865fbc65"
862+
)]
863+
#[serde(rename = "stage_name")]
864+
pub log_storage_stage_name: String,
865+
866+
/// Log level <DEBUG|INFO|WARN|ERROR>
867+
#[clap(
868+
long = "log-storage-level",
869+
value_name = "VALUE",
870+
default_value = "INFO"
871+
)]
872+
#[serde(rename = "level")]
873+
pub log_storage_level: String,
874+
875+
/// Specify store the log into where
876+
#[clap(skip)]
877+
#[serde(rename = "params")]
878+
pub log_storage_params: StorageConfig,
879+
}
880+
881+
impl Default for StorageLogConfig {
882+
fn default() -> Self {
883+
StorageLogConfig {
884+
log_storage_on: false,
885+
log_storage_interval: 2,
886+
log_storage_stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
887+
log_storage_level: "INFO".to_string(),
888+
log_storage_params: Default::default(),
889+
}
890+
}
891+
}
892+
893+
#[allow(clippy::from_over_into)]
894+
impl Into<InnerLogHistoryConfig> for StorageLogConfig {
895+
fn into(self) -> InnerLogHistoryConfig {
896+
let storage_params: Option<InnerStorageConfig> =
897+
Some(self.log_storage_params.try_into().unwrap_or_default());
898+
InnerLogHistoryConfig {
899+
on: self.log_storage_on,
900+
interval: self.log_storage_interval,
901+
stage_name: self.log_storage_stage_name,
902+
level: self.log_storage_level,
903+
storage_params: storage_params.map(|cfg| cfg.params),
904+
..Default::default()
905+
}
906+
}
907+
}
908+
909+
impl From<InnerLogHistoryConfig> for StorageLogConfig {
910+
fn from(value: InnerLogHistoryConfig) -> Self {
911+
let inner_storage_config: Option<InnerStorageConfig> =
912+
value.storage_params.map(|params| InnerStorageConfig {
913+
params,
914+
..Default::default()
915+
});
916+
Self {
917+
log_storage_on: value.on,
918+
log_storage_interval: value.interval,
919+
log_storage_stage_name: value.stage_name,
920+
log_storage_level: value.level,
921+
log_storage_params: inner_storage_config.map(Into::into).unwrap_or_default(),
922+
}
923+
}
924+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[log.storage]
2+
on = true
3+
4+
[log.storage.params]
5+
type = "s3"
6+
7+
[log.storage.params.s3]
8+
bucket = "testbucket"
9+
root = "admin"
10+
endpoint_url = "http://127.0.0.1:9900"
11+
access_key_id = "minioadmin"
12+
secret_access_key = "minioadmin"

tests/logging/test-history-tables.sh

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22

33
set -e
44

5+
echo "*************************************"
6+
echo "* Setting STORAGE_TYPE to S3. *"
7+
echo "* *"
8+
echo "* Please make sure that S3 backend *"
9+
echo "* is ready, and configured properly.*"
10+
echo "*************************************"
11+
export STORAGE_TYPE=s3
12+
export STORAGE_S3_BUCKET=testbucket
13+
export STORAGE_S3_ROOT=admin
14+
export STORAGE_S3_ENDPOINT_URL=http://127.0.0.1:9900
15+
export STORAGE_S3_ACCESS_KEY_ID=minioadmin
16+
export STORAGE_S3_SECRET_ACCESS_KEY=minioadmin
17+
export STORAGE_ALLOW_INSECURE=true
18+
519
BUILD_PROFILE="${BUILD_PROFILE:-debug}"
620
SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
721
cd "$SCRIPT_PATH/../../" || exit
@@ -18,10 +32,17 @@ echo "Starting Databend Query cluster with 2 nodes enable history tables"
1832
for node in 1 2; do
1933
CONFIG_FILE="./scripts/ci/deploy/config/databend-query-node-${node}.toml"
2034

21-
echo "Appending history table config to node-${node}"
35+
echo "Appending history table config to query node-${node}"
2236
cat ./tests/logging/history_table.toml >> "$CONFIG_FILE"
2337
done
2438

39+
for node in 1 2 3; do
40+
CONFIG_FILE="./scripts/ci/deploy/config/databend-meta-node-${node}.toml"
41+
42+
echo "Appending history table config to meta node-${node}"
43+
cat ./tests/logging/history_table_meta.toml >> "$CONFIG_FILE"
44+
done
45+
2546
# Start meta cluster (3 nodes - needed for HA)
2647
echo 'Start Meta service HA cluster(3 nodes)...'
2748

@@ -66,6 +87,17 @@ echo "Running test queries to test inner history tables"
6687

6788
response1=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"select * from system_history.log_history where query_id = '${drop_query_id}'\"}")
6889

90+
# Verify that the logs of all 3 meta nodes reached the history table: each meta node
# logs one "Databend Meta version: ..." line, so exactly 3 matching rows are expected.
meta_count_response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"select count(*) from system_history.log_history where message like 'Databend Meta version%'\"}")

meta_count_response_data=$(echo "$meta_count_response" | jq -r '.data')
# Compare the count value itself (first column of the first row). A substring match
# for "3" against the raw JSON would also accept counts such as 13 or 30.
meta_count=$(echo "$meta_count_response" | jq -r '.data[0][0]')
if [[ "$meta_count" != "3" ]]; then
    echo "ERROR: expected log lines from 3 meta nodes, got: $meta_count"
    echo "meta_count_response_data: $meta_count_response_data"
    exit 1
else
    echo "✓ meta node logs are collected as expected"
fi
100+
69101
# **Internal -> External**: should reset
70102

71103
echo "Add a node with external history table enabled"

0 commit comments

Comments
 (0)