Skip to content

Commit 02cf646

Browse files
committed
Fix WAL recovery test and improve error handling
- Enable test_recovery by setting WALRUS_DATA_DIR env var
- Use test_helpers for proper schema-compatible test batches
- Add #[serial] to prevent test isolation issues
- Improve error handling in wal.rs persist_topic()
- Remove explicit shutdown to avoid premature WAL consumption
1 parent 201449d commit 02cf646

File tree

3 files changed

+42
-26
lines changed

3 files changed

+42
-26
lines changed

src/buffered_write_layer.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,8 @@ impl BufferedWriteLayer {
560560
#[cfg(test)]
561561
mod tests {
562562
use super::*;
563-
use arrow::array::{Int64Array, StringViewArray};
564-
use arrow::datatypes::{DataType, Field, Schema};
563+
use crate::test_utils::test_helpers::{json_to_batch, test_span};
564+
use serial_test::serial;
565565
use std::path::PathBuf;
566566
use tempfile::tempdir;
567567

@@ -571,14 +571,14 @@ mod tests {
571571
Arc::new(cfg)
572572
}
573573

574-
fn create_test_batch() -> RecordBatch {
575-
let schema = Arc::new(Schema::new(vec![
576-
Field::new("id", DataType::Int64, false),
577-
Field::new("name", DataType::Utf8View, false),
578-
]));
579-
let id_array = Int64Array::from(vec![1, 2, 3]);
580-
let name_array = StringViewArray::from(vec!["a", "b", "c"]);
581-
RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap()
574+
fn create_test_batch(project_id: &str) -> RecordBatch {
575+
// Use test_span helper which creates data matching the default schema
576+
json_to_batch(vec![
577+
test_span("test1", "span1", project_id),
578+
test_span("test2", "span2", project_id),
579+
test_span("test3", "span3", project_id),
580+
])
581+
.unwrap()
582582
}
583583

584584
#[tokio::test]
@@ -592,7 +592,7 @@ mod tests {
592592
let table = format!("t{}", test_id);
593593

594594
let layer = BufferedWriteLayer::with_config(cfg).unwrap();
595-
let batch = create_test_batch();
595+
let batch = create_test_batch(&project);
596596

597597
layer.insert(&project, &table, vec![batch.clone()]).await.unwrap();
598598

@@ -601,15 +601,16 @@ mod tests {
601601
assert_eq!(results[0].num_rows(), 3);
602602
}
603603

604-
// NOTE: This test is ignored because walrus-rust creates new files for each instance
605-
// rather than discovering existing files from previous instances in the same directory.
606-
// This is a limitation of the walrus library, not our code.
607-
#[ignore]
604+
#[serial]
608605
#[tokio::test]
609606
async fn test_recovery() {
610607
let dir = tempdir().unwrap();
611608
let cfg = create_test_config(dir.path().to_path_buf());
612609

610+
// SAFETY: walrus-rust reads WALRUS_DATA_DIR from environment. We use #[serial]
611+
// to prevent concurrent access to this process-global state.
612+
unsafe { std::env::set_var("WALRUS_DATA_DIR", &cfg.core.walrus_data_dir) };
613+
613614
// Use unique but short project/table names (walrus has metadata size limit)
614615
let test_id = &uuid::Uuid::new_v4().to_string()[..4];
615616
let project = format!("r{}", test_id);
@@ -618,10 +619,9 @@ mod tests {
618619
// First instance - write data
619620
{
620621
let layer = BufferedWriteLayer::with_config(Arc::clone(&cfg)).unwrap();
621-
let batch = create_test_batch();
622+
let batch = create_test_batch(&project);
622623
layer.insert(&project, &table, vec![batch]).await.unwrap();
623-
// Shutdown to ensure WAL is synced
624-
layer.shutdown().await.unwrap();
624+
// Layer drops here - WAL data should be persisted
625625
}
626626

627627
// Second instance - recover from WAL
@@ -648,7 +648,7 @@ mod tests {
648648
let layer = BufferedWriteLayer::with_config(cfg).unwrap();
649649

650650
// First insert should succeed
651-
let batch = create_test_batch();
651+
let batch = create_test_batch(&project);
652652
layer.insert(&project, &table, vec![batch]).await.unwrap();
653653

654654
// Verify reservation is released (should be 0 after successful insert)

src/database.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1736,7 +1736,9 @@ impl ProjectRoutingTable {
17361736

17371737
// Determine target schema based on projection
17381738
let target_schema = match projection {
1739-
Some(proj) => Arc::new(arrow_schema::Schema::new(proj.iter().map(|&idx| self.schema.field(idx).clone()).collect::<Vec<_>>())),
1739+
Some(proj) => Arc::new(arrow_schema::Schema::new(
1740+
proj.iter().map(|&idx| self.schema.field(idx).clone()).collect::<Vec<_>>(),
1741+
)),
17401742
None => self.schema.clone(),
17411743
};
17421744

src/wal.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,18 @@ impl WalManager {
210210
fn persist_topic(&self, topic: &str) {
211211
if self.known_topics.insert(topic.to_string()) {
212212
let meta_dir = self.data_dir.join(".timefusion_meta");
213-
let _ = std::fs::create_dir_all(&meta_dir);
214-
if let Ok(mut file) = std::fs::OpenOptions::new().create(true).append(true).open(meta_dir.join("topics")) {
215-
use std::io::Write;
216-
let _ = writeln!(file, "{}", topic);
213+
if let Err(e) = std::fs::create_dir_all(&meta_dir) {
214+
warn!("Failed to create WAL meta dir {:?}: {}", meta_dir, e);
215+
return;
216+
}
217+
match std::fs::OpenOptions::new().create(true).append(true).open(meta_dir.join("topics")) {
218+
Ok(mut file) => {
219+
use std::io::Write;
220+
if let Err(e) = writeln!(file, "{}", topic) {
221+
warn!("Failed to write topic '{}' to index: {}", topic, e);
222+
}
223+
}
224+
Err(e) => warn!("Failed to open topics file: {}", e),
217225
}
218226
}
219227
}
@@ -414,7 +422,10 @@ fn serialize_record_batch(batch: &RecordBatch) -> Result<Vec<u8>, WalError> {
414422

415423
fn deserialize_record_batch(data: &[u8], schema: &SchemaRef) -> Result<RecordBatch, WalError> {
416424
if data.len() > MAX_BATCH_SIZE {
417-
return Err(WalError::BatchTooLarge { size: data.len(), max: MAX_BATCH_SIZE });
425+
return Err(WalError::BatchTooLarge {
426+
size: data.len(),
427+
max: MAX_BATCH_SIZE,
428+
});
418429
}
419430

420431
let (compact, _): (CompactBatch, _) = bincode::decode_from_slice(data, BINCODE_CONFIG)?;
@@ -451,7 +462,10 @@ fn deserialize_wal_entry(data: &[u8]) -> Result<WalEntry, WalError> {
451462
return Err(WalError::TooShort { len: data.len() });
452463
}
453464
if data[4] != WAL_VERSION {
454-
return Err(WalError::UnsupportedVersion { version: data[4], expected: WAL_VERSION });
465+
return Err(WalError::UnsupportedVersion {
466+
version: data[4],
467+
expected: WAL_VERSION,
468+
});
455469
}
456470
WalOperation::try_from(data[5])?;
457471
let (entry, _): (WalEntry, _) = bincode::decode_from_slice(&data[6..], BINCODE_CONFIG)?;

0 commit comments

Comments
 (0)