Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ path = "src/bin.rs"

[dependencies]
futures-util = "0.3.30"
arrow = "52.0.0"
parquet = { version = "52.0.0", features = ["arrow", "async"] }

chrono = "=0.4.38"
arrow = "=53.2.0"
parquet = { version = "=53.2.0", features = ["arrow", "async"] }
axum = "0.7.5"
tokio = { version = "1.37.0", features = ["full"] }
hyper = { version="1.3.1", features = ["full"] }
Expand Down
69 changes: 37 additions & 32 deletions src/loaders/parquet/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,48 @@ use std::sync::Arc;
/// # Returns
///
/// This function returns an Arrow Result with the boolean mask.
pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<(&str, &str, &str)>) -> arrow::error::Result<Arc<BooleanArray>> {
pub fn create_boolean_mask(batch: &RecordBatch, original_schema: &Arc<Schema>, filters: Vec<Vec<(&str, &str, &str)>>) -> arrow::error::Result<Arc<BooleanArray>> {
let num_rows = batch.num_rows();
let mut boolean_builder = BooleanBuilder::new();

// Initialize all rows as true
for _ in 0..num_rows {
boolean_builder.append_value(true);
}
boolean_builder.append_n(num_rows, false);
let mut boolean_mask = boolean_builder.finish();
for conjunction in filters.iter() {
let mut conj_boolean_builder = BooleanBuilder::new();
conj_boolean_builder.append_n(num_rows, true);
let mut conj_boolean_mask = conj_boolean_builder.finish();
for filter in conjunction.iter() {
let column = batch.column(original_schema.index_of(filter.0).unwrap());

for filter in filters.iter() {
let column = batch.column(original_schema.index_of(filter.0).unwrap());

if column.data_type() == &arrow::datatypes::DataType::Float32 {
let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Float64 {
let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int16 {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int32 {
let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int64 {
let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int8 {
let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Boolean {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut boolean_mask, column, filter)?;
} else {
return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
if column.data_type() == &arrow::datatypes::DataType::Float32 {
let column = column.as_any().downcast_ref::<Float32Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Float64 {
let column = column.as_any().downcast_ref::<Float64Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int16 {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int32 {
let column = column.as_any().downcast_ref::<Int32Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int64 {
let column = column.as_any().downcast_ref::<Int64Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Int8 {
let column = column.as_any().downcast_ref::<Int8Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else if column.data_type() == &arrow::datatypes::DataType::Boolean {
let column = column.as_any().downcast_ref::<Int16Array>().unwrap();
apply_filter(&mut conj_boolean_mask, column, filter)?;
} else {
return Err(arrow::error::ArrowError::NotYetImplemented(format!("Data type {:?} not yet implemented", column.data_type())));
}
}
let mut new_mask = BooleanBuilder::new();
for (index, val) in conj_boolean_mask.iter().enumerate(){
new_mask.append_value(boolean_mask.value(index) || val.unwrap());
}
boolean_mask = new_mask.finish();
}
Ok(Arc::new(boolean_mask))
}
Expand Down
77 changes: 42 additions & 35 deletions src/loaders/parquet/parse_params.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,77 @@
use std::collections::HashMap;
use regex::Regex;
use std::collections::HashMap;

/// # Arguments
///
///
/// * `params` - A reference to a HashMap of parameters containing 'columns' key.
///
///
/// # Returns
///
///
/// A vector of Polars with the selected columns.
pub fn parse_columns_from_params_to_str(params: &HashMap<String, String>) -> Option<Vec<String>> {
// Parse columns from params

// Initialize a set of columns to return
let mut select_cols = if let Some(cols) = params.get("columns") {
cols.split(",").map(|x| x.to_string()).collect::<Vec<_>>()
Some(cols.split(",").map(|x| x.to_string()).collect::<Vec<_>>())
} else {
Vec::new()
None
};

// If filters exist, extract and add filter columns if not already present
if let Some(query) = params.get("filters") {
let re = Regex::new(r"([0-9a-zA-Z_]+)([!<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();

for filter in query.split(",") {
if let Some(captures) = re.captures(filter) {
let filter_col = captures.get(1).unwrap().as_str();

// Add filter column only if it's not already in select_cols
if !select_cols.contains(&filter_col.to_string()) {
select_cols.push(filter_col.to_string());
if let Some(cols) = select_cols.as_mut() {
if let Some(query) = params.get("filters") {
let re = Regex::new(r"([0-9a-zA-Z_]+)([!<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();

for filter in query.split(",") {
if let Some(captures) = re.captures(filter) {
let filter_col = captures.get(1).unwrap().as_str();

// Add filter column only if it's not already in select_cols
if !cols.contains(&filter_col.to_string()) {
cols.push(filter_col.to_string());
}
}
}
}
}

// Return Some(select_cols) if not empty, otherwise None
if !select_cols.is_empty() {
Some(select_cols)
} else {
None
}
return select_cols;
}

/// # Arguments
///
///
/// * `params` - A reference to a HashMap of parameters containing 'filters' key.
///
///
/// # Returns
///
///
/// A vector of tuples containing the column name, the comparison operator and the value to compare.
pub fn parse_filters(params: &HashMap<String, String>) -> Option<Vec<(&str, &str, &str)>> {
let mut filters = Vec::new();
pub fn parse_filters(params: &HashMap<String, String>) -> Option<Vec<Vec<(&str, &str, &str)>>> {
let mut outer_filters = Vec::new();
if let Some(query) = params.get("filters") {
filters = query.split(",").collect::<Vec<_>>();
outer_filters = query.split(";").collect::<Vec<_>>();
}

if filters.len() == 0 {
return None
if outer_filters.len() == 0 {
return None;
}

let re = Regex::new(r"([0-9a-zA-Z_]+)([!<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
let mut filter_vec = Vec::new();
for filter in filters {
let f_vec = re.captures(filter).unwrap();
filter_vec.push((f_vec.get(1).unwrap().as_str(), f_vec.get(2).unwrap().as_str(), f_vec.get(3).unwrap().as_str()));
let re = Regex::new(r"([0-9a-zA-Z_]+)([!<>=]+)([-+]?[0-9]*\.?[0-9]*)").unwrap();
for inner_filters in outer_filters {
let mut filters = Vec::new();
filters = inner_filters.split(",").collect::<Vec<_>>();
let mut inner_filter_vec = Vec::new();
for filter in filters {
let f_vec = re.captures(filter).unwrap();
inner_filter_vec.push((
f_vec.get(1).unwrap().as_str(),
f_vec.get(2).unwrap().as_str(),
f_vec.get(3).unwrap().as_str(),
));
}
filter_vec.push(inner_filter_vec);
}

Some(filter_vec)
}
}
5 changes: 4 additions & 1 deletion src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ pub async fn entry_route(
// No Range header: Process and return the full Parquet file as before
match loaders::parquet::parquet::process_and_return_parquet_file(&file_path.to_str().unwrap(), &params).await {
Ok(bytes) => Bytes::from(bytes).into_response(),
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load file").into_response(),
Err(e) => {
eprintln!("Application error: {e}");
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load file").into_response();
},
}
}

Expand Down
13 changes: 12 additions & 1 deletion tests/parsers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
#[cfg(test)]
mod parser {
use lsdb_server::loaders::parquet;
use std::collections::HashMap;


/// Checks that `parse_filters` splits groups on `;`, filters on `,`, and breaks each
/// filter into its column, operator and value.
#[tokio::test]
async fn test_parse_filters() {
    let mut params = HashMap::new();

    params.insert("filters".to_string(), "RA>=30.1241,DEC<=-30.3,RA>30,DEC<=30;RA==1;RA=1,RA!=0".to_string());

    let filters = parquet::parse_params::parse_filters(&params);

    let expected = vec![
        vec![
            ("RA", ">=", "30.1241"),
            ("DEC", "<=", "-30.3"),
            ("RA", ">", "30"),
            ("DEC", "<=", "30"),
        ],
        vec![("RA", "==", "1")],
        vec![("RA", "=", "1"), ("RA", "!=", "0")],
    ];
    assert_eq!(filters, Some(expected));

    // Without a `filters` key there is nothing to parse.
    let empty_params: HashMap<String, String> = HashMap::new();
    assert_eq!(parquet::parse_params::parse_filters(&empty_params), None);
}
}
Loading