Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::collections::HashMap;
use std::str::FromStr;
use std::{env, num::NonZeroUsize, sync::Arc, time::Duration};
use std::{
env,
num::{NonZeroU64, NonZeroUsize},
sync::Arc,
time::Duration,
};
use std::{path::PathBuf, process::Command};
use thiserror::Error;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -87,6 +92,8 @@ pub const DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR: &str = "127.0.0.1:8182";

pub const DEFAULT_TELEMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com";

/// Lower bound on the number of snapshots loaded at startup, applied via
/// `max()` regardless of the configured lookback / gen1-duration ratio.
const MIN_SNAPSHOTS_TO_LOAD_ON_START: u64 = 100;

mod cli_params;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -989,9 +996,13 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
Err(error) => return Err(Error::InitializeCatalog(error)),
};

let n_snapshots_to_load_on_start =
let num_snapshots_to_load =
config.gen1_lookback_duration.as_secs() / gen1_duration.as_duration().as_secs();

let n_snapshots_to_load_on_start =
NonZeroU64::new(MIN_SNAPSHOTS_TO_LOAD_ON_START.max(num_snapshots_to_load))
.expect("n_snapshots_to_load_on_start is always >= 1");

let wal_config = WalConfig {
gen1_duration,
max_write_buffer_size: config.wal_max_write_buffer_size,
Expand All @@ -1012,7 +1023,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
query_file_limit: config.query_file_limit,
n_snapshots_to_load_on_start: n_snapshots_to_load_on_start as usize,
n_snapshots_to_load_on_start,
shutdown: shutdown_manager.register(),
wal_replay_concurrency_limit: config.wal_replay_concurrency_limit,
})
Expand Down
194 changes: 194 additions & 0 deletions influxdb3/tests/server/gen1_lookback_guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};

use influxdb3_client::Precision;
use reqwest::StatusCode;
use tempfile::TempDir;
use tokio::time::sleep;

use crate::server::{ConfigProvider, TestServer};

fn list_snapshot_files(object_store_root: &str, node_id: &str) -> Vec<PathBuf> {
let dir = Path::new(object_store_root).join(node_id).join("snapshots");
read_dir_sorted(&dir)
}

/// Return the files (not subdirectories) directly inside `dir`, sorted by path.
///
/// Returns an empty `Vec` when `dir` does not exist; panics on any other I/O
/// error so test failures surface loudly.
fn read_dir_sorted(dir: &Path) -> Vec<PathBuf> {
    // Match on the read_dir error instead of a separate `exists()` probe:
    // the probe was a TOCTOU race (dir could vanish between the check and the
    // read) and cost an extra syscall.
    let entries = match fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return vec![],
        Err(e) => panic!("failed to read dir {dir:?}: {e}"),
    };
    let mut paths = entries
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.is_file())
        .collect::<Vec<_>>();
    paths.sort();
    paths
}

fn get_file_mtimes(object_store_root: &str, node_id: &str) -> Vec<(PathBuf, SystemTime)> {
let root = Path::new(object_store_root).join(node_id);
let mut results = vec![];
if root.exists() {
collect_parquet_json_files(&root, &mut results);
}
results.sort_by(|a, b| a.0.cmp(&b.0));
results
}

/// Recursively walk `dir`, appending a `(path, mtime)` entry for every
/// `.parquet` or `.json` file found. Unreadable directories and files whose
/// metadata cannot be read are silently skipped.
fn collect_parquet_json_files(dir: &Path, results: &mut Vec<(PathBuf, SystemTime)>) {
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for path in entries.filter_map(|e| e.ok()).map(|e| e.path()) {
        if path.is_dir() {
            collect_parquet_json_files(&path, results);
            continue;
        }
        if !path.is_file() {
            continue;
        }
        let wanted_ext = matches!(
            path.extension().and_then(|ext| ext.to_str()),
            Some("parquet") | Some("json")
        );
        if !wanted_ext {
            continue;
        }
        if let Ok(mtime) = fs::metadata(&path).and_then(|meta| meta.modified()) {
            results.push((path, mtime));
        }
    }
}

