Skip to content

Commit 88ef60a

Browse files
authored
fix: prevent history table's log loss during node startup (#18566)
* fix: prevent history table's log loss during node startup before operator initialization * address feedback from Copilot review
1 parent 79ed72d commit 88ef60a

File tree

8 files changed

+69
-13
lines changed

8 files changed

+69
-13
lines changed

.github/actions/test_logs/action.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ runs:
2525
export AWS_EC2_METADATA_DISABLED=true
2626
2727
aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
28-
aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data s3://testbucket/data --recursive --no-progress
2928
3029
- name: Run logging Tests
3130
shell: bash

src/common/base/tests/it/time_series/profile.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,26 @@ fn test_finish_flush() {
139139
profile.record(TimeSeriesProfileName::OutputRows, 1);
140140

141141
let batch = profile.flush(true, &mut 4);
142-
143142
assert_eq!(batch.len(), 2);
144-
// [[timestamp, 1000, 1]]
145-
assert_eq!(batch[0][0].len(), 3);
146-
// [[timestamp, 2000, 2]]
147-
assert_eq!(batch[1][0].len(), 3);
143+
// sleep(1) does not sleep for exactly 1 second; it sleeps for **at least** 1 second.
144+
if batch[0].len() == 1 {
145+
// 99.9%: [[timestamp, 1000, 1]]
146+
assert_eq!(batch[0][0][1], 1000);
147+
assert_eq!(batch[0][0][2], 1);
148+
} else {
149+
// 0.1%: [[timestamp, 1000], [timestamp+2, 1]]
150+
assert_eq!(batch[0][0][1], 1000);
151+
assert_eq!(batch[0][1][1], 1);
152+
}
153+
if batch[1].len() == 1 {
154+
// 99.9%: [[timestamp, 2000, 2]]
155+
assert_eq!(batch[1][0][1], 2000);
156+
assert_eq!(batch[1][0][2], 2);
157+
} else {
158+
// 0.1%: [[timestamp, 2000], [timestamp+2, 2]]
159+
assert_eq!(batch[1][0][1], 2000);
160+
assert_eq!(batch[1][1][1], 2);
161+
}
148162
}
149163

150164
#[test]

src/common/tracing/src/init.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
use std::borrow::Cow;
1616
use std::collections::BTreeMap;
17+
use std::sync::atomic::AtomicBool;
18+
use std::sync::atomic::Ordering;
1719
use std::sync::Arc;
1820

1921
use databend_common_base::base::tokio;
@@ -47,6 +49,7 @@ const HEADER_TRACE_PARENT: &str = "traceparent";
4749
pub struct GlobalLogger {
4850
_drop_guards: Vec<Box<dyn Send + Sync + 'static>>,
4951
pub remote_log_operator: RwLock<Option<Operator>>,
52+
pub ready: AtomicBool,
5053
}
5154

