Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 77 additions & 19 deletions src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,90 @@
use axum::{
body::Bytes, extract::{OriginalUri, Query}, response::IntoResponse
body::Bytes,
extract::{OriginalUri, Query},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};

use std::collections::HashMap;
use std::path::PathBuf;

use crate::loaders;



/// Handles all incoming HTTP GET requests that do not match any other routes.
///
/// Captures the URI of the request and any query parameters, loads the parquet file. It processes the Parquet file
/// lazily based on the query parameters and returns the content as a byte stream.
///
/// # Arguments
/// * `uri` - The original URI of the request, used to determine the file to be accessed.
/// * `params` - A hash map containing query parameters which may affect how the Parquet file is processed.
///
/// # Returns
/// This function returns a byte stream that can be directly used as an HTTP response body.
pub async fn entry_route(uri: OriginalUri, Query(params): Query<HashMap<String, String>>) -> impl IntoResponse {
/// Handles HTTP GET requests with Range support for Parquet files.
pub async fn entry_route(
uri: OriginalUri,
Query(params): Query<HashMap<String, String>>,
headers: HeaderMap,
) -> impl IntoResponse {
let path = uri.0.path().trim_start_matches("/");
let base_path = PathBuf::from("/storage2/splus");

let file_path = base_path.join(path);
let bytes = loaders::parquet::parquet::process_and_return_parquet_file(&file_path.to_str().unwrap(), &params).await.unwrap();

Bytes::from(bytes)
// println!("Processing file: {:?}", file_path);

// Check for Range header
if let Some(range_header) = headers.get("Range") {
// println!("Range header: {:?}", range_header);
if let Ok(range_str) = range_header.to_str() {
let full_bytes = match loaders::parquet::parquet::process_and_return_parquet_file(&file_path.to_str().unwrap(), &params).await {
Ok(data) => data,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load file").into_response(),
};

let file_size = full_bytes.len() as u64;

if let Some((start, end)) = parse_range(range_str, file_size) {
let end = end.unwrap_or(file_size - 1);
if start >= file_size || end >= file_size {
return (
StatusCode::RANGE_NOT_SATISFIABLE,
[("Content-Range", format!("bytes */{}", file_size))],
)
.into_response();
}

let chunk = &full_bytes[start as usize..=end as usize];
let content_range = format!("bytes {}-{}/{}", start, end, file_size);

return (
StatusCode::PARTIAL_CONTENT,
[
("Content-Range", content_range),
("Accept-Ranges", "bytes".to_string()),
("Content-Length", chunk.len().to_string()),
],
Bytes::from(chunk.to_vec()),
)
.into_response();
}
}
}

// No Range header: Process and return the full Parquet file as before
match loaders::parquet::parquet::process_and_return_parquet_file(&file_path.to_str().unwrap(), &params).await {
Ok(bytes) => Bytes::from(bytes).into_response(),
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load file").into_response(),
}
}

/// Parses the `Range` header value and returns `(start, end)` positions.
fn parse_range(range: &str, file_size: u64) -> Option<(u64, Option<u64>)> {
if !range.starts_with("bytes=") {
return None;
}

let range = &range[6..]; // Remove "bytes="
let parts: Vec<&str> = range.split('-').collect();

if parts.len() != 2 {
return None;
}

let start = parts[0].parse::<u64>().ok();
let end = parts[1].parse::<u64>().ok();

match (start, end) {
(Some(s), Some(e)) if s <= e => Some((s, Some(e))), // Valid range
(Some(s), None) => Some((s, None)), // Open-ended range
_ => None, // Invalid range
}
}
Loading