fn list_wal_files(object_store_root: &str, node_id: &str) -> Vec<PathBuf> {
let wal_dir = Path::new(object_store_root).join(node_id).join("wal");
read_dir_sorted(&wal_dir)
}

/// Poll the node's snapshots directory every 100ms until at least `min_count`
/// snapshot files are present, returning them; panics once `timeout` elapses.
async fn wait_for_snapshot_count(
    object_store_root: &str,
    node_id: &str,
    min_count: usize,
    timeout: Duration,
) -> Vec<PathBuf> {
    let deadline = Instant::now() + timeout;
    loop {
        let snapshots = list_snapshot_files(object_store_root, node_id);
        if snapshots.len() >= min_count {
            return snapshots;
        }
        if Instant::now() > deadline {
            panic!(
                "timed out waiting for {min_count} snapshots, only found {} in \
                 {object_store_root}/{node_id}/snapshots/",
                snapshots.len(),
            );
        }
        sleep(Duration::from_millis(100)).await;
    }
}

/// Kill the server process and poll until it has actually exited, panicking
/// if it is still alive after 5 seconds.
async fn kill_and_wait(server: &mut TestServer) {
    server.kill();
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        if server.is_stopped() {
            return;
        }
        if Instant::now() > deadline {
            panic!("timed out waiting for server process to exit after kill");
        }
        sleep(Duration::from_millis(100)).await;
    }
}

/// How long the tests wait for snapshot files to appear before failing.
const SNAPSHOT_TIMEOUT: Duration = Duration::from_secs(30);
/// Number of line-protocol writes issued before the restart.
const TOTAL_WRITES: i64 = 10;
/// Value passed to `--snapshotted-wal-files-to-keep` (CLI args are strings).
const WAL_FILES_TO_KEEP: &str = "1";