5255
impl GlobalLogger {
@@ -59,10 +62,19 @@ impl GlobalLogger {
5962
let instance = Arc::new(Self {
6063
_drop_guards,
6164
remote_log_operator,
65+
ready: AtomicBool::new(false),
6266
});
6367
GlobalInstance::set(instance);
6468
}
6569

70+
pub fn dummy() -> Arc<GlobalLogger> {
71+
Arc::new(Self {
72+
_drop_guards: Vec::new(),
73+
remote_log_operator: RwLock::new(None),
74+
ready: AtomicBool::new(true),
75+
})
76+
}
77+
6678
pub fn instance() -> Arc<GlobalLogger> {
6779
GlobalInstance::get()
6880
}
@@ -80,6 +92,7 @@ impl GlobalLogger {
8092
pub async fn set_operator(&self, operator: Operator) {
8193
let mut remote_log_operator = self.remote_log_operator.write().await;
8294
*remote_log_operator = Some(operator);
95+
self.ready.store(true, Ordering::SeqCst);
8396
}
8497
}
8598

src/common/tracing/src/remote_log.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,12 @@ impl LogBuffer {
289289
Ordering::SeqCst,
290290
) {
291291
Ok(_) => {
292+
// Ensure the operator is ready before collecting logs, to prevent data loss during startup.
293+
// If the operator is not ready, skip collection this round and keep the log entries in the queue.
294+
// The MAX_BUFFER_SIZE limit prevents unbounded queue growth.
295+
if !GlobalLogger::instance().ready.load(Ordering::SeqCst) {
296+
break;
297+
}
292298
self.collect()?;
293299
break;
294300
}

src/common/tracing/tests/it/remote_log.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ use std::time::Duration;
1818

1919
use async_channel::bounded;
2020
use databend_common_base::base::tokio;
21+
use databend_common_base::base::GlobalInstance;
2122
use databend_common_exception::Result;
2223
use databend_common_tracing::convert_to_batch;
2324
use databend_common_tracing::Config;
25+
use databend_common_tracing::GlobalLogger;
2426
use databend_common_tracing::LogMessage;
2527
use databend_common_tracing::RemoteLog;
2628
use databend_common_tracing::RemoteLogBuffer;
@@ -103,17 +105,18 @@ async fn test_buffer_flush_with_buffer_limit() -> Result<()> {
103105
Ok(())
104106
}
105107

106-
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
107-
async fn test_buffer_flush_with_buffer_interval() -> Result<()> {
108+
#[test]
109+
fn test_buffer_flush_with_buffer_interval() -> Result<()> {
110+
init_global_logger()?;
108111
let (tx, rx) = bounded(10);
109112
let interval = Duration::from_secs(1).as_micros() as u64;
110113
let buffer = Arc::new(RemoteLogBuffer::new(tx.clone(), interval));
111114
for _i in 0..5 {
112115
buffer.log(get_remote_log_elements())?
113116
}
114-
tokio::time::sleep(Duration::from_secs(1)).await;
117+
std::thread::sleep(Duration::from_secs(1));
115118
buffer.log(get_remote_log_elements())?;
116-
let res = rx.recv().await.unwrap();
119+
let res = rx.recv_blocking().unwrap();
117120
if let LogMessage::Flush(elements) = res {
118121
assert_eq!(elements.len(), 6);
119122
}
@@ -152,3 +155,11 @@ async fn test_do_flush() -> Result<()> {
152155

153156
Ok(())
154157
}
158+
159+
fn init_global_logger() -> Result<()> {
160+
let thread = std::thread::current();
161+
GlobalInstance::init_testing(thread.name().unwrap());
162+
let instance = GlobalLogger::dummy();
163+
GlobalInstance::set(instance);
164+
Ok(())
165+
}

tests/logging/check_logs_table.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ function check_query_log() {
2929
fi
3030
}
3131

32-
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "set global timezone='Asia/Shanghai'"}')
32+
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"set global timezone='Asia/Shanghai'\"}")
3333

3434
# Execute the initial query and get the query_id
3535
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "select 123"}')

tests/logging/history_table.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[log.history]
22
on = true
3+
interval = 1
34

45
[[log.history.tables]]
56
table_name = "query_history"
@@ -11,4 +12,4 @@ table_name = "profile_history"
1112
table_name = "login_history"
1213

1314
[[log.history.tables]]
14-
table_name = "access_history"
15+
table_name = "access_history"

tests/logging/test-history-tables.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ echo "Cleaning up previous runs"
2525
killall -9 databend-query || true
2626
killall -9 databend-meta || true
2727
killall -9 vector || true
28-
rm -rf .databend
28+
rm -rf ./.databend
2929

3030
echo "Starting Databend Query cluster with 2 nodes enable history tables"
3131

@@ -98,6 +98,18 @@ else
9898
echo "✓ meta node logs are collected as expected"
9999
fi
100100

101+
startup_check_response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"select count(*) from system_history.log_history where message like 'Ready for connections after%'\"}")
102+
startup_check_response_data=$(echo "$startup_check_response" | jq -r '.data')
103+
if [[ "$startup_check_response_data" != *"2"* ]]; then
104+
echo "ERROR: startup check failed"
105+
debug_info=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"select * from system_history.log_history where message like 'Ready for connections after%'\"}")
106+
echo "$debug_info"
107+
echo "startup_check_response_data: $startup_check_response_data"
108+
exit 1
109+
else
110+
echo "✓ node startup test passed"
111+
fi
112+
101113
# **Internal -> External**: should reset
102114

103115
echo "Add a node with external history table enabled"

0 commit comments

Comments
 (0)