Commit bdfbe3c

refactor: query_history table should only have one record per query (#18098)
* use MERGE instead of INSERT
* error retry should have limited times
* add config `log_only` into HistoryConfig
1 parent 96461c2 commit bdfbe3c

File tree: 7 files changed (+41 -15 lines)

src/binaries/query/entry.rs

Lines changed: 1 addition & 1 deletion
@@ -291,7 +291,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
     if conf.log.structlog.on {
         println!(" structlog: {}", conf.log.structlog);
     }
-    if conf.log.history.on {
+    if conf.log.history.on && !conf.log.history.log_only {
         GlobalHistoryLog::instance().initialized();
         println!(" system history tables: {}", conf.log.history);
     }

src/common/tracing/src/config.rs

Lines changed: 4 additions & 1 deletion
@@ -340,6 +340,7 @@ impl Default for OTLPEndpointConfig {
 #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
 pub struct HistoryConfig {
     pub on: bool,
+    pub log_only: bool,
     pub interval: usize,
     pub stage_name: String,
     pub level: String,
@@ -357,8 +358,9 @@ impl Display for HistoryConfig {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         write!(
             f,
-            "enabled={}, interval={}, stage_name={}, level={}, retention_interval={}, tables=[{}]",
+            "enabled={}, log_only={}, interval={}, stage_name={}, level={}, retention_interval={}, tables=[{}]",
             self.on,
+            self.log_only,
             self.interval,
             self.stage_name,
             self.level,
@@ -376,6 +378,7 @@ impl Default for HistoryConfig {
         Self {
             on: false,
             interval: 2,
+            log_only: false,
             // The default value of stage name uses an uuid to avoid conflicts with existing stages
             stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
             level: "WARN".to_string(),

src/common/tracing/src/predefined_tables/history_tables.toml

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hour
 name = "query_history"
 target = "databend::log::query"
 create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
-transform = "settings (timezone='Etc/UTC') INSERT INTO system_history.query_history FROM (SELECT m['log_type'], m['log_type_name'], m['handler_type'], m['tenant_id'], m['cluster_id'], m['node_id'], m['sql_user'], m['sql_user_quota'], m['sql_user_privileges'], m['query_id'], m['query_kind'], m['query_text'], m['query_hash'], m['query_parameterized_hash'], m['event_date'], m['event_time'], m['query_start_time'], m['query_duration_ms'], m['query_queued_duration_ms'], m['current_database'], m['written_rows'], m['written_bytes'], m['join_spilled_rows'], m['join_spilled_bytes'], m['agg_spilled_rows'], m['agg_spilled_bytes'], m['group_by_spilled_rows'], m['group_by_spilled_bytes'], m['written_io_bytes'], m['written_io_bytes_cost_ms'], m['scan_rows'], m['scan_bytes'], m['scan_io_bytes'], m['scan_io_bytes_cost_ms'], m['scan_partitions'], m['total_partitions'], m['result_rows'], m['result_bytes'], m['bytes_from_remote_disk'], m['bytes_from_local_disk'], m['bytes_from_memory'], m['client_address'], m['user_agent'], m['exception_code'], m['exception_text'], m['server_version'], m['query_tag'], m['has_profile'], m['peek_memory_usage'], m['session_id'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
 delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"

 [[tables]]
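The key line is the transform above: instead of appending every parsed log event with INSERT, each batch is first collapsed to one row per query_id with ARG_MAX, then upserted with MERGE. Below is a minimal sketch of that pattern against a hypothetical four-column query_history (the real statement carries all fifty columns, a settings clause, and the batch_number window); the log_type semantics are an assumption read off the WHEN MATCHED condition, where types 2 and 3 appear to be completion events that supersede a start event:

MERGE INTO query_history AS target
USING (
    SELECT
        f['query_id']   AS query_id,
        f['log_type']   AS log_type,
        f['query_text'] AS query_text,
        f['event_time'] AS event_time
    FROM (
        -- Collapse the batch to one row per query_id, keeping the event with
        -- the highest log_type (assumed: completion supersedes start).
        SELECT ARG_MAX(m, m['log_type']) AS f
        FROM (SELECT parse_json(message) AS m FROM system_history.log_history) AS parsed_data
        GROUP BY m['query_id']
    )
) AS source
ON target.query_id = source.query_id
WHEN MATCHED AND source.log_type IN (2, 3) THEN UPDATE *
WHEN NOT MATCHED THEN INSERT *;

Because the match key is query_id, replaying overlapping batches (for example after a retry) updates the existing row rather than inserting a duplicate, which is what enforces the one-record-per-query invariant.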

src/query/config/src/config.rs

Lines changed: 17 additions & 0 deletions
@@ -2724,6 +2724,21 @@ pub struct HistoryLogConfig {
     #[serde(rename = "on")]
     pub log_history_on: bool,

+    /// Enables log-only mode for the history feature.
+    ///
+    /// When set to true, this node will only record raw log data to the specified stage,
+    /// but will not perform the transform and clean process.
+    /// Please make sure that the transform and clean process is handled by other nodes,
+    /// otherwise the raw log data will not be processed and cleaned up.
+    ///
+    /// Note: This is useful for nodes that should avoid the performance overhead of the
+    /// transform and clean process.
+    #[clap(
+        long = "log-history-log-only", value_name = "VALUE", default_value = "false", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "true"
+    )]
+    #[serde(rename = "log_only")]
+    pub log_history_log_only: bool,
+
     /// Specifies the interval in seconds for how often the history log is flushed
     #[clap(
         long = "log-history-interval",
@@ -2819,6 +2834,7 @@ impl TryInto<InnerHistoryConfig> for HistoryLogConfig {
     fn try_into(self) -> Result<InnerHistoryConfig> {
         Ok(InnerHistoryConfig {
             on: self.log_history_on,
+            log_only: self.log_history_log_only,
             interval: self.log_history_interval,
             stage_name: self.log_history_stage_name,
             level: self.log_history_level,
@@ -2836,6 +2852,7 @@ impl From<InnerHistoryConfig> for HistoryLogConfig {
     fn from(inner: InnerHistoryConfig) -> Self {
         Self {
             log_history_on: inner.on,
+            log_history_log_only: inner.log_only,
             log_history_interval: inner.interval,
             log_history_stage_name: inner.stage_name,
             log_history_level: inner.level,

src/query/service/src/history_tables/global_history_log.rs

Lines changed: 16 additions & 11 deletions
@@ -74,6 +74,12 @@ pub struct GlobalHistoryLog {
 impl GlobalHistoryLog {
     pub async fn init(cfg: &InnerConfig) -> Result<()> {
         setup_operator().await?;
+        if cfg.log.history.log_only {
+            info!(
+                "[HISTORY-TABLES] History tables transform is disabled, only logging is enabled."
+            );
+            return Ok(());
+        }
         let meta_client = MetaGrpcClient::try_new(&cfg.meta.to_meta_grpc_client_conf())
             .map_err(|_e| ErrorCode::Internal("Create MetaClient failed for SystemHistory"))?;
         let stage_name = cfg.log.history.stage_name.clone();
@@ -135,10 +141,12 @@ impl GlobalHistoryLog {
         let meta_key = format!("{}/history_log_transform", self.tenant_id).clone();
         let log = GlobalHistoryLog::instance();
         let handle = spawn(async move {
+            let mut consecutive_error = 0;
             loop {
                 match log.transform(&table_clone, &meta_key).await {
                     Ok(acquired_lock) => {
                         if acquired_lock {
+                            consecutive_error = 0;
                             let _ = log
                                 .finish_hook(&format!("{}/{}/lock", meta_key, table_clone.name))
                                 .await;
@@ -148,21 +156,18 @@ impl GlobalHistoryLog {
                         let _ = log
                             .finish_hook(&format!("{}/{}/lock", meta_key, table_clone.name))
                             .await;
-
-                        // BadArguments(1006), if the table schema is changed
-                        // means this node is older version then exit
-                        if e.code() == 1006 {
-                            info!(
-                                "[HISTORY-TABLES] {} log transform failed due to schema changed, exit",
+                        error!(
+                            "[HISTORY-TABLES] {} log transform failed due to {}, retry {}",
+                            table_clone.name, e, consecutive_error
+                        );
+                        consecutive_error += 1;
+                        if consecutive_error > 3 {
+                            error!(
+                                "[HISTORY-TABLES] {} log transform failed too many times, exit",
                                 table_clone.name
                             );
                             break;
                         }
-
-                        error!(
-                            "[HISTORY-TABLES] {} log transform failed due to {}, retry",
-                            table_clone.name, e
-                        );
                     }
                 }
                 sleep(sleep_time).await;

src/query/service/tests/it/storages/testdata/configs_table_basic.txt

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
 | 'log' | 'file.prefix_filter' | 'null' | '' |
 | 'log' | 'history.interval' | '2' | '' |
 | 'log' | 'history.level' | 'WARN' | '' |
+| 'log' | 'history.log_only' | 'false' | '' |
 | 'log' | 'history.on' | 'false' | '' |
 | 'log' | 'history.retention_interval' | '24' | '' |
 | 'log' | 'history.stage_name' | 'log_1f93b76af0bd4b1d8e018667865fbc65' | '' |
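The testdata above is the expected output of the system.configs table, so the new knob is observable at runtime. A hypothetical spot-check (not part of this commit), using the group/name/value layout shown above:

-- Should report 'false' on a default-configured node, per the expected output.
SELECT * FROM system.configs WHERE name = 'history.log_only';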

tests/logging/check_logs_table.sh

Lines changed: 1 addition & 1 deletion
@@ -40,4 +40,4 @@ check_query_log "2" "$query_id" "SELECT count(*) FROM system_history.profile_his
 check_query_log "3" "$query_id" "SELECT count(*) FROM system_history.log_history WHERE target = 'databend::log::query' and" "2"

 # Test 4
-check_query_log "4" "$query_id" "SELECT count(*) FROM system_history.query_history WHERE" "2"
+check_query_log "4" "$query_id" "SELECT count(*) FROM system_history.query_history WHERE" "1"
