Skip to content

Commit 261fd66

Browse files
author
Devdutt Shenoi
committed
fix: convert only finished arrow files
1 parent a829b9e commit 261fd66

File tree

4 files changed

+13
-9
lines changed

4 files changed

+13
-9
lines changed

src/parseable/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ use crate::{
5858
mod staging;
5959
mod streams;
6060

61+
/// File extension for "finish"ed arrow files in staging
62+
const ARROW_FILE_EXTENSION: &str = "data.arrows";
63+
64+
/// File extension for un"finish"ed arrow files in staging
65+
const ARROW_PART_FILE_EXTENSION: &str = "part.arrows";
66+
6167
/// Name of a Stream
6268
/// NOTE: this used to be a struct, flattened out for simplicity
6369
pub type LogStream = String;

src/parseable/staging/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@
2020
pub mod reader;
2121
pub mod writer;
2222

23-
/// File extension for "finish"ed arrow files in staging
24-
const ARROW_FILE_EXTENSION: &str = "data.arrows";
25-
26-
/// File extension for un"finish"ed arrow files in staging
27-
const ARROW_PART_FILE_EXTENSION: &str = "part.arrows";
28-
2923
#[derive(Debug, thiserror::Error)]
3024
pub enum StagingError {
3125
#[error("Unable to create recordbatch stream")]

src/parseable/staging/writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ use arrow_schema::Schema;
3131
use arrow_select::concat::concat_batches;
3232
use itertools::Itertools;
3333

34+
use crate::parseable::{ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION};
3435
use crate::utils::arrow::adapt_batch;
3536

36-
use super::{StagingError, ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION};
37+
use super::StagingError;
3738

3839
/// Context regarding `.arrows` file being persisted onto disk
3940
pub struct DiskWriter {

src/parseable/streams.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use super::{
6161
writer::{DiskWriter, Writer},
6262
StagingError,
6363
},
64-
LogStream,
64+
LogStream, ARROW_FILE_EXTENSION,
6565
};
6666

6767
#[derive(Debug, thiserror::Error)]
@@ -170,7 +170,10 @@ impl Stream {
170170
let paths = dir
171171
.flatten()
172172
.map(|file| file.path())
173-
.filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
173+
.filter(|path| {
174+
path.file_name()
175+
.is_some_and(|f| f.to_string_lossy().ends_with(ARROW_FILE_EXTENSION))
176+
})
174177
.sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
175178
.collect();
176179

0 commit comments

Comments
 (0)