
Commit 38de9f1

fix: query_history missing some columns (#18381)
Parent: 40136f2

4 files changed: +19 -9 lines

src/common/tracing/src/init.rs (3 additions, 2 deletions)
```diff
@@ -108,6 +108,7 @@ fn env_filter(level: &str) -> EnvFilter {
     EnvFilter::new(
         EnvFilterBuilder::new()
             .filter(Some("databend::log::query"), LevelFilter::Off)
+            .filter(Some("databend::log::query::file"), LevelFilter::Off)
             .filter(Some("databend::log::profile"), LevelFilter::Off)
             .filter(Some("databend::log::structlog"), LevelFilter::Off)
             .filter(Some("databend::log::time_series"), LevelFilter::Off)
@@ -288,7 +289,7 @@ pub fn init_logging(
     let dispatch = Dispatch::new()
         .filter(EnvFilter::new(
             EnvFilterBuilder::new()
-                .filter(Some("databend::log::query"), LevelFilter::Trace),
+                .filter(Some("databend::log::query::file"), LevelFilter::Trace),
         ))
         .filter(filter_by_thread_tracker())
         .append(query_log_file.with_layout(get_layout("identical")));
@@ -314,7 +315,7 @@ pub fn init_logging(
     let dispatch = Dispatch::new()
         .filter(EnvFilter::new(
             EnvFilterBuilder::new()
-                .filter(Some("databend::log::query"), LevelFilter::Trace),
+                .filter(Some("databend::log::query::file"), LevelFilter::Trace),
         ))
         .filter(filter_by_thread_tracker())
         .append(otel);
```
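All three hunks hinge on log targets: the default env filter now silences the new `databend::log::query::file` target alongside `databend::log::query`, while the query-log-file and OpenTelemetry dispatches listen only for the `::file` target, so those appenders no longer pick up events meant for the history-table pipeline. A minimal sketch of the routing idea, using only the `log` facade's `target:` syntax (the `Dispatch`/`EnvFilterBuilder` wiring above belongs to this crate's logging stack and is not repeated here; with no logger installed the calls are no-ops, but they compile and run):

```rust
use log::info;

fn main() {
    let event_json = r#"{"query_id":"q1"}"#;

    // The `target:` argument is the string the filter rules above match on,
    // so the same event can be fanned out to independent sinks:
    info!(target: "databend::log::query", "{}", event_json);       // history-table pipeline
    info!(target: "databend::log::query::file", "{}", event_json); // query-details file
}
```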

src/common/tracing/src/predefined_tables/history_tables.toml (2 additions, 2 deletions)
```diff
@@ -9,8 +9,8 @@ delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hour
 [[tables]]
 name = "query_history"
 target = "databend::log::query"
-create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
-transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3, 4, 5) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
+create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL, session_settings STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
+transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id, f['session_settings'] as session_settings FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3, 4, 5) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
 delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"
 
 [[tables]]
```
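The transform is a periodic MERGE: it parses raw `log_history` messages for the `databend::log::query` target within a batch window, keeps one row per `query_id` via `ARG_MAX(m, m['log_type'])`, and upserts finished states (`log_type IN (2, 3, 4, 5)`) over in-flight ones. The fix threads the new `session_settings` column through both the `create` and `transform` statements. A hedged sketch of how the `{batch_begin}`/`{batch_end}` placeholders might be rendered before execution (hypothetical helper; the actual history-tables runner is not part of this diff):

```rust
// Hypothetical substitution of the batch-window placeholders in the
// transform template above.
fn render_transform(template: &str, batch_begin: u64, batch_end: u64) -> String {
    template
        .replace("{batch_begin}", &batch_begin.to_string())
        .replace("{batch_end}", &batch_end.to_string())
}

fn main() {
    let sql = render_transform(
        "... WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end} ...",
        100,
        200,
    );
    assert!(sql.contains("batch_number >= 100 AND batch_number < 200"));
    println!("{sql}");
}
```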

src/query/service/src/interpreters/common/query_log.rs (11 additions, 2 deletions)
```diff
@@ -64,10 +64,19 @@ fn error_fields<C>(log_type: LogType, err: Option<ErrorCode<C>>) -> (LogType, i3
 }
 
 impl InterpreterQueryLog {
-    fn write_log(event: QueryLogElement) -> Result<()> {
+    fn write_log(mut event: QueryLogElement) -> Result<()> {
+        // log the query event in the system_history.query_history table
         let event_str = serde_json::to_string(&event)?;
-        // log the query log in JSON format
         info!(target: "databend::log::query", "{}", event_str);
+
+        // log the query event in `query-details` log file
+        // remove some fields to keep tidy in the log file
+        event.session_settings.clear();
+        event.sql_user_quota.clear();
+        event.sql_user_privileges.clear();
+        let event_str = serde_json::to_string(&event)?;
+        info!(target: "databend::log::query::file", "{}", event_str);
+
         // log the query event in the system log
         info!("query: {} becomes {:?}", event.query_id, event.log_type);
         Ok(())
```
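`write_log` now serializes the event twice: first in full for the `query_history` pipeline, then again for the `query-details` file after clearing the bulky `session_settings`, `sql_user_quota`, and `sql_user_privileges` strings. Clearing alone only empties the values; it is the `skip_serializing_if = "String::is_empty"` attributes in the next file that make the emptied fields vanish from the second JSON payload. A self-contained sketch with a trimmed stand-in for `QueryLogElement` (assumes `serde` with the derive feature and `serde_json`):

```rust
use serde::Serialize;

// Trimmed stand-in for QueryLogElement (field names from this diff).
#[derive(Serialize)]
struct Event {
    query_id: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    session_settings: String,
}

fn main() -> Result<(), serde_json::Error> {
    let mut event = Event {
        query_id: "q1".into(),
        session_settings: "max_threads=8".into(),
    };
    // First pass: full record, e.g. {"query_id":"q1","session_settings":"max_threads=8"}
    println!("{}", serde_json::to_string(&event)?);

    // Second pass: the cleared field is empty, the skip predicate fires,
    // and the output shrinks to {"query_id":"q1"}
    event.session_settings.clear();
    println!("{}", serde_json::to_string(&event)?);
    Ok(())
}
```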

src/query/storages/system/src/query_log_table.rs (3 additions, 3 deletions)
```diff
@@ -86,9 +86,9 @@ pub struct QueryLogElement {
     pub node_id: String,
     pub sql_user: String,
 
-    #[serde(skip_serializing)]
+    #[serde(skip_serializing_if = "String::is_empty")]
     pub sql_user_quota: String,
-    #[serde(skip_serializing)]
+    #[serde(skip_serializing_if = "String::is_empty")]
    pub sql_user_privileges: String,
 
     // Query.
@@ -154,7 +154,7 @@
 
     // Session
     pub query_tag: String,
-    #[serde(skip_serializing)]
+    #[serde(skip_serializing_if = "String::is_empty")]
     pub session_settings: String,
 
     // Extra.
```
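This attribute swap is the root cause of the missing columns: plain `skip_serializing` omits a field unconditionally, so these values could never reach `query_history` at all, while `skip_serializing_if` keeps them in the JSON whenever they are non-empty, which is exactly what the two-pass `write_log` above relies on. A small contrast (simplified one-field structs, not the real `QueryLogElement`):

```rust
use serde::Serialize;

// Old behaviour: the field is never serialized, full stop.
#[derive(Serialize)]
struct Before {
    // Never read by serde because of skip_serializing, so silence the lint.
    #[allow(dead_code)]
    #[serde(skip_serializing)]
    sql_user_quota: String,
}

// New behaviour: the field is serialized unless it is empty.
#[derive(Serialize)]
struct After {
    #[serde(skip_serializing_if = "String::is_empty")]
    sql_user_quota: String,
}

fn main() -> Result<(), serde_json::Error> {
    let quota = "limit=10GB".to_string();
    println!("{}", serde_json::to_string(&Before { sql_user_quota: quota.clone() })?); // {}
    println!("{}", serde_json::to_string(&After { sql_user_quota: quota })?); // {"sql_user_quota":"limit=10GB"}
    Ok(())
}
```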
