
Commit 6c04d86

fix: log table transform should keep UTC time (#18059)
1 parent 1313f64 commit 6c04d86

3 files changed: +10 −12 lines changed

src/common/tracing/src/predefined_tables/history_tables.toml

Lines changed: 4 additions & 4 deletions
```diff
@@ -2,26 +2,26 @@
 name = "log_history"
 target = ""
 create = "CREATE TABLE IF NOT EXISTS system_history.log_history (timestamp TIMESTAMP NULL, path STRING NULL, target STRING NULL, log_level STRING NULL, cluster_id STRING NULL, node_id STRING NULL, warehouse_id STRING NULL, query_id STRING NULL, message STRING NULL, fields VARIANT NULL, batch_number Int64) CLUSTER BY LINEAR(timestamp, query_id)"
-transform = "COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE"
+transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE"
 delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"
 
 [[tables]]
 name = "query_history"
 target = "databend::log::query"
 create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL) CLUSTER BY LINEAR(event_time, query_id)"
-transform = "INSERT INTO system_history.query_history FROM (SELECT m['log_type'], m['log_type_name'], m['handler_type'], m['tenant_id'], m['cluster_id'], m['node_id'], m['sql_user'], m['sql_user_quota'], m['sql_user_privileges'], m['query_id'], m['query_kind'], m['query_text'], m['query_hash'], m['query_parameterized_hash'], m['event_date'], m['event_time'], m['query_start_time'], m['query_duration_ms'], m['query_queued_duration_ms'], m['current_database'], m['written_rows'], m['written_bytes'], m['join_spilled_rows'], m['join_spilled_bytes'], m['agg_spilled_rows'], m['agg_spilled_bytes'], m['group_by_spilled_rows'], m['group_by_spilled_bytes'], m['written_io_bytes'], m['written_io_bytes_cost_ms'], m['scan_rows'], m['scan_bytes'], m['scan_io_bytes'], m['scan_io_bytes_cost_ms'], m['scan_partitions'], m['total_partitions'], m['result_rows'], m['result_bytes'], m['bytes_from_remote_disk'], m['bytes_from_local_disk'], m['bytes_from_memory'], m['client_address'], m['user_agent'], m['exception_code'], m['exception_text'], m['server_version'], m['query_tag'], m['has_profile'], m['peek_memory_usage'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+transform = "settings (timezone='Etc/UTC') INSERT INTO system_history.query_history FROM (SELECT m['log_type'], m['log_type_name'], m['handler_type'], m['tenant_id'], m['cluster_id'], m['node_id'], m['sql_user'], m['sql_user_quota'], m['sql_user_privileges'], m['query_id'], m['query_kind'], m['query_text'], m['query_hash'], m['query_parameterized_hash'], m['event_date'], m['event_time'], m['query_start_time'], m['query_duration_ms'], m['query_queued_duration_ms'], m['current_database'], m['written_rows'], m['written_bytes'], m['join_spilled_rows'], m['join_spilled_bytes'], m['agg_spilled_rows'], m['agg_spilled_bytes'], m['group_by_spilled_rows'], m['group_by_spilled_bytes'], m['written_io_bytes'], m['written_io_bytes_cost_ms'], m['scan_rows'], m['scan_bytes'], m['scan_io_bytes'], m['scan_io_bytes_cost_ms'], m['scan_partitions'], m['total_partitions'], m['result_rows'], m['result_bytes'], m['bytes_from_remote_disk'], m['bytes_from_local_disk'], m['bytes_from_memory'], m['client_address'], m['user_agent'], m['exception_code'], m['exception_text'], m['server_version'], m['query_tag'], m['has_profile'], m['peek_memory_usage'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
 delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"
 
 [[tables]]
 name = "profile_history"
 target = "databend::log::profile"
 create = "CREATE TABLE IF NOT EXISTS system_history.profile_history (timestamp TIMESTAMP NULL, query_id VARCHAR NULL, profiles VARIANT NULL, statistics_desc VARIANT NULL) CLUSTER BY (timestamp, query_id)"
-transform = "INSERT INTO system_history.profile_history FROM (SELECT timestamp, m['query_id'], m['profiles'], m['statistics_desc'] FROM (SELECT timestamp, parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::profile' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+transform = "settings (timezone='Etc/UTC') INSERT INTO system_history.profile_history FROM (SELECT timestamp, m['query_id'], m['profiles'], m['statistics_desc'] FROM (SELECT timestamp, parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::profile' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
 delete = "DELETE FROM system_history.profile_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"
 
 [[tables]]
 name = "login_history"
 target = "databend::log::login"
 create = "CREATE TABLE IF NOT EXISTS system_history.login_history (event_time TIMESTAMP NULL, handler STRING NULL, event_type STRING NULL, connection_uri STRING NULL, auth_type STRING NULL, user_name STRING NULL, client_ip STRING NULL, user_agent STRING NULL, session_id STRING NULL, node_id STRING NULL, error_message STRING NULL)"
-transform = "INSERT INTO system_history.login_history FROM (SELECT m['event_time'], m['handler'], m['event_type'], m['connection_uri'], m['auth_type'], m['user_name'], m['client_ip'], m['user_agent'], m['session_id'], m['node_id'], m['error_message'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::login' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+transform = "settings (timezone='Etc/UTC') INSERT INTO system_history.login_history FROM (SELECT m['event_time'], m['handler'], m['event_type'], m['connection_uri'], m['auth_type'], m['user_name'], m['client_ip'], m['user_agent'], m['session_id'], m['node_id'], m['error_message'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::login' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
 delete = "DELETE FROM system_history.login_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"
```
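Each `transform` statement above is a template (the `{stage_name}`, `{batch_number}`, `{batch_begin}`, and `{batch_end}` placeholders are substituted at run time), and the fix prepends a per-statement `settings (timezone='Etc/UTC')` clause so the statement executes under a UTC session timezone regardless of the server default. A minimal sketch of the rendering step, using a hypothetical `render_transform` helper rather than Databend's actual template code:

```rust
/// Hypothetical helper (not the Databend implementation): substitute the
/// template placeholders into a `transform` statement.
fn render_transform(template: &str, stage_name: &str, batch_number: u64) -> String {
    template
        .replace("{stage_name}", stage_name)
        .replace("{batch_number}", &batch_number.to_string())
}

fn main() {
    // Abbreviated version of the log_history transform template above.
    let template = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history \
                    FROM (SELECT timestamp, {batch_number} FROM @{stage_name}) \
                    file_format = (TYPE = PARQUET) PURGE = TRUE";
    let sql = render_transform(template, "log_stage", 42);
    // The settings clause applies only to this statement, so the COPY runs
    // with a UTC session timezone no matter what the server is configured to.
    assert!(sql.starts_with("settings (timezone='Etc/UTC')"));
    println!("{sql}");
}
```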

src/common/tracing/src/remote_log.rs

Lines changed: 4 additions & 4 deletions
```diff
@@ -217,7 +217,7 @@ impl RemoteLog {
         let fields = serde_json::to_string(&fields).unwrap_or_default();
 
         let log_level = record.level().to_string();
-        let timestamp = chrono::Local::now().timestamp_micros();
+        let timestamp = chrono::Utc::now().timestamp_micros();
 
         let path = format!(
             "{}: {}:{}",
@@ -266,7 +266,7 @@ impl LogBuffer {
     pub fn new(sender: Sender<LogMessage>, interval: u64) -> Self {
         Self {
             queue: ConcurrentQueue::unbounded(),
-            last_collect: AtomicU64::new(chrono::Local::now().timestamp_micros() as u64),
+            last_collect: AtomicU64::new(chrono::Utc::now().timestamp_micros() as u64),
             sender,
             interval,
         }
@@ -277,12 +277,12 @@
         self.queue.push(log_element)?;
         if self.queue.len() >= Self::MAX_BUFFER_SIZE {
             self.last_collect.store(
-                chrono::Local::now().timestamp_micros() as u64,
+                chrono::Utc::now().timestamp_micros() as u64,
                 Ordering::SeqCst,
             );
             self.collect()?;
         }
-        let now = chrono::Local::now().timestamp_micros() as u64;
+        let now = chrono::Utc::now().timestamp_micros() as u64;
         let mut current_last_collect = 0;
         loop {
             match self.last_collect.compare_exchange_weak(
```
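In `remote_log.rs`, every `chrono::Local::now()` becomes `chrono::Utc::now()`. Note that `timestamp_micros()` already counts microseconds since the Unix epoch (UTC) for any chrono timezone, so these particular values are numerically unchanged; the switch makes the UTC intent explicit and avoids the per-call local-timezone lookup. A quick sketch (assuming the `chrono` crate) illustrating the equivalence:

```rust
fn main() {
    // Both count non-leap microseconds since 1970-01-01T00:00:00Z; the
    // timezone only affects how the instant would be *displayed*.
    let local = chrono::Local::now().timestamp_micros();
    let utc = chrono::Utc::now().timestamp_micros();
    // Only the time elapsed between the two `now()` calls can differ.
    assert!((utc - local).abs() < 1_000_000);
    println!("local={local} utc={utc}");
}
```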

src/query/service/src/history_tables/global_history_log.rs

Lines changed: 2 additions & 4 deletions
```diff
@@ -219,7 +219,7 @@ impl GlobalHistoryLog {
         {
             Some(v) => {
                 let last: u64 = serde_json::from_slice(&v.data)?;
-                chrono::Local::now().timestamp_millis() as u64
+                chrono::Utc::now().timestamp_millis() as u64
                     - Duration::from_secs(interval).as_millis() as u64
                     > last
             }
@@ -238,9 +238,7 @@ impl GlobalHistoryLog {
             .upsert_kv(UpsertKV::new(
                 format!("{}/last_timestamp", meta_key),
                 MatchSeq::Any,
-                Operation::Update(serde_json::to_vec(
-                    &chrono::Local::now().timestamp_millis(),
-                )?),
+                Operation::Update(serde_json::to_vec(&chrono::Utc::now().timestamp_millis())?),
                 None,
             ))
             .await?;
```
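`GlobalHistoryLog` persists the last-run timestamp in the meta service and re-runs a task only once the configured interval has elapsed; after the fix both the stored value and the comparison use UTC epoch milliseconds. A standalone sketch of that decision (a hypothetical `should_run` helper, not the actual method; it uses `saturating_sub` where the original uses plain subtraction):

```rust
use std::time::Duration;

/// Run again only if at least `interval` has passed since the persisted
/// `last_ms` run timestamp, everything in UTC epoch milliseconds.
fn should_run(last_ms: u64, interval: Duration) -> bool {
    let now_ms = chrono::Utc::now().timestamp_millis() as u64;
    now_ms.saturating_sub(interval.as_millis() as u64) > last_ms
}

fn main() {
    let five_min = Duration::from_secs(300);
    let stale = chrono::Utc::now().timestamp_millis() as u64 - 600_000;
    assert!(should_run(stale, five_min)); // last run 10 minutes ago
    let fresh = chrono::Utc::now().timestamp_millis() as u64;
    assert!(!should_run(fresh, five_min)); // just ran
}
```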
