
Commit 236c3ec

trueleo and nitisht authored
Add in-memory writer (#369)
This PR refactors the ingestion flow to allow an optional, memory-only staging approach during ingestion. Memory staging makes ingestion many times faster, at the risk of losing events still held in memory if the server crashes or restarts.

Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 098f1b0 commit 236c3ec
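
The hunks below only show the call-site changes; the new in-memory writer itself lives in modules not included in this excerpt. As a rough mental model, memory-only staging means buffering Arrow RecordBatches per (stream, schema key) until a periodic flush persists them. The following is a minimal sketch of that idea with hypothetical MemWriter and InMemoryStaging types; the PR's real implementation sits behind STREAM_WRITERS.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

use arrow_array::RecordBatch;

// Hypothetical in-memory staging buffer; batches accumulate here until a
// flush task persists them, so a crash before that flush loses these rows.
#[derive(Default)]
struct MemWriter {
    staged: Vec<RecordBatch>,
}

// Writers keyed by "stream/schema_key", mirroring the shape of the
// append_to_local call seen in server/src/event.rs below.
#[derive(Default)]
struct InMemoryStaging {
    writers: RwLock<HashMap<String, Mutex<MemWriter>>>,
}

impl InMemoryStaging {
    fn append(&self, stream_name: &str, schema_key: &str, rb: RecordBatch) {
        let key = format!("{stream_name}/{schema_key}");
        // Fast path: a writer for this key already exists.
        if let Some(writer) = self.writers.read().unwrap().get(&key) {
            writer.lock().unwrap().staged.push(rb);
            return;
        }
        // Slow path: first event for this (stream, schema key) pair.
        self.writers
            .write()
            .unwrap()
            .entry(key)
            .or_default()
            .lock()
            .unwrap()
            .staged
            .push(rb);
    }
}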

23 files changed (+1899, -1021 lines)

Cargo.lock

Lines changed: 105 additions & 146 deletions
Some generated files are not rendered by default.

server/Cargo.toml

Lines changed: 6 additions & 6 deletions
@@ -13,10 +13,10 @@ actix-cors = "0.6"
 actix-web-prometheus = { version = "0.1" }
 prometheus = { version = "0.13", features = ["process"] }
 anyhow = { version = "1.0", features = ["backtrace"] }
-arrow-schema = { version = "34.0.0", features = ["serde"] }
-arrow-array = { version = "34.0.0" }
-arrow-json = "34.0.0"
-arrow-ipc = "34.0.0"
+arrow-schema = { version = "36.0.0", features = ["serde"] }
+arrow-array = { version = "36.0.0" }
+arrow-json = "36.0.0"
+arrow-ipc = "36.0.0"
 async-trait = "0.1"
 base64 = "0.21"
 bytes = "1.4"
@@ -32,7 +32,7 @@ clap = { version = "4.1", default-features = false, features = [
     "error-context",
 ] }
 crossterm = "0.26"
-datafusion = "21.0.0"
+datafusion = "22.0.0"
 object_store = { version = "0.5.6", features = ["aws", "aws_profile"] }
 derive_more = "0.99"
 env_logger = "0.10"
@@ -70,7 +70,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
 xz2 = { version = "*", features=["static"] }
 bzip2 = { version = "*", features=["static"] }
 once_cell = "1.17.1"
-parquet = "34.0.0"
+parquet = "36.0.0"
 pyroscope = { version = "0.5.3", optional = true }
 pyroscope_pprofrs = { version = "0.2", optional = true }
 uptime_lib = "0.2.2"

server/src/alerts/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -64,15 +64,15 @@ pub struct Alert {
 }

 impl Alert {
-    pub fn check_alert(&self, stream_name: String, events: RecordBatch) {
+    pub fn check_alert(&self, stream_name: &str, events: RecordBatch) {
         let resolves = self.rule.resolves(events.clone());

         for (index, state) in resolves.into_iter().enumerate() {
             match state {
                 AlertState::Listening | AlertState::Firing => (),
                 alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
                     let context = self.get_context(
-                        stream_name.clone(),
+                        stream_name.to_owned(),
                         alert_state,
                         &self.rule,
                         events.slice(index, 1),
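
Side note on the signature change above: taking &str keeps allocation off the hot path, since an owned String is only produced (via to_owned) when an alert actually changes state. A self-contained sketch of that pattern, not the PR's actual types:

// Illustrative only: borrow in the hot path, allocate only on the branch
// that needs ownership.
fn check(stream_name: &str, firing: bool) -> Option<String> {
    if firing {
        // Allocate only when a notification context has to be built.
        Some(stream_name.to_owned())
    } else {
        None
    }
}

fn main() {
    let stream = String::from("backend-logs");
    // The caller keeps ownership; repeated checks never clone the name.
    assert_eq!(check(&stream, false), None);
    assert_eq!(check(&stream, true).as_deref(), Some("backend-logs"));
}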

server/src/event.rs

Lines changed: 13 additions & 5 deletions
@@ -48,21 +48,25 @@ pub struct Event {
 impl Event {
     pub async fn process(self) -> Result<(), EventError> {
         let key = get_schema_key(&self.rb.schema().fields);
+        let num_rows = self.rb.num_rows() as u64;

         if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }

-        self.process_event(&key)?;
+        Self::process_event(&self.stream_name, &key, self.rb.clone())?;

         metadata::STREAM_INFO.update_stats(
             &self.stream_name,
             self.origin_format,
             self.origin_size,
-            self.rb.num_rows() as u64,
+            num_rows,
         )?;

-        if let Err(e) = metadata::STREAM_INFO.check_alerts(&self).await {
+        if let Err(e) = metadata::STREAM_INFO
+            .check_alerts(&self.stream_name, self.rb)
+            .await
+        {
             log::error!("Error checking for alerts. {:?}", e);
         }

@@ -90,8 +94,12 @@ impl Event {

     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
-    fn process_event(&self, schema_key: &str) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(&self.stream_name, schema_key, &self.rb)?;
+    fn process_event(
+        stream_name: &str,
+        schema_key: &str,
+        rb: RecordBatch,
+    ) -> Result<(), EventError> {
+        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
         Ok(())
     }
 }
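
The inline comment in the second hunk ("concatenates record batches and puts them in memory store") refers to logic behind STREAM_WRITERS that this diff does not show. A minimal sketch of that concatenation step, assuming the arrow_select crate (part of the same Arrow release, not listed in the Cargo.toml hunk above) and a hypothetical merge_staged helper:

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Schema};
use arrow_select::concat::concat_batches;

// Hypothetical helper: fold all batches staged for one (stream, schema key)
// pair into a single RecordBatch before it is written out.
fn merge_staged(schema: &Arc<Schema>, staged: &[RecordBatch]) -> Result<RecordBatch, ArrowError> {
    // Batches staged under the same schema key share this schema, so a
    // straight concatenation is valid.
    concat_batches(schema, staged)
}

Making process_event an associated function that takes an owned RecordBatch also lets Event::process hand a clone to the writer and still move self.rb into check_alerts afterwards, instead of borrowing self across both calls.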

server/src/event/format/json.rs

Lines changed: 1 addition & 0 deletions
@@ -214,6 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
                 false
             }
         }
+        DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
         _ => unreachable!(),
     }
 }
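
JSON has no native timestamp type, so the new arm accepts either a string (for example an RFC 3339 value) or a number (for example epoch milliseconds) for a timestamp-typed column. A self-contained illustration of just that arm, using serde_json for the sample values; timestamp_arm is a hypothetical stand-in for the real valid_type:

use arrow_schema::{DataType, TimeUnit};
use serde_json::{json, Value};

// Mirrors only the new match arm; the rest of valid_type is out of scope here.
fn timestamp_arm(data_type: &DataType, value: &Value) -> bool {
    matches!(data_type, DataType::Timestamp(_, _)) && (value.is_string() || value.is_number())
}

fn main() {
    let ts = DataType::Timestamp(TimeUnit::Millisecond, None);
    assert!(timestamp_arm(&ts, &json!("2023-04-16T10:00:00Z"))); // RFC 3339 string
    assert!(timestamp_arm(&ts, &json!(1_681_639_200_000_i64))); // epoch milliseconds
    assert!(!timestamp_arm(&ts, &json!(true))); // anything else is rejected
}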

0 commit comments