
Commit e65d4e6

fix: query logs from on-disk arrow files

Author: Devdutt Shenoi (committed)
1 parent: fc6d769

File tree: 4 files changed, +108 -31 lines

src/parseable/staging/mod.rs

Lines changed: 6 additions & 0 deletions

@@ -20,6 +20,12 @@
 pub mod reader;
 pub mod writer;
 
+/// File extension for "finish"ed arrow files in staging
+const ARROW_FILE_EXTENSION: &str = "data.arrows";
+
+/// File extension for un"finish"ed arrow files in staging
+const ARROW_PART_FILE_EXTENSION: &str = "part.arrows";
+
 #[derive(Debug, thiserror::Error)]
 pub enum StagingError {
     #[error("Unable to create recordbatch stream")]
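
The two extensions split staging files into two visible states: live writes land in a `*.part.arrows` file, and the file only gets the `*.data.arrows` name once it has been "finish"ed (i.e. the Arrow IPC footer is written). A reader scanning the staging directory can then skip in-progress files. A minimal sketch of that check, assuming the naming convention above (the helper itself is hypothetical, not part of this commit):

```rust
use std::path::Path;

/// Hypothetical helper: finished files end in ".data.arrows", while files still
/// being appended to end in ".part.arrows". Note that `Path::extension()` would
/// report "arrows" for both, so the full suffix is checked on the string form.
fn is_finished_arrow_file(path: &Path) -> bool {
    path.to_str()
        .map(|p| p.ends_with(".data.arrows"))
        .unwrap_or(false)
}
```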

src/parseable/staging/writer.rs

Lines changed: 51 additions & 2 deletions

@@ -19,8 +19,9 @@
 
 use std::{
     collections::{HashMap, HashSet},
-    fs::File,
+    fs::{File, OpenOptions},
     io::BufWriter,
+    path::PathBuf,
     sync::Arc,
 };
 
@@ -32,10 +33,58 @@ use itertools::Itertools;
 
 use crate::utils::arrow::adapt_batch;
 
+use super::{StagingError, ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION};
+
+/// Context regarding `.arrows` file being persisted onto disk
+pub struct DiskWriter {
+    inner: FileWriter<BufWriter<File>>,
+    // Used to ensure un"finish"ed arrow files are renamed on "finish"
+    path_prefix: PathBuf,
+}
+
+impl DiskWriter {
+    pub fn new(path_prefix: PathBuf, schema: &Schema) -> Result<Self, StagingError> {
+        // Live writes happen into the part file
+        let mut partfile_path = path_prefix.clone();
+        partfile_path.set_extension(ARROW_PART_FILE_EXTENSION);
+        let file = OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(partfile_path)?;
+
+        Ok(Self {
+            inner: FileWriter::try_new_buffered(file, schema)
+                .expect("File and RecordBatch both are checked"),
+            path_prefix,
+        })
+    }
+
+    /// Appends records into an `.arrows` file
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), StagingError> {
+        self.inner.write(batch).map_err(StagingError::Arrow)
+    }
+
+    /// Ensures the `.arrows` file in the staging directory is "finish"ed and renames it from "part".
+    pub fn finish(mut self) -> Result<(), StagingError> {
+        self.inner.finish()?;
+
+        let mut partfile_path = self.path_prefix.clone();
+        partfile_path.set_extension(ARROW_PART_FILE_EXTENSION);
+
+        let mut arrows_path = self.path_prefix;
+        arrows_path.set_extension(ARROW_FILE_EXTENSION);
+
+        // Rename from part file to finished arrows file
+        std::fs::rename(partfile_path, arrows_path)?;

+        Ok(())
+    }
+}
+
 #[derive(Default)]
 pub struct Writer {
     pub mem: MemWriter<16384>,
-    pub disk: HashMap<String, FileWriter<BufWriter<File>>>,
+    pub disk: HashMap<String, DiskWriter>,
 }
 
 /// Structure to keep recordbatches in memory.
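
The key property of `DiskWriter` is that a file only appears under the finished `data.arrows` name after `finish()` has written the Arrow IPC footer, so readers never pick up a truncated file. A self-contained sketch of the same write-to-part-then-rename pattern using arrow-ipc directly (file names and the single-column schema are made up for illustration; the real code wraps this inside `DiskWriter`):

```rust
use std::{fs::OpenOptions, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_ipc::writer::FileWriter;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Live writes land in the un"finish"ed part file...
    let part_path = "stream.part.arrows";
    let file = OpenOptions::new().create(true).append(true).open(part_path)?;
    let mut writer = FileWriter::try_new_buffered(file, &schema)?;
    writer.write(&batch)?;

    // ...and the IPC footer is written on finish, after which the file is safe to read.
    writer.finish()?;
    drop(writer); // make sure buffered bytes hit the disk before the rename
    std::fs::rename(part_path, "stream.data.arrows")?;

    Ok(())
}
```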

src/parseable/streams.rs

Lines changed: 50 additions & 28 deletions

@@ -27,7 +27,6 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_ipc::writer::FileWriter;
 use arrow_schema::{Field, Fields, Schema};
 use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
@@ -41,7 +40,7 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
-use tracing::{error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};
 
 use crate::{
     cli::Options,
@@ -57,16 +56,18 @@ use crate::{
 };
 
 use super::{
-    staging::{reader::MergedRecordReader, writer::Writer, StagingError},
+    staging::{
+        reader::MergedRecordReader,
+        writer::{DiskWriter, Writer},
+        StagingError,
+    },
     LogStream,
 };
 
 #[derive(Debug, thiserror::Error)]
 #[error("Stream not found: {0}")]
 pub struct StreamNotFound(pub String);
 
-const ARROW_FILE_EXTENSION: &str = "data.arrows";
-
 pub type StreamRef = Arc<Stream>;
 
 /// All state associated with a single logstream in Parseable.
@@ -116,22 +117,15 @@ impl Stream {
             }
             None => {
                 // entry is not present thus we create it
-                let file_path = self.path_by_current_time(
+                let path_prefix = self.path_prefix_by_current_time(
                     schema_key,
                     parsed_timestamp,
                     custom_partition_values,
                 );
-                std::fs::create_dir_all(&self.data_path)?;
-
-                let file = OpenOptions::new()
-                    .create(true)
-                    .append(true)
-                    .open(&file_path)?;
-
-                let mut writer = FileWriter::try_new_buffered(file, &record.schema())
-                    .expect("File and RecordBatch both are checked");
 
+                let mut writer = DiskWriter::new(path_prefix, &record.schema())?;
                 writer.write(record)?;
+
                 guard.disk.insert(schema_key.to_owned(), writer);
             }
         };
@@ -142,7 +136,7 @@
         Ok(())
     }
 
-    pub fn path_by_current_time(
+    pub fn path_prefix_by_current_time(
        &self,
        stream_hash: &str,
        parsed_timestamp: NaiveDateTime,
@@ -153,7 +147,7 @@
            hostname.push_str(id);
        }
        let filename = format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
+            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}",
            Utc::now().format("%Y%m%dT%H%M"),
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
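
Ingest-side, the manual `OpenOptions`/`FileWriter` setup is replaced by `DiskWriter::new`, with one writer cached per schema key; the filename produced by `path_prefix_by_current_time` no longer carries an extension, since `DiskWriter` appends `part.arrows` / `data.arrows` itself. A condensed sketch of that lookup-or-create flow, reusing the crate's own `DiskWriter` and `StagingError` types (the function name and signature here are simplified for illustration, not a literal excerpt):

```rust
use std::{collections::HashMap, path::PathBuf};

use arrow_array::RecordBatch;

/// Simplified version of the Some/None branches above: reuse the writer for a
/// known schema key, otherwise create one against the freshly built path prefix.
fn push_to_disk(
    disk: &mut HashMap<String, DiskWriter>,
    schema_key: &str,
    path_prefix: PathBuf,
    record: &RecordBatch,
) -> Result<(), StagingError> {
    match disk.get_mut(schema_key) {
        Some(writer) => writer.write(record)?,
        None => {
            let mut writer = DiskWriter::new(path_prefix, &record.schema())?;
            writer.write(record)?;
            disk.insert(schema_key.to_owned(), writer);
        }
    }

    Ok(())
}
```
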
@@ -345,16 +339,34 @@ impl Stream {
         Ok(())
     }
 
-    pub fn recordbatches_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
-        self.writer.lock().unwrap().mem.recordbatch_cloned(schema)
+    /// Returns record batches as found in staging
+    pub fn recordbatches_cloned(
+        &self,
+        schema: &Arc<Schema>,
+        time_partition: Option<String>,
+    ) -> Vec<RecordBatch> {
+        // All records found in memory
+        let mut records = self.writer.lock().unwrap().mem.recordbatch_cloned(schema);
+        // Append record batches picked up from `.arrows` files
+        let arrow_files = self.arrow_files();
+        let record_reader = MergedRecordReader::new(&arrow_files);
+        if record_reader.readers.is_empty() {
+            return vec![];
+        }
+        let mut from_file = record_reader
+            .merged_iter(schema.clone(), time_partition)
+            .collect();
+        records.append(&mut from_file);
+
+        records
     }
 
     pub fn clear(&self) {
         self.writer.lock().unwrap().mem.clear();
     }
 
     pub fn flush(&self) {
-        let mut disk_writers = {
+        let disk_writers = {
             let mut writer = self.writer.lock().unwrap();
             // Flush memory
             writer.mem.clear();
@@ -363,8 +375,12 @@
         };
 
         // Flush disk
-        for writer in disk_writers.values_mut() {
-            _ = writer.finish();
+        for (_, writer) in disk_writers {
+            if let Err(err) = writer.finish() {
+                warn!("Couldn't finish `.arrows` file: {err}");
+            } else {
+                debug!("Finished `.arrows` file sync onto disk")
+            }
         }
     }
 

@@ -855,16 +871,19 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
+            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}",
             Utc::now().format("%Y%m%dT%H%M"),
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
             hostname::get().unwrap().into_string().unwrap()
         ));
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated_path = staging.path_prefix_by_current_time(
+            stream_hash,
+            parsed_timestamp,
+            &custom_partition_values,
+        );
 
         assert_eq!(generated_path, expected_path);
     }
@@ -890,16 +909,19 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
+            "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}",
             Utc::now().format("%Y%m%dT%H%M"),
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
             hostname::get().unwrap().into_string().unwrap()
        ));
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated_path = staging.path_prefix_by_current_time(
+            stream_hash,
+            parsed_timestamp,
+            &custom_partition_values,
+        );
 
         assert_eq!(generated_path, expected_path);
     }
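
On the query side, staged data is now the union of what is still in memory and what has already been flushed to finished `.arrows` files. A self-contained approximation of that merge using `arrow_ipc::reader::FileReader` (file names are invented; the real code goes through `MergedRecordReader`, which also adapts batches to the current schema and can order them by the time partition):

```rust
use std::fs::File;

use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader;

/// Append every batch decoded from finished `.arrows` files to the in-memory ones.
fn staged_batches(
    mut in_memory: Vec<RecordBatch>,
    arrow_files: &[&str],
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    for path in arrow_files {
        // FileReader iterates over Result<RecordBatch, ArrowError>
        let reader = FileReader::try_new(File::open(path)?, None)?;
        for batch in reader {
            in_memory.push(batch?);
        }
    }

    Ok(in_memory)
}
```

Separately, `flush` now consumes `disk_writers` by value (`for (_, writer) in disk_writers`) rather than iterating `values_mut()`, because `DiskWriter::finish` takes `self`, moving each writer out of the map before the rename.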

src/query/stream_schema_provider.rs

Lines changed: 1 addition & 1 deletion

@@ -233,7 +233,7 @@ impl StandardTableProvider {
         };
 
         // Staging arrow execution plan
-        let records = staging.recordbatches_cloned(&self.schema);
+        let records = staging.recordbatches_cloned(&self.schema, staging.get_time_partition());
         let arrow_exec = reversed_mem_table(records, self.schema.clone())?
             .scan(state, projection, filters, limit)
             .await?;
