
Commit 0e68516

fix: update data type of all numeric fields in storage schema to Float64
1 parent 2f56ac7 · commit 0e68516

File tree: 4 files changed (+56 −22 lines)

- src/catalog/column.rs
- src/event/format/mod.rs
- src/metadata.rs
- src/storage/object_storage.rs
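Why force everything numeric to Float64: arrow-rs's `Schema::try_merge` rejects a merge when the same field name carries two different primitive types, so a stream whose field was first inferred as Int64 and later arrives as Float64 would fail at merge time. Below is a minimal sketch of that failure and of the post-commit behaviour (not part of the commit; the field name `count` is illustrative, and only the `arrow_schema` crate is assumed):

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Stored schema from an earlier event where "count" was inferred as an integer.
    let stored = Schema::new(vec![Field::new("count", DataType::Int64, true)]);
    // Schema from a later event where "count" arrived as a float.
    let incoming = Schema::new(vec![Field::new("count", DataType::Float64, true)]);

    // Int64 vs Float64 under the same field name: the merge is rejected.
    assert!(Schema::try_merge(vec![stored, incoming]).is_err());

    // With every numeric field widened to Float64 (what this commit enforces),
    // the merge succeeds.
    let stored = Schema::new(vec![Field::new("count", DataType::Float64, true)]);
    let incoming = Schema::new(vec![Field::new("count", DataType::Float64, true)]);
    assert!(Schema::try_merge(vec![stored, incoming]).is_ok());
}
```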

src/catalog/column.rs

Lines changed: 8 additions & 0 deletions
@@ -66,6 +66,14 @@ impl TypedStatistics {
                     max: max(this.max, other.max),
                 })
             }
+
+            // Ints are cast to Float if self is Float and other is Int
+            (TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
+                TypedStatistics::Float(Float64Type {
+                    min: this.min.min(other.min as f64),
+                    max: this.max.max(other.max as f64),
+                })
+            }
             (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
                 TypedStatistics::Float(Float64Type {
                     min: this.min.min(other.min),
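The added arm lets integer statistics fold into existing float statistics instead of being treated as a type mismatch. A stand-alone sketch of that widening behaviour (`FloatStats`/`IntStats` are simplified stand-ins, not the crate's `Float64Type`/`Int64Type`; only the min/max logic mirrors the diff):

```rust
#[derive(Debug, PartialEq)]
struct FloatStats { min: f64, max: f64 }

struct IntStats { min: i64, max: i64 }

// Mirrors the new (Float, Int) arm: integer bounds are cast to f64 and folded
// into the float bounds, so the merged statistics stay Float64.
fn merge_float_with_int(this: FloatStats, other: IntStats) -> FloatStats {
    FloatStats {
        min: this.min.min(other.min as f64),
        max: this.max.max(other.max as f64),
    }
}

fn main() {
    let merged = merge_float_with_int(
        FloatStats { min: 0.5, max: 2.5 },
        IntStats { min: -3, max: 2 },
    );
    assert_eq!(merged, FloatStats { min: -3.0, max: 2.5 });
}
```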

src/event/format/mod.rs

Lines changed: 18 additions & 20 deletions
@@ -103,6 +103,9 @@ pub trait EventFormat: Sized {
             return Err(anyhow!("Schema mismatch"));
         }
         new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
+        new_schema = Arc::new(Schema::new(override_num_fields_from_schema(
+            new_schema.fields().to_vec(),
+        )));
         let rb = Self::decode(data, new_schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =

@@ -205,24 +208,21 @@ pub fn override_timestamp_fields(
 }
 
 /// All number fields from inferred schema are forced into Float64
-pub fn override_num_fields_from_schema(schema: Arc<Schema>) -> Arc<Schema> {
-    Arc::new(Schema::new(
-        schema
-            .fields()
-            .iter()
-            .map(|field| {
-                if field.data_type().is_numeric() {
-                    Arc::new(Field::new(
-                        field.name(),
-                        DataType::Float64,
-                        field.is_nullable(),
-                    ))
-                } else {
-                    field.clone()
-                }
-            })
-            .collect::<Vec<Arc<Field>>>(),
-    ))
+pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
+    schema
+        .iter()
+        .map(|field| {
+            if field.data_type().is_numeric() && field.data_type() != &DataType::Float64 {
+                Arc::new(Field::new(
+                    field.name(),
+                    DataType::Float64,
+                    field.is_nullable(),
+                ))
+            } else {
+                field.clone()
+            }
+        })
+        .collect::<Vec<Arc<Field>>>()
 }
 
 pub fn update_field_type_in_schema(

@@ -232,8 +232,6 @@ pub fn update_field_type_in_schema(
     log_records: Option<&Vec<Value>>,
 ) -> Arc<Schema> {
     let mut updated_schema = inferred_schema.clone();
-    updated_schema = override_num_fields_from_schema(updated_schema);
-
     if let Some(existing_schema) = existing_schema {
         let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
         let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
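To make the new `Vec<Arc<Field>>` signature concrete, here is a self-contained usage sketch. The function body is copied from the diff above; the `main`, the sample field names, and the assumption of a recent arrow-rs (where `Schema::fields().to_vec()` yields `Vec<Arc<Field>>`, as the call site in `into_recordbatch` relies on) are illustrative:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

/// Copied from the diff: every numeric field that is not already Float64 is
/// rebuilt as Float64; all other fields pass through untouched.
pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
    schema
        .iter()
        .map(|field| {
            if field.data_type().is_numeric() && field.data_type() != &DataType::Float64 {
                Arc::new(Field::new(
                    field.name(),
                    DataType::Float64,
                    field.is_nullable(),
                ))
            } else {
                field.clone()
            }
        })
        .collect::<Vec<Arc<Field>>>()
}

fn main() {
    // Hypothetical inferred schema: two integer-typed fields and one string field.
    let inferred = Schema::new(vec![
        Field::new("status_code", DataType::Int64, true),
        Field::new("bytes_sent", DataType::UInt64, true),
        Field::new("message", DataType::Utf8, true),
    ]);

    let overridden = Schema::new(override_num_fields_from_schema(
        inferred.fields().to_vec(),
    ));

    assert_eq!(overridden.field(0).data_type(), &DataType::Float64);
    assert_eq!(overridden.field(1).data_type(), &DataType::Float64);
    assert_eq!(overridden.field(2).data_type(), &DataType::Utf8);
}
```

Note that the override is now applied at the two boundaries that matter (ingest in `into_recordbatch` and merge in `commit_schema_to_storage`) rather than inside `update_field_type_in_schema`, which the third hunk removes.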

src/metadata.rs

Lines changed: 14 additions & 0 deletions
@@ -164,6 +164,20 @@ impl StreamInfo {
         Ok(Arc::new(schema))
     }
 
+    /// update the schema in the metadata
+    pub fn set_schema(
+        &self,
+        stream_name: &str,
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        map.get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.schema = schema;
+            })
+    }
+
     pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
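The new `set_schema` simply swaps a stream's field map under the existing write lock. A minimal stand-alone sketch of that pattern (`SimpleStreamInfo`, `Metadata`, and `StreamMetaNotFound` are simplified stand-ins for the crate's `StreamInfo`, `LogStreamMetadata`, and `MetadataError`; only the lock-and-replace shape mirrors the diff):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use arrow_schema::{DataType, Field};

#[derive(Default)]
struct Metadata {
    schema: HashMap<String, Arc<Field>>,
}

#[derive(Default)]
struct SimpleStreamInfo(RwLock<HashMap<String, Metadata>>);

#[derive(Debug)]
struct StreamMetaNotFound(String);

impl SimpleStreamInfo {
    // Same shape as the new StreamInfo::set_schema: look the stream up under a
    // write lock and replace its whole field map with the merged one.
    fn set_schema(
        &self,
        stream_name: &str,
        schema: HashMap<String, Arc<Field>>,
    ) -> Result<(), StreamMetaNotFound> {
        let mut map = self.0.write().expect("lock poisoned");
        map.get_mut(stream_name)
            .ok_or_else(|| StreamMetaNotFound(stream_name.to_string()))
            .map(|metadata| {
                metadata.schema = schema;
            })
    }
}

fn main() {
    let info = SimpleStreamInfo::default();
    info.0
        .write()
        .unwrap()
        .insert("demo".to_string(), Metadata::default());

    let mut merged = HashMap::new();
    merged.insert(
        "count".to_string(),
        Arc::new(Field::new("count", DataType::Float64, true)),
    );

    assert!(info.set_schema("demo", merged).is_ok());
    assert!(info.set_schema("missing", HashMap::new()).is_err());
}
```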

src/storage/object_storage.rs

Lines changed: 16 additions & 2 deletions
@@ -25,6 +25,7 @@ use super::{
     SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
+use crate::event::format::override_num_fields_from_schema;
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};

@@ -40,7 +41,7 @@ use crate::{
 };
 
 use actix_web_prometheus::PrometheusMetrics;
-use arrow_schema::Schema;
+use arrow_schema::{Field, Schema};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Local;

@@ -667,8 +668,21 @@ pub async fn commit_schema_to_storage(
     schema: Schema,
 ) -> Result<(), ObjectStorageError> {
     let storage = CONFIG.storage().get_object_store();
-    let stream_schema = storage.get_schema(stream_name).await?;
+    let mut stream_schema = storage.get_schema(stream_name).await?;
+    // override the data type of all numeric fields to Float64
+    // if data type is not Float64 already
+    stream_schema = Schema::new(override_num_fields_from_schema(
+        stream_schema.fields().iter().cloned().collect(),
+    ));
     let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
+
+    // update the merged schema in the metadata and storage
+    let schema_map: HashMap<String, Arc<Field>> = new_schema
+        .fields()
+        .iter()
+        .map(|field| (field.name().clone(), Arc::clone(field)))
+        .collect();
+    let _ = STREAM_INFO.set_schema(stream_name, schema_map);
     storage.put_schema(stream_name, &new_schema).await
 }
