diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 0a137fdb2..a0b5174dd 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -29,7 +29,7 @@ use serde_json::Value; use std::{collections::HashMap, sync::Arc}; use tracing::error; -use super::{EventFormat, Metadata, Tags}; +use super::{override_num_fields_from_stream_schema, EventFormat, Metadata, Tags}; use crate::utils::{arrow::get_field, json::flatten_json_body}; pub struct Event { @@ -50,8 +50,9 @@ impl EventFormat for Event { time_partition: Option, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { let data = flatten_json_body(self.data, None, None, None, false)?; - let stream_schema = schema; - + let mut stream_schema = schema; + //override the number fields to Float64 data type in stream schema + stream_schema = override_num_fields_from_stream_schema(stream_schema); // incoming event may be a single json or a json array // but Data (type defined above) is a vector of json values // hence we need to convert the incoming event to a vector of json values diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index e0bb00daf..960c427b6 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -222,6 +222,29 @@ pub fn override_num_fields_from_schema(schema: Vec>) -> Vec>>() } +///All number fields from stream schema are forced into Float64 +pub fn override_num_fields_from_stream_schema( + schema: HashMap>, +) -> HashMap> { + schema + .iter() + .map(|(name, field)| { + if field.data_type().is_numeric() { + ( + name.clone(), + Arc::new(Field::new( + field.name(), + DataType::Float64, + field.is_nullable(), + )), + ) + } else { + (name.clone(), field.clone()) + } + }) + .collect::>>() +} + pub fn update_field_type_in_schema( inferred_schema: Arc, existing_schema: Option<&HashMap>>, diff --git a/src/event/mod.rs b/src/event/mod.rs index 42773ed12..0d5d6882d 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -22,6 +22,7 @@ mod writer; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; +use format::override_num_fields_from_schema; use itertools::Itertools; use std::sync::Arc; use tracing::error; @@ -157,7 +158,11 @@ pub fn commit_schema(stream_name: &str, schema: Arc) -> Result<(), Event .get_mut(stream_name) .expect("map has entry for this stream name") .schema; - let current_schema = Schema::new(map.values().cloned().collect::()); + //override the number fields to Float64 data type in memory map schema before merge + let mut current_schema = Schema::new(map.values().cloned().collect::()); + current_schema = Schema::new(override_num_fields_from_schema( + current_schema.fields.to_vec(), + )); let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?; map.clear(); map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));