
Commit 34a5c12

fix: update numeric fields data type in storage schema before merge
This handles the scenario where a stream has existing data and an event is ingested with additional fields, so that the inferred schema differs from the storage schema. The server now updates the storage schema before merging it with the inferred schema and persisting the result to storage.
1 parent fc44fca commit 34a5c12
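
For context (not part of the commit), here is a minimal sketch of the failure this fixes: arrow's Schema::try_merge refuses to merge schemas that disagree on a field's data type, so a numeric column recorded with one type cannot be merged with a differently inferred type until one side is coerced. The field names below are invented for illustration.

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Storage schema recorded "value" as Int64; the schema inferred from a new
    // event (which also carries an extra field) says Float64.
    let stored = Schema::new(vec![Field::new("value", DataType::Int64, true)]);
    let inferred = Schema::new(vec![
        Field::new("value", DataType::Float64, true),
        Field::new("status", DataType::Utf8, true),
    ]);

    // Without coercing one side first, the merge fails on the type conflict.
    assert!(Schema::try_merge(vec![stored, inferred]).is_err());
}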


3 files changed: +33 -4 lines


src/event/format/json.rs

Lines changed: 4 additions & 3 deletions
@@ -29,7 +29,7 @@ use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
-use super::{EventFormat, Metadata, Tags};
+use super::{override_num_fields_from_stream_schema, EventFormat, Metadata, Tags};
 use crate::utils::{arrow::get_field, json::flatten_json_body};
 
 pub struct Event {
@@ -50,8 +50,9 @@ impl EventFormat for Event {
         time_partition: Option<String>,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
         let data = flatten_json_body(self.data, None, None, None, false)?;
-        let stream_schema = schema;
-
+        let mut stream_schema = schema;
+        //override the number fields to Float64 data type in stream schema
+        stream_schema = override_num_fields_from_stream_schema(stream_schema);
         // incoming event may be a single json or a json array
         // but Data (type defined above) is a vector of json values
         // hence we need to convert the incoming event to a vector of json values

src/event/format/mod.rs

Lines changed: 23 additions & 0 deletions
@@ -222,6 +222,29 @@ pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field
         .collect::<Vec<Arc<Field>>>()
 }
 
+///All number fields from stream schema are forced into Float64
+pub fn override_num_fields_from_stream_schema(
+    schema: HashMap<String, Arc<Field>>,
+) -> HashMap<String, Arc<Field>> {
+    schema
+        .iter()
+        .map(|(name, field)| {
+            if field.data_type().is_numeric() {
+                (
+                    name.clone(),
+                    Arc::new(Field::new(
+                        field.name(),
+                        DataType::Float64,
+                        field.is_nullable(),
+                    )),
+                )
+            } else {
+                (name.clone(), field.clone())
+            }
+        })
+        .collect::<HashMap<String, Arc<Field>>>()
+}
+
 pub fn update_field_type_in_schema(
     inferred_schema: Arc<Schema>,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
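
A unit test along these lines is not part of the commit, but sketches how the new helper behaves if dropped into the same module (src/event/format/mod.rs): numeric fields are rewritten as Float64 with nullability preserved, while non-numeric fields pass through unchanged.

#[cfg(test)]
mod tests {
    use super::override_num_fields_from_stream_schema;
    use arrow_schema::{DataType, Field};
    use std::{collections::HashMap, sync::Arc};

    #[test]
    fn numeric_fields_are_forced_to_float64() {
        let mut schema: HashMap<String, Arc<Field>> = HashMap::new();
        schema.insert(
            "count".to_string(),
            Arc::new(Field::new("count", DataType::Int64, true)),
        );
        schema.insert(
            "host".to_string(),
            Arc::new(Field::new("host", DataType::Utf8, false)),
        );

        let overridden = override_num_fields_from_stream_schema(schema);

        // Int64 is numeric, so it is rewritten as Float64; Utf8 is left untouched.
        assert_eq!(overridden["count"].data_type(), &DataType::Float64);
        assert_eq!(overridden["host"].data_type(), &DataType::Utf8);
        // Nullability is carried over unchanged.
        assert!(overridden["count"].is_nullable());
        assert!(!overridden["host"].is_nullable());
    }
}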

src/event/mod.rs

Lines changed: 6 additions & 1 deletion
@@ -22,6 +22,7 @@ mod writer;
 
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
+use format::override_num_fields_from_schema;
 use itertools::Itertools;
 use std::sync::Arc;
 use tracing::error;
@@ -157,7 +158,11 @@ pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), Event
         .get_mut(stream_name)
         .expect("map has entry for this stream name")
         .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
+    //override the number fields to Float64 data type in memory map schema before merge
+    let mut current_schema = Schema::new(map.values().cloned().collect::<Fields>());
+    current_schema = Schema::new(override_num_fields_from_schema(
+        current_schema.fields.to_vec(),
+    ));
     let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
     map.clear();
     map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
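
Taken together, commit_schema now coerces the in-memory schema before handing both sides to Schema::try_merge. The standalone sketch below (not from the repository) mirrors that flow; force_numeric_to_float64 is a stand-in for the crate's override_num_fields_from_schema, whose body does not appear in this diff, and the field names are invented.

use std::sync::Arc;
use arrow_schema::{ArrowError, DataType, Field, Fields, Schema};

// Stand-in for override_num_fields_from_schema (assumed equivalent behavior).
fn force_numeric_to_float64(fields: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
    fields
        .into_iter()
        .map(|field| {
            if field.data_type().is_numeric() {
                Arc::new(Field::new(
                    field.name(),
                    DataType::Float64,
                    field.is_nullable(),
                ))
            } else {
                field
            }
        })
        .collect()
}

fn main() -> Result<(), ArrowError> {
    // In-memory map schema still holds "latency" as Int64.
    let current_schema = Schema::new(vec![Field::new("latency", DataType::Int64, true)]);
    // Incoming schema carries the same field as Float64 plus a new one.
    let incoming = Schema::new(vec![
        Field::new("latency", DataType::Float64, true),
        Field::new("status", DataType::Utf8, true),
    ]);

    // Coerce numeric fields before merging, mirroring the new commit_schema code.
    let current_schema = Schema::new(
        force_numeric_to_float64(current_schema.fields().iter().cloned().collect())
            .into_iter()
            .collect::<Fields>(),
    );
    let merged = Schema::try_merge(vec![current_schema, incoming])?;
    assert_eq!(merged.field_with_name("latency")?.data_type(), &DataType::Float64);
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}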
