Skip to content

Commit f21bfe4

Browse files
authored
Revert "Mutex Poison Error if a large amount of data is sent" (#629)
1 parent 39d6c67 commit f21bfe4

File tree

3 files changed

+10
-30
lines changed

3 files changed

+10
-30
lines changed

server/src/event/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
3838

3939
#[derive(Default)]
4040
pub struct Writer {
41-
pub mem: MemWriter,
41+
pub mem: MemWriter<16384>,
4242
pub disk: FileWriter,
4343
}
4444

server/src/event/writer/mem_writer.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,22 @@ use arrow_schema::Schema;
2323
use arrow_select::concat::concat_batches;
2424
use itertools::Itertools;
2525

26-
use crate::{option::CONFIG, utils::arrow::adapt_batch};
26+
use crate::utils::arrow::adapt_batch;
2727

2828
/// Structure to keep recordbatches in memory.
2929
///
3030
/// Any new schema is updated in the schema map.
3131
/// Recordbatches are pushed to mutable buffer first and then concated together and pushed to read buffer
3232
#[derive(Debug)]
33-
pub struct MemWriter {
33+
pub struct MemWriter<const N: usize> {
3434
schema: Schema,
3535
// for checking uniqueness of schema
3636
schema_map: HashSet<String>,
3737
read_buffer: Vec<RecordBatch>,
38-
mutable_buffer: MutableBuffer,
38+
mutable_buffer: MutableBuffer<N>,
3939
}
4040

41-
impl Default for MemWriter {
41+
impl<const N: usize> Default for MemWriter<N> {
4242
fn default() -> Self {
4343
Self {
4444
schema: Schema::empty(),
@@ -49,7 +49,7 @@ impl Default for MemWriter {
4949
}
5050
}
5151

52-
impl MemWriter {
52+
impl<const N: usize> MemWriter<N> {
5353
pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
5454
if !self.schema_map.contains(schema_key) {
5555
self.schema_map.insert(schema_key.to_owned());
@@ -83,17 +83,15 @@ fn concat_records(schema: &Arc<Schema>, record: &[RecordBatch]) -> RecordBatch {
8383
}
8484

8585
#[derive(Debug, Default)]
86-
struct MutableBuffer {
86+
struct MutableBuffer<const N: usize> {
8787
pub inner: Vec<RecordBatch>,
8888
pub rows: usize,
8989
}
9090

91-
impl MutableBuffer {
91+
impl<const N: usize> MutableBuffer<N> {
9292
fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
93-
let maxima = CONFIG.parseable.records_per_request;
94-
95-
if self.rows + rb.num_rows() >= maxima {
96-
let left = maxima - self.rows;
93+
if self.rows + rb.num_rows() >= N {
94+
let left = N - self.rows;
9795
let right = rb.num_rows() - left;
9896
let left_slice = rb.slice(0, left);
9997
let right_slice = if left < rb.num_rows() {

server/src/option.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,6 @@ pub struct Server {
229229

230230
/// Parquet compression algorithm
231231
pub parquet_compression: Compression,
232-
233-
/// Max value a record can be before splitting the request
234-
pub records_per_request: usize,
235232
}
236233

237234
impl FromArgMatches for Server {
@@ -250,10 +247,6 @@ impl FromArgMatches for Server {
250247
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
251248
let openid_issuer = m.get_one::<Url>(Self::OPENID_ISSUER).cloned();
252249

253-
self.records_per_request = m
254-
.get_one(Self::BUFFER_SIZE)
255-
.cloned()
256-
.expect("default value for records per request");
257250
self.address = m
258251
.get_one::<String>(Self::ADDRESS)
259252
.cloned()
@@ -369,7 +362,6 @@ impl Server {
369362
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
370363
pub const DEFAULT_USERNAME: &'static str = "admin";
371364
pub const DEFAULT_PASSWORD: &'static str = "admin";
372-
pub const BUFFER_SIZE: &'static str = "buffer-size";
373365

374366
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
375367
self.local_staging_path.join(stream_name)
@@ -517,16 +509,6 @@ impl Server {
517509
.value_parser(validation::url)
518510
.help("OIDC provider's host address"),
519511
)
520-
.arg(
521-
Arg::new(Self::BUFFER_SIZE)
522-
.long(Self::BUFFER_SIZE)
523-
.env("P_BUFFER_SIZE")
524-
.value_name("NUMBER")
525-
.default_value("16384")
526-
.required(false)
527-
.value_parser(value_parser!(usize))
528-
.help("buffer size for internal request buffer"),
529-
)
530512
.arg(
531513
Arg::new(Self::DOMAIN_URI)
532514
.long(Self::DOMAIN_URI)

0 commit comments

Comments
 (0)