
Commit ec66d08

Add querying data in staging area (#448)

Implements and uses a custom query table provider that combines ListingTable and MemTable, so remote data and staging data can be queried together. UnionExec is used to combine the two inputs; it can be swapped for SortPreservingMergeExec if the output should interleave staging and remote data, e.g. for a plain select * from table. Fixes #445
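The provider itself is outside this excerpt (the diff below covers only the writer-side changes), but the planning idea can be sketched. A minimal, hypothetical DataFusion sketch, not the commit's actual provider: union_remote_and_staging, remote_plan, and staging_batches are illustrative names, and it assumes the staging batches already share the remote scan's schema.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::physical_plan::union::UnionExec;
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper: combine an already-planned remote scan (e.g. produced
    // by a ListingTable) with in-memory staging batches under one UnionExec node.
    fn union_remote_and_staging(
        remote_plan: Arc<dyn ExecutionPlan>,
        staging_batches: Vec<RecordBatch>,
        schema: SchemaRef,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // All staging batches go into a single in-memory partition, no projection.
        let staging_plan: Arc<dyn ExecutionPlan> =
            Arc::new(MemoryExec::try_new(&[staging_batches], schema, None)?);

        // UnionExec only concatenates its children's partitions; it does not
        // interleave rows by any ordering. That is why the commit message names
        // SortPreservingMergeExec as the alternative when a globally ordered
        // `select * from table` result is needed.
        Ok(Arc::new(UnionExec::new(vec![remote_plan, staging_plan])))
    }

UnionExec is the cheaper choice here since it adds no sorting work; SortPreservingMergeExec only pays off when both children already emit sorted partitions.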
1 parent 3bb9186 commit ec66d08

9 files changed: +389 −63 lines changed


server/src/event/writer.rs

Lines changed: 62 additions & 9 deletions
@@ -18,21 +18,52 @@
  */
 
 mod file_writer;
+mod mem_writer;
 
 use std::{
     collections::HashMap,
-    sync::{Mutex, RwLock},
+    sync::{Arc, Mutex, RwLock},
 };
 
-use self::{errors::StreamWriterError, file_writer::FileWriter};
-use arrow_array::RecordBatch;
+use crate::utils;
+
+use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
+use arrow_array::{RecordBatch, TimestampMillisecondArray};
+use arrow_schema::Schema;
+use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use once_cell::sync::Lazy;
 
 pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
 
+#[derive(Default)]
+pub struct Writer {
+    pub mem: MemWriter<16384>,
+    pub disk: FileWriter,
+}
+
+impl Writer {
+    fn push(
+        &mut self,
+        stream_name: &str,
+        schema_key: &str,
+        rb: RecordBatch,
+    ) -> Result<(), StreamWriterError> {
+        let rb = utils::arrow::replace_columns(
+            rb.schema(),
+            &rb,
+            &[0],
+            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+        );
+
+        self.disk.push(stream_name, schema_key, &rb)?;
+        self.mem.push(schema_key, rb);
+        Ok(())
+    }
+}
+
 #[derive(Deref, DerefMut, Default)]
-pub struct WriterTable(RwLock<HashMap<String, Mutex<FileWriter>>>);
+pub struct WriterTable(RwLock<HashMap<String, Mutex<Writer>>>);
 
 impl WriterTable {
     // append to a existing stream
@@ -49,7 +80,7 @@ impl WriterTable {
                 stream_writer
                     .lock()
                     .unwrap()
-                    .push(stream_name, schema_key, &record)?;
+                    .push(stream_name, schema_key, record)?;
             }
             None => {
                 drop(hashmap_guard);
@@ -60,10 +91,10 @@ impl WriterTable {
             writer
                 .lock()
                 .unwrap()
-                .push(stream_name, schema_key, &record)?;
+                .push(stream_name, schema_key, record)?;
         } else {
-            let mut writer = FileWriter::default();
-            writer.push(stream_name, schema_key, &record)?;
+            let mut writer = Writer::default();
+            writer.push(stream_name, schema_key, record)?;
             map.insert(stream_name.to_owned(), Mutex::new(writer));
         }
     }
@@ -81,9 +112,31 @@ impl WriterTable {
         drop(table);
         for writer in map.into_values() {
             let writer = writer.into_inner().unwrap();
-            writer.close_all();
+            writer.disk.close_all();
         }
     }
+
+    pub fn recordbatches_cloned(
+        &self,
+        stream_name: &str,
+        schema: &Arc<Schema>,
+    ) -> Option<Vec<RecordBatch>> {
+        let records = self
+            .0
+            .read()
+            .unwrap()
+            .get(stream_name)?
+            .lock()
+            .unwrap()
+            .mem
+            .recordbatch_cloned(schema);
+
+        Some(records)
+    }
+}
+
+fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
+    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
 }
 
 pub mod errors {
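
For reference, the column swap that Writer::push performs via utils::arrow::replace_columns can be approximated with plain arrow-rs calls. A minimal sketch: with_ingestion_timestamp is a hypothetical stand-in, and it assumes field 0 of the batch's schema is already a millisecond-precision timestamp.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, RecordBatch, TimestampMillisecondArray};
    use chrono::Utc;

    // Hypothetical stand-in for utils::arrow::replace_columns as used by
    // Writer::push: overwrite column 0 with the server-side ingestion time.
    // Assumes field 0 of the schema is a millisecond-precision timestamp.
    fn with_ingestion_timestamp(rb: &RecordBatch) -> RecordBatch {
        let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from_value(
            Utc::now().timestamp_millis(),
            rb.num_rows(),
        ));
        let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
        columns[0] = ts;
        // try_new re-validates the columns against the schema.
        RecordBatch::try_new(rb.schema(), columns).unwrap()
    }

Writer::push applies this swap once, before fanning the same batch out to both the disk and mem writers, so staging files and in-memory batches carry identical timestamps.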

server/src/event/writer/file_writer.rs

Lines changed: 6 additions & 19 deletions
@@ -17,17 +17,15 @@
  *
  */
 
-use arrow_array::{RecordBatch, TimestampMillisecondArray};
-use arrow_ipc::writer::StreamWriter;
-use chrono::Utc;
-use derive_more::{Deref, DerefMut};
 use std::collections::HashMap;
 use std::fs::{File, OpenOptions};
 use std::path::PathBuf;
-use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_ipc::writer::StreamWriter;
+use derive_more::{Deref, DerefMut};
 
 use crate::storage::staging::StorageDir;
-use crate::utils;
 
 use super::errors::StreamWriterError;
 
@@ -47,24 +45,17 @@ impl FileWriter {
         schema_key: &str,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
-        let record = utils::arrow::replace_columns(
-            record.schema(),
-            record,
-            &[0],
-            &[Arc::new(get_timestamp_array(record.num_rows()))],
-        );
-
         match self.get_mut(schema_key) {
             Some(writer) => {
                 writer
                     .writer
-                    .write(&record)
+                    .write(record)
                    .map_err(StreamWriterError::Writer)?;
            }
            // entry is not present thus we create it
            None => {
                // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, &record)?;
+                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
                self.insert(
                    schema_key.to_owned(),
                    ArrowWriter {
@@ -85,10 +76,6 @@ impl FileWriter {
     }
 }
 
-fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
-    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
-}
-
 fn init_new_stream_writer_file(
     stream_name: &str,
     schema_key: &str,

server/src/event/writer/mem_writer.rs

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
+use arrow_select::concat::concat_batches;
+use itertools::Itertools;
+
+use crate::utils::arrow::adapt_batch;
+
+/// Structure to keep recordbatches in memory.
+///
+/// Any new schema is updated in the schema map.
+/// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer
+#[derive(Debug)]
+pub struct MemWriter<const N: usize> {
+    schema: Schema,
+    // for checking uniqueness of schema
+    schema_map: HashSet<String>,
+    read_buffer: Vec<RecordBatch>,
+    mutable_buffer: MutableBuffer<N>,
+}
+
+impl<const N: usize> Default for MemWriter<N> {
+    fn default() -> Self {
+        Self {
+            schema: Schema::empty(),
+            schema_map: HashSet::default(),
+            read_buffer: Vec::default(),
+            mutable_buffer: MutableBuffer::default(),
+        }
+    }
+}
+
+impl<const N: usize> MemWriter<N> {
+    pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
+        if !self.schema_map.contains(schema_key) {
+            self.schema_map.insert(schema_key.to_owned());
+            self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap();
+        }
+
+        if let Some(record) = self.mutable_buffer.push(rb) {
+            let record = concat_records(&Arc::new(self.schema.clone()), &record);
+            self.read_buffer.push(record);
+        }
+    }
+
+    pub fn recordbatch_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
+        let mut read_buffer = self.read_buffer.clone();
+        if self.mutable_buffer.rows > 0 {
+            let rb = concat_records(schema, &self.mutable_buffer.inner);
+            read_buffer.push(rb)
+        }
+
+        read_buffer
+            .into_iter()
+            .map(|rb| adapt_batch(schema, &rb))
+            .collect()
+    }
+}
+
+fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
+    let records = record.iter().map(|x| adapt_batch(schema, x)).collect_vec();
+    let record = concat_batches(schema, records.iter()).unwrap();
+    record
+}
+
+#[derive(Debug, Default)]
+struct MutableBuffer<const N: usize> {
+    pub inner: Vec<RecordBatch>,
+    pub rows: usize,
+}
+
+impl<const N: usize> MutableBuffer<N> {
+    fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
+        if self.rows + rb.num_rows() >= N {
+            let left = N - self.rows;
+            let right = rb.num_rows() - left;
+            let left_slice = rb.slice(0, left);
+            let right_slice = if left < rb.num_rows() {
+                Some(rb.slice(left, right))
+            } else {
+                None
+            };
+            self.inner.push(left_slice);
+            // take all records
+            let src = Vec::with_capacity(self.inner.len());
+            let inner = std::mem::replace(&mut self.inner, src);
+            self.rows = 0;
+
+            if let Some(right_slice) = right_slice {
+                self.rows = right_slice.num_rows();
+                self.inner.push(right_slice);
+            }
+
+            Some(inner)
+        } else {
+            self.rows += rb.num_rows();
+            self.inner.push(rb);
+            None
+        }
+    }
+}
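
To make the split arithmetic in MutableBuffer::push concrete, here is a hedged test sketch. It assumes it sits inside mem_writer.rs itself (MutableBuffer and its fields are private to that module), and batch_of is a hypothetical helper.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    // Hypothetical helper: an n-row, single-column batch.
    fn batch_of(n: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let col: ArrayRef = Arc::new(Int64Array::from(vec![0i64; n]));
        RecordBatch::try_new(schema, vec![col]).unwrap()
    }

    #[test]
    fn mutable_buffer_splits_at_capacity() {
        let mut buf: MutableBuffer<8> = MutableBuffer::default();

        // 6 < 8: the batch is buffered whole and nothing is flushed.
        assert!(buf.push(batch_of(6)).is_none());

        // 6 + 5 >= 8: left = 8 - 6 = 2 rows top up the buffer, right = 3 rows
        // carry over. push() hands back the full 8 buffered rows for MemWriter
        // to concat into its read_buffer.
        let flushed = buf.push(batch_of(5)).unwrap();
        assert_eq!(flushed.iter().map(|rb| rb.num_rows()).sum::<usize>(), 8);
        assert_eq!(buf.rows, 3);
    }

Note the >= in the guard: a batch that lands exactly on N flushes with no remainder, since right works out to 0 and no right slice is kept.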
