Commit 784c0aa

Automatically add p_datetime if there are no suitable time columns (#227)
Add a dedicated timestamp column, called `p_timestamp`, to a stream when the request provides no suitable time column (i.e. no field carrying a valid ISO 8601 timestamp).
1 parent 459b267 commit 784c0aa

File tree: 4 files changed (+184, -94 lines)
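
Note: as a quick orientation before the per-file diffs, here is a minimal sketch of the new ingestion contract. The payloads are illustrative only, not taken from the commit:

```rust
use serde_json::json;

fn main() {
    // No "time"/"date"/"datetime"/"timestamp" key, so on first ingestion the
    // server prepends a nullable `p_timestamp` (millisecond) column to the
    // inferred schema and stamps every row with the ingestion time.
    let no_time = json!({ "level": "info", "message": "service started" });

    // A valid ISO 8601 value under a recognized key: the stream keeps its own
    // time column and no `p_timestamp` is added.
    let with_time = json!({ "datetime": "2022-12-06T07:30:00Z", "message": "ok" });

    println!("{no_time}\n{with_time}");
}
```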

server/src/event.rs (137 additions, 37 deletions)
@@ -17,16 +17,19 @@
  *
  */
 use actix_web::rt::spawn;
+use arrow_schema::{DataType, Field, TimeUnit};
+use chrono::{DateTime, Utc};
+use datafusion::arrow::array::{Array, TimestampMillisecondArray};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::ipc::writer::StreamWriter;
-use datafusion::arrow::json;
-use datafusion::arrow::json::reader::infer_json_schema;
+
+use datafusion::arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions};
 use datafusion::arrow::record_batch::RecordBatch;
 use lazy_static::lazy_static;
+use serde_json::Value;
 use std::collections::HashMap;
 use std::fs::OpenOptions;
-use std::io::BufReader;
 use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::sync::Mutex;
@@ -43,6 +46,9 @@ use self::error::{EventError, StreamWriterError};
 type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
 type LocalWriterGuard<'a> = MutexGuard<'a, Option<StreamWriter<std::fs::File>>>;
 
+const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
+const TIME_KEYS: &[&str] = &["time", "date", "datetime", "timestamp"];
+
 lazy_static! {
     #[derive(Default)]
     pub static ref STREAM_WRITERS: RwLock<HashMap<String, LocalWriter>> = RwLock::new(HashMap::new());
@@ -161,38 +167,68 @@ fn init_new_stream_writer_file(
 
 #[derive(Clone)]
 pub struct Event {
-    pub body: String,
+    pub body: Value,
     pub stream_name: String,
 }
 
 // Events holds the schema related to each event for a single log stream
-
 impl Event {
-    pub async fn process(&self) -> Result<(), EventError> {
-        let inferred_schema = self.infer_schema()?;
-
-        let event = self.get_reader(inferred_schema.clone());
+    pub async fn process(self) -> Result<(), EventError> {
         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-
-        if let Some(existing_schema) = stream_schema {
+        if let Some(schema) = stream_schema {
+            let schema_ref = Arc::new(schema);
             // validate schema before processing the event
-            if existing_schema != inferred_schema {
+            let Ok(mut event) = self.get_record(schema_ref.clone()) else {
                 return Err(EventError::SchemaMismatch(self.stream_name.clone()));
-            } else {
-                self.process_event(event)?
+            };
+
+            if event
+                .schema()
+                .column_with_name(DEFAULT_TIMESTAMP_KEY)
+                .is_some()
+            {
+                let rows = event.num_rows();
+                let timestamp_array = Arc::new(get_timestamp_array(rows));
+                event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array);
             }
+
+            self.process_event(&event)?;
         } else {
             // if stream schema is none then it is first event,
            // process first event and store schema in object store
-            self.process_first_event(event, inferred_schema)?
+            // check for a possible datetime field
+            let time_field = get_datetime_field(&self.body);
+
+            if time_field.is_none() {
+                let schema = Schema::try_merge(vec![
+                    Schema::new(vec![Field::new(
+                        DEFAULT_TIMESTAMP_KEY,
+                        DataType::Timestamp(TimeUnit::Millisecond, None),
+                        true,
+                    )]),
+                    self.infer_schema()?,
+                ])?;
+                let schema_ref = Arc::new(schema.clone());
+                let event = self.get_record(schema_ref.clone())?;
+                let timestamp_array = Arc::new(get_timestamp_array(event.num_rows()));
+                let event = replace(schema_ref, event, DEFAULT_TIMESTAMP_KEY, timestamp_array);
+                self.process_first_event(&event, schema)?;
+            } else {
+                let schema = self.infer_schema()?;
+                let schema_ref = Arc::new(schema.clone());
+                let event = self.get_record(schema_ref)?;
+                self.process_first_event(&event, schema)?;
+            }
         };
 
         metadata::STREAM_INFO.update_stats(
             &self.stream_name,
-            std::mem::size_of_val(self.body.as_bytes()) as u64,
+            serde_json::to_vec(&self.body)
+                .map(|v| std::mem::size_of_val(v.as_slice()))
+                .unwrap_or(0) as u64,
        )?;
 
-        if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
+        if let Err(e) = metadata::STREAM_INFO.check_alerts(&self).await {
             log::error!("Error checking for alerts. {:?}", e);
         }
 
@@ -202,11 +238,7 @@ impl Event {
     // This is called when the first event of a log stream is received. The first event is
     // special because we parse this event to generate the schema for the log stream. This
     // schema is then enforced on rest of the events sent to this log stream.
-    fn process_first_event<R: std::io::Read>(
-        &self,
-        event: json::Reader<R>,
-        schema: Schema,
-    ) -> Result<(), EventError> {
+    fn process_first_event(&self, event: &RecordBatch, schema: Schema) -> Result<(), EventError> {
         // note for functions _schema_with_map and _set_schema_with_map,
         // these are to be called while holding a write lock specifically.
         // this guarantees two things
@@ -266,30 +298,56 @@ impl Event {
 
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
-    fn process_event<R: std::io::Read>(
-        &self,
-        mut event: json::Reader<R>,
-    ) -> Result<(), EventError> {
-        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
-        STREAM_WRITERS::append_to_local(&self.stream_name, &rb)?;
+    fn process_event(&self, rb: &RecordBatch) -> Result<(), EventError> {
+        STREAM_WRITERS::append_to_local(&self.stream_name, rb)?;
         Ok(())
     }
 
     // inferSchema is a constructor to Schema
     // returns raw arrow schema type and arrow schema to string type.
     fn infer_schema(&self) -> Result<Schema, ArrowError> {
-        let reader = self.body.as_bytes();
-        let mut buf_reader = BufReader::new(reader);
-        infer_json_schema(&mut buf_reader, None)
+        let iter = std::iter::once(Ok(self.body.clone()));
+        infer_json_schema_from_iterator(iter)
     }
 
-    fn get_reader(&self, arrow_schema: Schema) -> json::Reader<&[u8]> {
-        json::Reader::new(
-            self.body.as_bytes(),
-            Arc::new(arrow_schema),
-            json::reader::DecoderOptions::new().with_batch_size(1024),
-        )
+    fn get_record(&self, schema: Arc<Schema>) -> Result<RecordBatch, EventError> {
+        let mut iter = std::iter::once(Ok(self.body.clone()));
+        let record = Decoder::new(schema, DecoderOptions::new()).next_batch(&mut iter)?;
+
+        record.ok_or(EventError::MissingRecord)
+    }
+}
+
+fn replace(
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+    column: &str,
+    arr: Arc<dyn Array + 'static>,
+) -> RecordBatch {
+    let index = schema.column_with_name(column).unwrap().0;
+    let mut arrays = batch.columns().to_vec();
+    arrays[index] = arr;
+
+    RecordBatch::try_new(schema, arrays).unwrap()
+}
+
+fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
+    let time = Utc::now();
+    TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
+}
+
+fn get_datetime_field(json: &Value) -> Option<&str> {
+    let Value::Object(object) = json else { panic!() };
+    for (key, value) in object {
+        if TIME_KEYS.contains(&key.as_str()) {
+            if let Value::String(maybe_datetime) = value {
+                if DateTime::parse_from_rfc3339(maybe_datetime).is_ok() {
+                    return Some(key);
+                }
+            }
+        }
     }
+    None
 }
 
 // Special functions which reads from metadata map while holding the lock
@@ -350,3 +408,45 @@ pub mod error {
         MutexPoisoned,
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow_schema::{Field, Schema};
+    use datafusion::arrow::{
+        array::{Array, Int32Array},
+        record_batch::RecordBatch,
+    };
+
+    use crate::event::replace;
+
+    #[test]
+    fn check_replace() {
+        let schema = Schema::new(vec![
+            Field::new("a", arrow_schema::DataType::Int32, false),
+            Field::new("b", arrow_schema::DataType::Int32, false),
+            Field::new("c", arrow_schema::DataType::Int32, false),
+        ]);
+
+        let schema_ref = Arc::new(schema);
+
+        let rb = RecordBatch::try_new(
+            schema_ref.clone(),
+            vec![
+                Arc::new(Int32Array::from_value(0, 3)),
+                Arc::new(Int32Array::from_value(0, 3)),
+                Arc::new(Int32Array::from_value(0, 3)),
+            ],
+        )
+        .unwrap();
+
+        let arr: Arc<dyn Array + 'static> = Arc::new(Int32Array::from_value(0, 3));
+
+        let new_rb = replace(schema_ref.clone(), rb, "c", arr);
+
+        assert_eq!(new_rb.schema(), schema_ref);
+        assert_eq!(new_rb.num_columns(), 3);
+        assert_eq!(new_rb.num_rows(), 3)
+    }
+}
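
Note: `get_datetime_field` is the crux of the new behavior, so here is a self-contained sketch of the same rule for experimentation. `detect_time_key` is a hypothetical stand-in, not the function above; it mirrors the check that a key is one of `TIME_KEYS` and its value parses as RFC 3339:

```rust
use chrono::DateTime;
use serde_json::{json, Value};

const TIME_KEYS: &[&str] = &["time", "date", "datetime", "timestamp"];

// Returns the first key that qualifies as a time column: the key must be one
// of TIME_KEYS and its value must be a string parsing as RFC 3339 / ISO 8601.
fn detect_time_key(event: &Value) -> Option<String> {
    let object = event.as_object()?;
    object
        .iter()
        .find(|(key, value)| {
            TIME_KEYS.contains(&key.as_str())
                && value
                    .as_str()
                    .map_or(false, |s| DateTime::parse_from_rfc3339(s).is_ok())
        })
        .map(|(key, _)| key.clone())
}

fn main() {
    // Recognized key with a parsable timestamp: no `p_timestamp` needed.
    assert_eq!(
        detect_time_key(&json!({ "datetime": "2022-12-06T07:00:00Z" })),
        Some("datetime".to_string())
    );
    // Recognized key but unparsable value: treated as having no time column.
    assert_eq!(detect_time_key(&json!({ "time": "noon" })), None);
    // No recognized key at all.
    assert_eq!(detect_time_key(&json!({ "msg": "hello" })), None);
}
```

One consequence worth noting: a stream whose first event carries `{"time": "noon"}` still gets a `p_timestamp` column, because the value does not parse as a timestamp.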

server/src/handlers/event.rs (34 additions, 30 deletions)
@@ -16,8 +16,6 @@
  *
  */
 
-use std::collections::HashMap;
-
 use actix_web::{web, HttpRequest, HttpResponse};
 use serde_json::Value;
 
@@ -26,7 +24,7 @@ use crate::option::CONFIG;
 use crate::query::Query;
 use crate::response::QueryResponse;
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
-use crate::utils::{self, flatten_json_body, merge};
+use crate::utils::{flatten_json_body, merge};
 
 use self::error::{PostError, QueryError};
 
@@ -87,39 +85,42 @@ async fn push_logs(
     req: HttpRequest,
     body: web::Json<serde_json::Value>,
 ) -> Result<(), PostError> {
-    let tags = HashMap::from([(
-        "p_tags".to_string(),
-        collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?,
-    )]);
-
-    let metadata = HashMap::from([(
-        "p_metadata".to_string(),
-        collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?,
-    )]);
-
-    if let Some(array) = body.as_array() {
-        for body in array {
-            let body = merge(body.clone(), metadata.clone());
-            let body = merge(body, tags.clone());
-            let body = flatten_json_body(web::Json(body)).unwrap();
+    let tags_n_metadata = [
+        (
+            "p_tags".to_string(),
+            Value::String(collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?),
+        ),
+        (
+            "p_metadata".to_string(),
+            Value::String(collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?),
+        ),
+    ];
+
+    match body.0 {
+        Value::Array(array) => {
+            for mut body in array {
+                merge(&mut body, tags_n_metadata.clone().into_iter());
+                let body = flatten_json_body(&body).unwrap();
+
+                let event = event::Event {
+                    body,
+                    stream_name: stream_name.clone(),
+                };
+
+                event.process().await?;
+            }
+        }
+        mut body @ Value::Object(_) => {
+            merge(&mut body, tags_n_metadata.into_iter());
 
             let event = event::Event {
-                body,
-                stream_name: stream_name.clone(),
+                body: flatten_json_body(&body).unwrap(),
+                stream_name,
             };
 
             event.process().await?;
         }
-    } else {
-        let body = merge(body.clone(), metadata);
-        let body = merge(body, tags);
-
-        let event = event::Event {
-            body: utils::flatten_json_body(web::Json(body)).unwrap(),
-            stream_name,
-        };
-
-        event.process().await?;
+        _ => return Err(PostError::Invalid),
     }
 
     Ok(())
@@ -164,13 +165,16 @@ pub mod error {
         Header(#[from] ParseHeaderError),
         #[error("Event Error: {0}")]
         Event(#[from] EventError),
+        #[error("Invalid Request")]
+        Invalid,
     }
 
     impl actix_web::ResponseError for PostError {
         fn status_code(&self) -> http::StatusCode {
             match self {
                 PostError::Header(_) => StatusCode::BAD_REQUEST,
                 PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
+                PostError::Invalid => StatusCode::BAD_REQUEST,
             }
         }
 
server/src/metadata.rs (1 addition, 3 deletions)
@@ -61,10 +61,8 @@ impl STREAM_INFO {
             event.stream_name.to_owned(),
         ))?;
 
-        let event_json: serde_json::Value = serde_json::from_str(&event.body)?;
-
         for alert in &meta.alerts.alerts {
-            alert.check_alert(event.stream_name.clone(), &event_json)
+            alert.check_alert(event.stream_name.clone(), &event.body)
         }
 
         Ok(())
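
Note: since `Event::body` is now a parsed `serde_json::Value`, the alert path can borrow it directly and the per-event `serde_json::from_str` round-trip disappears. A minimal sketch with hypothetical stand-in types:

```rust
use serde_json::{json, Value};

// Stand-ins for the real Event and alert check, just to show the borrow.
struct Event {
    body: Value,
    stream_name: String,
}

fn check_alert(stream_name: String, event_json: &Value) {
    println!("checking alerts for {stream_name}: {event_json}");
}

fn main() {
    let event = Event {
        body: json!({ "level": "error" }),
        stream_name: "app".to_string(),
    };
    // Before: serde_json::from_str(&event.body)? re-parsed a String body here.
    // After: the already-parsed Value is borrowed directly.
    check_alert(event.stream_name.clone(), &event.body);
}
```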
