Commit d304b9f

Author: Devdutt Shenoi (committed)

feat: DiskWriter handles writing to arrow part file

1 parent f268422 · commit d304b9f

File tree

2 files changed (+44 -16 lines changed):

    src/parseable/staging/writer.rs
    src/parseable/streams.rs

2 files changed

+44
-16
lines changed

src/parseable/staging/writer.rs

Lines changed: 39 additions & 3 deletions
@@ -19,7 +19,8 @@
 
 use std::{
     collections::{HashMap, HashSet},
-    fs::File,
+    fs::{File, OpenOptions},
+    path::PathBuf,
     sync::Arc,
 };
 
@@ -28,13 +29,48 @@ use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
 use itertools::Itertools;
+use tracing::error;
 
-use crate::utils::arrow::adapt_batch;
+use crate::{parseable::ARROW_FILE_EXTENSION, utils::arrow::adapt_batch};
+
+use super::StagingError;
 
 #[derive(Default)]
 pub struct Writer {
     pub mem: MemWriter<16384>,
-    pub disk: HashMap<String, StreamWriter<File>>,
+    pub disk: HashMap<String, DiskWriter>,
+}
+
+pub struct DiskWriter {
+    pub inner: StreamWriter<File>,
+    pub path: PathBuf,
+}
+
+impl DiskWriter {
+    pub fn new(path: PathBuf, schema: &Schema) -> Result<Self, StagingError> {
+        let file = OpenOptions::new().create(true).append(true).open(&path)?;
+
+        let inner = StreamWriter::try_new(file, schema)?;
+
+        Ok(Self { inner, path })
+    }
+
+    pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
+        self.inner.write(rb).map_err(StagingError::Arrow)
+    }
+
+    pub fn finish(&mut self) {
+        if let Err(err) = self.inner.finish() {
+            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+            return;
+        }
+
+        let mut arrow_path = self.path.to_owned();
+        arrow_path.set_extension(ARROW_FILE_EXTENSION);
+        if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
+            error!("Couldn't rename file {:?}, error = {err}", self.path);
+        }
+    }
 }
 
 /// Structure to keep recordbatches in memory.
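The diff above centralizes what `streams.rs` previously did inline: open an append-mode file, wrap it in an IPC `StreamWriter`, and finish it later. Because batches accumulate in a `.part` file that is only renamed to the arrow extension after `finish()` succeeds, a crash mid-write leaves behind a `.part` file rather than a seemingly complete arrow file. A minimal sketch of the intended lifecycle, assuming it runs inside the crate with `DiskWriter` and `StagingError` in scope (the path and batch contents are illustrative):

```rust
use std::{path::PathBuf, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn stage_example() -> Result<(), StagingError> {
    // A toy single-column batch to stage.
    let schema = Schema::new(vec![Field::new("n", DataType::Int64, false)]);
    let cols: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2, 3]))];
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), cols)
        .map_err(StagingError::Arrow)?;

    // Batches accumulate in an append-mode ".part" file while ingestion runs...
    let mut writer = DiskWriter::new(PathBuf::from("stream.data.part"), &schema)?;
    writer.write(&batch)?;

    // ...and finish() writes the IPC footer, then renames the ".part" file to
    // the final arrow extension, making it visible as a completed file.
    writer.finish();
    Ok(())
}
```

Note that `finish` logs failures instead of returning them, which is why the flush loop in `streams.rs` below no longer needs to discard a `Result`.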

src/parseable/streams.rs

Lines changed: 5 additions & 13 deletions
@@ -28,7 +28,6 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
 use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
@@ -63,7 +62,7 @@ use crate::{
 use super::{
     staging::{
         reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
+        writer::{DiskWriter, Writer},
         StagingError,
     },
     LogStream, ARROW_FILE_EXTENSION,
@@ -143,21 +142,14 @@ impl Stream {
             }
             None => {
                 // entry is not present thus we create it
-                let file_path = self.path_by_current_time(
+                let path = self.path_by_current_time(
                     schema_key,
                     parsed_timestamp,
                     custom_partition_values,
                 );
                 std::fs::create_dir_all(&self.data_path)?;
 
-                let file = OpenOptions::new()
-                    .create(true)
-                    .append(true)
-                    .open(&file_path)?;
-
-                let mut writer = StreamWriter::try_new(file, &record.schema())
-                    .expect("File and RecordBatch both are checked");
-
+                let mut writer = DiskWriter::new(path, &record.schema())?;
                 writer.write(record)?;
                 guard.disk.insert(schema_key.to_owned(), writer);
             }
@@ -180,7 +172,7 @@
             hostname.push_str(id);
         }
         let filename = format!(
-            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.part",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
            minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -391,7 +383,7 @@ impl Stream {
 
         // Flush disk
         for writer in disk_writers.values_mut() {
-            _ = writer.finish();
+            writer.finish();
         }
     }
 
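A related detail: the staging filename now ends in `.data.part`, and the rename in `DiskWriter::finish` uses `PathBuf::set_extension`, which replaces only the text after the last dot. A small self-contained check of that behavior, assuming `ARROW_FILE_EXTENSION` is `"arrows"` (the constant's value is defined elsewhere in the crate, so that is an assumption here):

```rust
use std::path::PathBuf;

fn main() {
    // Staging file shaped like the format string above (contents illustrative).
    let mut path = PathBuf::from("1234.date=2025-01-01.hour=10.minute=15.host.data.part");

    // set_extension replaces only the ".part" suffix after the last dot,
    // so the ".data" segment of the dotted filename is preserved.
    path.set_extension("arrows"); // stand-in for ARROW_FILE_EXTENSION
    assert_eq!(
        path,
        PathBuf::from("1234.date=2025-01-01.hour=10.minute=15.host.data.arrows")
    );
}
```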