Skip to content

Commit 39d6c67

Browse files
temp: allow configurable buffer size while ingestion (#624)
Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 8394cf4 commit 39d6c67

File tree

3 files changed

+30
-10
lines changed

3 files changed

+30
-10
lines changed

server/src/event/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
3838

3939
#[derive(Default)]
4040
pub struct Writer {
41-
pub mem: MemWriter<16384>,
41+
pub mem: MemWriter,
4242
pub disk: FileWriter,
4343
}
4444

server/src/event/writer/mem_writer.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,22 @@ use arrow_schema::Schema;
2323
use arrow_select::concat::concat_batches;
2424
use itertools::Itertools;
2525

26-
use crate::utils::arrow::adapt_batch;
26+
use crate::{option::CONFIG, utils::arrow::adapt_batch};
2727

2828
/// Structure to keep recordbatches in memory.
2929
///
3030
/// Any new schema is updated in the schema map.
3131
/// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer
3232
#[derive(Debug)]
33-
pub struct MemWriter<const N: usize> {
33+
pub struct MemWriter {
3434
schema: Schema,
3535
// for checking uniqueness of schema
3636
schema_map: HashSet<String>,
3737
read_buffer: Vec<RecordBatch>,
38-
mutable_buffer: MutableBuffer<N>,
38+
mutable_buffer: MutableBuffer,
3939
}
4040

41-
impl<const N: usize> Default for MemWriter<N> {
41+
impl Default for MemWriter {
4242
fn default() -> Self {
4343
Self {
4444
schema: Schema::empty(),
@@ -49,7 +49,7 @@ impl<const N: usize> Default for MemWriter<N> {
4949
}
5050
}
5151

52-
impl<const N: usize> MemWriter<N> {
52+
impl MemWriter {
5353
pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
5454
if !self.schema_map.contains(schema_key) {
5555
self.schema_map.insert(schema_key.to_owned());
@@ -83,15 +83,17 @@ fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
8383
}
8484

8585
#[derive(Debug, Default)]
86-
struct MutableBuffer<const N: usize> {
86+
struct MutableBuffer {
8787
pub inner: Vec<RecordBatch>,
8888
pub rows: usize,
8989
}
9090

91-
impl<const N: usize> MutableBuffer<N> {
91+
impl MutableBuffer {
9292
fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
93-
if self.rows + rb.num_rows() >= N {
94-
let left = N - self.rows;
93+
let maxima = CONFIG.parseable.records_per_request;
94+
95+
if self.rows + rb.num_rows() >= maxima {
96+
let left = maxima - self.rows;
9597
let right = rb.num_rows() - left;
9698
let left_slice = rb.slice(0, left);
9799
let right_slice = if left < rb.num_rows() {

server/src/option.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ pub struct Server {
229229

230230
/// Parquet compression algorithm
231231
pub parquet_compression: Compression,
232+
233+
/// Max value a record can be before splitting the request
234+
pub records_per_request: usize,
232235
}
233236

234237
impl FromArgMatches for Server {
@@ -247,6 +250,10 @@ impl FromArgMatches for Server {
247250
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
248251
let openid_issuer = m.get_one::<Url>(Self::OPENID_ISSUER).cloned();
249252

253+
self.records_per_request = m
254+
.get_one(Self::BUFFER_SIZE)
255+
.cloned()
256+
.expect("default value for records per request");
250257
self.address = m
251258
.get_one::<String>(Self::ADDRESS)
252259
.cloned()
@@ -362,6 +369,7 @@ impl Server {
362369
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
363370
pub const DEFAULT_USERNAME: &'static str = "admin";
364371
pub const DEFAULT_PASSWORD: &'static str = "admin";
372+
pub const BUFFER_SIZE: &'static str = "buffer-size";
365373

366374
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
367375
self.local_staging_path.join(stream_name)
@@ -509,6 +517,16 @@ impl Server {
509517
.value_parser(validation::url)
510518
.help("OIDC provider's host address"),
511519
)
520+
.arg(
521+
Arg::new(Self::BUFFER_SIZE)
522+
.long(Self::BUFFER_SIZE)
523+
.env("P_BUFFER_SIZE")
524+
.value_name("NUMBER")
525+
.default_value("16384")
526+
.required(false)
527+
.value_parser(value_parser!(usize))
528+
.help("buffer size for internal request buffer"),
529+
)
512530
.arg(
513531
Arg::new(Self::DOMAIN_URI)
514532
.long(Self::DOMAIN_URI)

0 commit comments

Comments
 (0)