Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ impl TypedStatistics {
max: max(this.max, other.max),
})
}

// Ints are casted to Float if self is Float and other in Int
(TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
TypedStatistics::Float(Float64Type {
min: this.min.min(other.min as f64),
max: this.max.max(other.max as f64),
})
}
(TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
TypedStatistics::Float(Float64Type {
min: this.min.min(other.min),
Expand Down
4 changes: 3 additions & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
DataType::Boolean => value.is_boolean(),
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
DataType::Float16 | DataType::Float32 => value.is_f64(),
// NOTE: All numbers can be ingested as Float64
DataType::Float64 => value.is_number(),
DataType::Utf8 => value.is_string(),
DataType::List(field) => {
let data_type = field.data_type();
Expand Down
22 changes: 22 additions & 0 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ pub fn override_timestamp_fields(
Arc::new(Schema::new(updated_fields))
}

/// All number fields from inferred schema are forced into Float64
pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
schema
.iter()
.map(|field| {
if field.data_type().is_numeric() && field.data_type() != &DataType::Float64 {
Arc::new(Field::new(
field.name(),
DataType::Float64,
field.is_nullable(),
))
} else {
field.clone()
}
})
.collect::<Vec<Arc<Field>>>()
}

pub fn update_field_type_in_schema(
inferred_schema: Arc<Schema>,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
Expand All @@ -212,6 +230,10 @@ pub fn update_field_type_in_schema(
) -> Arc<Schema> {
let mut updated_schema = inferred_schema.clone();

// All number fields from inferred schema are forced into Float64
updated_schema = Arc::new(Schema::new(override_num_fields_from_schema(
updated_schema.fields().to_vec(),
)));
if let Some(existing_schema) = existing_schema {
let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
Expand Down
61 changes: 28 additions & 33 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ mod tests {
use std::{collections::HashMap, sync::Arc};

use actix_web::test::TestRequest;
use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
use arrow_array::{ArrayRef, Float64Array, StringArray};
use arrow_schema::{DataType, Field};
use serde_json::json;

Expand All @@ -287,16 +287,11 @@ mod tests {
};

trait TestExt {
fn as_int64_arr(&self) -> &Int64Array;
fn as_float64_arr(&self) -> &Float64Array;
fn as_utf8_arr(&self) -> &StringArray;
}

impl TestExt for ArrayRef {
fn as_int64_arr(&self) -> &Int64Array {
self.as_any().downcast_ref().unwrap()
}

fn as_float64_arr(&self) -> &Float64Array {
self.as_any().downcast_ref().unwrap()
}
Expand Down Expand Up @@ -328,8 +323,8 @@ mod tests {
assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 6);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from_iter([1])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from_iter([1.0])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
Expand Down Expand Up @@ -368,8 +363,8 @@ mod tests {
assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from_iter([1])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from_iter([1.0])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
Expand All @@ -386,7 +381,7 @@ mod tests {

let schema = fields_to_map(
[
Field::new("a", DataType::Int64, true),
Field::new("a", DataType::Float64, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Float64, true),
]
Expand All @@ -400,8 +395,8 @@ mod tests {
assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from_iter([1])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from_iter([1.0])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
Expand All @@ -418,7 +413,7 @@ mod tests {

let schema = fields_to_map(
[
Field::new("a", DataType::Int64, true),
Field::new("a", DataType::Float64, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Float64, true),
]
Expand Down Expand Up @@ -488,21 +483,21 @@ mod tests {
let schema = rb.schema();
let fields = &schema.fields;

assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true));
assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true));
assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true));
assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true));
assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true));

assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, Some(1), Some(1)])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
&StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
);
assert_eq!(
rb.column_by_name("c").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, Some(1), None])
rb.column_by_name("c").unwrap().as_float64_arr(),
&Float64Array::from(vec![None, Some(1.0), None])
);
}

Expand Down Expand Up @@ -533,8 +528,8 @@ mod tests {
assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, Some(1), Some(1)])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
Expand Down Expand Up @@ -568,7 +563,7 @@ mod tests {

let schema = fields_to_map(
[
Field::new("a", DataType::Int64, true),
Field::new("a", DataType::Float64, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Float64, true),
]
Expand All @@ -581,8 +576,8 @@ mod tests {
assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, Some(1), Some(1)])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
Expand All @@ -608,7 +603,7 @@ mod tests {
"c": 1
},
{
"a": 1,
"a": "1",
"b": "hello",
"c": null
},
Expand All @@ -618,7 +613,7 @@ mod tests {

let schema = fields_to_map(
[
Field::new("a", DataType::Int64, true),
Field::new("a", DataType::Float64, true),
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Float64, true),
]
Expand Down Expand Up @@ -658,8 +653,8 @@ mod tests {
assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 7);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
rb.column_by_name("a").unwrap().as_float64_arr(),
&Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
Expand All @@ -672,13 +667,13 @@ mod tests {
);

assert_eq!(
rb.column_by_name("c_a").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, None, Some(1), Some(1)])
rb.column_by_name("c_a").unwrap().as_float64_arr(),
&Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
);

assert_eq!(
rb.column_by_name("c_b").unwrap().as_int64_arr(),
&Int64Array::from(vec![None, None, None, Some(2)])
rb.column_by_name("c_b").unwrap().as_float64_arr(),
&Float64Array::from(vec![None, None, None, Some(2.0)])
);
}
}
14 changes: 14 additions & 0 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ impl StreamInfo {
Ok(Arc::new(schema))
}

/// update the schema in the metadata
pub fn set_schema(
&self,
stream_name: &str,
schema: HashMap<String, Arc<Field>>,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.schema = schema;
})
}

pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
Expand Down
18 changes: 16 additions & 2 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::{
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};

use crate::event::format::override_num_fields_from_schema;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
Expand All @@ -40,7 +41,7 @@ use crate::{
};

use actix_web_prometheus::PrometheusMetrics;
use arrow_schema::Schema;
use arrow_schema::{Field, Schema};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Local;
Expand Down Expand Up @@ -667,8 +668,21 @@ pub async fn commit_schema_to_storage(
schema: Schema,
) -> Result<(), ObjectStorageError> {
let storage = CONFIG.storage().get_object_store();
let stream_schema = storage.get_schema(stream_name).await?;
let mut stream_schema = storage.get_schema(stream_name).await?;
// override the data type of all numeric fields to Float64
//if data type is not Float64 already
stream_schema = Schema::new(override_num_fields_from_schema(
stream_schema.fields().iter().cloned().collect(),
));
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();

//update the merged schema in the metadata and storage
let schema_map: HashMap<String, Arc<Field>> = new_schema
.fields()
.iter()
.map(|field| (field.name().clone(), Arc::clone(field)))
.collect();
let _ = STREAM_INFO.set_schema(stream_name, schema_map);
storage.put_schema(stream_name, &new_schema).await
}

Expand Down
Loading