Skip to content

Commit 224574a

Browse files
author
Devdutt Shenoi
committed
fix: properly group arrows into parquet
1 parent 06524b6 commit 224574a

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

src/parseable/streams.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use arrow_schema::{Field, Fields, Schema};
3131
use chrono::{NaiveDateTime, Timelike, Utc};
3232
use derive_more::{Deref, DerefMut};
3333
use itertools::Itertools;
34+
use once_cell::sync::Lazy;
3435
use parquet::{
3536
arrow::ArrowWriter,
3637
basic::Encoding,
@@ -39,6 +40,7 @@ use parquet::{
3940
schema::types::ColumnPath,
4041
};
4142
use rand::distributions::DistString;
43+
use regex::Regex;
4244
use relative_path::RelativePathBuf;
4345
use tracing::{debug, error, info, trace, warn};
4446

@@ -64,6 +66,10 @@ use super::{
6466
LogStream, ARROW_FILE_EXTENSION,
6567
};
6668

69+
static ARROWS_NAME_STRUCTURE: Lazy<Regex> = Lazy::new(|| {
70+
Regex::new(r"^[[:alnum:]]+\.(?P<front>\S+)\.\d+\.data\.arrows$").expect("Validated regex")
71+
});
72+
6773
#[derive(Debug, thiserror::Error)]
6874
#[error("Stream not found: {0}")]
6975
pub struct StreamNotFound(pub String);
@@ -281,10 +287,14 @@ impl Stream {
281287
}
282288

283289
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
284-
let filename = path.file_stem().unwrap().to_str().unwrap();
285-
let (_, filename) = filename.split_once('.').unwrap();
286-
assert!(filename.contains('.'), "contains the delim `.`");
287-
let filename_with_random_number = format!("{filename}.{random_string}.arrows");
290+
let filename = path.file_name().unwrap().to_str().unwrap();
291+
let filename = ARROWS_NAME_STRUCTURE
292+
.captures(filename)
293+
.unwrap()
294+
.get(1)
295+
.unwrap()
296+
.as_str();
297+
let filename_with_random_number = format!("{filename}.data.{random_string}.arrows");
288298
let mut parquet_path = path.to_owned();
289299
parquet_path.set_file_name(filename_with_random_number);
290300
parquet_path.set_extension("parquet");
@@ -298,7 +308,7 @@ impl Stream {
298308
self.stream_name
299309
);
300310

301-
let time_partition = self.get_time_partition();
311+
let time_partition: Option<String> = self.get_time_partition();
302312
let custom_partition = self.get_custom_partition();
303313

304314
// read arrow files on disk

0 commit comments

Comments
 (0)