#[test_log::test(tokio::test)]
async fn test_lookback_lt_gen1_data_loss_on_restart() {
    // Scenario under test: gen1-lookback-duration (5m) is *shorter* than
    // gen1-duration (10m). A restart in this configuration must not rewrite
    // files that were already persisted before the restart.
    let tmp_dir = TempDir::new().unwrap();
    let obj_store_path = tmp_dir.path().to_str().unwrap().to_string();
    let node_id = "lookback-dataloss";

    let mut server = TestServer::configure()
        .with_node_id(node_id)
        .with_object_store_dir(&obj_store_path)
        .with_gen1_duration("10m")
        .with_gen1_lookback_duration("5m")
        .with_snapshotted_wal_files_to_keep(WAL_FILES_TO_KEEP)
        .spawn()
        .await;

    // Issue TOTAL_WRITES writes with distinct nanosecond timestamps; the
    // short sleep between writes gives the WAL flusher (10ms interval, set
    // by the test harness) a chance to pick each one up separately.
    for i in 1_i64..=TOTAL_WRITES {
        server
            .write_lp_to_db(
                "testdb",
                &format!("cpu,host=server{i} usage={i}.0 {}", i * 1_000_000_000),
                Precision::Nanosecond,
            )
            .await
            .expect("write lp");
        sleep(Duration::from_millis(50)).await;
    }

    // Wait for at least one snapshot to land, then give persistence a few
    // extra seconds to settle before inspecting on-disk state.
    wait_for_snapshot_count(&obj_store_path, node_id, 1, SNAPSHOT_TIMEOUT).await;
    sleep(Duration::from_secs(3)).await;

    // Sanity check: all written rows are queryable before the restart.
    let resp = server
        .api_v3_query_sql(&[
            ("db", "testdb"),
            ("q", "SELECT COUNT(*) as cnt FROM cpu"),
            ("format", "json"),
        ])
        .await;
    assert_eq!(StatusCode::OK, resp.status());
    let body = resp.text().await.unwrap();
    assert!(
        body.contains(&format!("\"cnt\":{TOTAL_WRITES}")),
        "expected {TOTAL_WRITES} rows before restart, got: {body}"
    );

    // WAL cleanup should have trimmed to roughly WAL_FILES_TO_KEEP files;
    // the +2 below allows slack for files written since the last snapshot.
    let wal_files = list_wal_files(&obj_store_path, node_id);
    let snap_files = list_snapshot_files(&obj_store_path, node_id);
    println!(
        "before restart: {} WAL files, {} snapshot files",
        wal_files.len(),
        snap_files.len()
    );
    assert!(
        wal_files.len() <= WAL_FILES_TO_KEEP.parse::<usize>().unwrap() + 2,
        "expected WAL files to be cleaned up to ~{WAL_FILES_TO_KEEP}, got {}",
        wal_files.len()
    );

    // Record every persisted parquet/json file's mtime before the restart.
    let files_before_restart = get_file_mtimes(&obj_store_path, node_id);

    kill_and_wait(&mut server).await;

    // Restart against the same object store with the same configuration.
    let mut server = TestServer::configure()
        .with_node_id(node_id)
        .with_object_store_dir(&obj_store_path)
        .with_gen1_duration("10m")
        .with_gen1_lookback_duration("5m")
        .with_snapshotted_wal_files_to_keep(WAL_FILES_TO_KEEP)
        .spawn()
        .await;

    wait_for_snapshot_count(&obj_store_path, node_id, 1, SNAPSHOT_TIMEOUT).await;
    sleep(Duration::from_secs(3)).await;

    // Every file that existed before the restart must be untouched after it
    // (mtime is used as the proxy): a changed mtime would mean the restart
    // rewrote previously persisted data.
    let files_after_restart = get_file_mtimes(&obj_store_path, node_id);
    for (path, mtime_before) in &files_before_restart {
        if let Some((_, mtime_after)) = files_after_restart.iter().find(|(p, _)| p == path) {
            assert_eq!(
                mtime_before, mtime_after,
                "file was modified after restart: {path:?}"
            );
        }
    }

    kill_and_wait(&mut server).await;
}
33 changes: 32 additions & 1 deletion influxdb3/tests/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod auth;
mod client;
mod configure;
mod flight;
mod gen1_lookback_guard;
mod limits;
mod logs;
mod packages;
Expand Down Expand Up @@ -94,6 +95,8 @@ pub struct TestConfig {
object_store_dir: Option<String>,
disable_authz: Vec<String>,
gen1_duration: Option<String>,
gen1_lookback_duration: Option<String>,
snapshotted_wal_files_to_keep: Option<String>,
capture_logs: bool,
enable_recovery_endpoint: bool,
admin_token_file: Option<String>,
Expand Down Expand Up @@ -182,6 +185,16 @@ impl TestConfig {
self
}

pub fn with_gen1_lookback_duration(mut self, duration: impl Into<String>) -> Self {
self.gen1_lookback_duration = Some(duration.into());
self
}

pub fn with_snapshotted_wal_files_to_keep(mut self, count: impl Into<String>) -> Self {
self.snapshotted_wal_files_to_keep = Some(count.into());
self
}

/// Enable capturing of stdout/stderr logs for this [`TestServer`]
pub fn with_capture_logs(mut self) -> Self {
self.capture_logs = true;
Expand Down Expand Up @@ -280,6 +293,25 @@ impl ConfigProvider for TestConfig {
])
}

if let Some(gen1_lookback_duration) = &self.gen1_lookback_duration {
args.append(&mut vec![
"--gen1-lookback-duration".to_string(),
gen1_lookback_duration.to_owned(),
])
}

args.append(&mut vec![
"--wal-snapshot-size".to_string(),
"1".to_string(),
]);

if let Some(count) = &self.snapshotted_wal_files_to_keep {
args.append(&mut vec![
"--snapshotted-wal-files-to-keep".to_string(),
count.to_owned(),
])
}

if let Some(admin_token_file) = &self.admin_token_file {
args.append(&mut vec![
"--admin-token-file".to_string(),
Expand Down Expand Up @@ -408,7 +440,6 @@ impl TestServer {
.arg("--disable-telemetry-upload")
.args(["--http-bind", "0.0.0.0:0"])
.args(["--wal-flush-interval", "10ms"])
.args(["--wal-snapshot-size", "1"])
.args([
"--tcp-listener-file-path",
tcp_addr_file
Expand Down
36 changes: 16 additions & 20 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use observability_deps::tracing::{debug, info, trace, warn};
use parquet_file::storage::DataSourceExecInput;
use queryable_buffer::QueryableBufferArgs;
use schema::Schema;
use std::{borrow::Borrow, sync::Arc, time::Duration};
use std::{borrow::Borrow, num::NonZeroU64, sync::Arc, time::Duration};
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -169,7 +169,7 @@ pub struct WriteBufferImpl {
}

/// The maximum number of snapshots to load on start
pub const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000;
pub const N_SNAPSHOTS_TO_LOAD_ON_START: NonZeroU64 = NonZeroU64::new(1_000).unwrap();

#[derive(Debug)]
pub struct WriteBufferImplArgs {
Expand All @@ -184,7 +184,7 @@ pub struct WriteBufferImplArgs {
pub metric_registry: Arc<Registry>,
pub snapshotted_wal_files_to_keep: u64,
pub query_file_limit: Option<usize>,
pub n_snapshots_to_load_on_start: usize,
pub n_snapshots_to_load_on_start: NonZeroU64,
pub shutdown: ShutdownToken,
pub wal_replay_concurrency_limit: usize,
}
Expand All @@ -209,22 +209,18 @@ impl WriteBufferImpl {
}: WriteBufferImplArgs,
) -> Result<Arc<Self>> {
// Calculate sequence cutoff based on n_snapshots_to_load_on_start
let sequence_cutoff = if n_snapshots_to_load_on_start > 0 {
match persister.get_latest_snapshot_sequence().await {
Ok(Some(latest)) => {
let cutoff = latest
.as_u64()
.saturating_sub(n_snapshots_to_load_on_start as u64);
Some(SnapshotSequenceNumber::new(cutoff))
}
Ok(None) => None, // No snapshots exist yet
Err(e) => {
warn!(%e, "Failed to get latest snapshot sequence, loading all checkpoints");
None
}
let sequence_cutoff = match persister.get_latest_snapshot_sequence().await {
Ok(Some(latest)) => {
let cutoff = latest
.as_u64()
.saturating_sub(n_snapshots_to_load_on_start.get());
Some(SnapshotSequenceNumber::new(cutoff))
}
Ok(None) => None,
Err(e) => {
warn!(%e, "Failed to get latest snapshot sequence, loading all checkpoints");
None
}
} else {
None // n_snapshots_to_load_on_start = 0 means load all
};

// Try to load from checkpoints first for faster startup
Expand Down Expand Up @@ -281,7 +277,7 @@ impl WriteBufferImpl {
// Load snapshots newer than the checkpoint
let additional_snapshots = if let Some(max_seq) = max_checkpoint_snapshot_seq {
persister
.load_snapshots_after(max_seq, n_snapshots_to_load_on_start)
.load_snapshots_after(max_seq, n_snapshots_to_load_on_start.get() as usize)
.await?
.into_iter()
.map(|psv| match psv {
Expand Down Expand Up @@ -336,7 +332,7 @@ impl WriteBufferImpl {
debug!("No checkpoints found, loading snapshots directly");

let persisted_snapshots = persister
.load_snapshots(n_snapshots_to_load_on_start)
.load_snapshots(n_snapshots_to_load_on_start.get() as usize)
.await?
.into_iter()
.map(|psv| match psv {
Expand Down