11use axum:: {
2- body:: Bytes , extract:: { OriginalUri , Query } , response:: IntoResponse
2+ body:: Bytes ,
3+ extract:: { OriginalUri , Query } ,
4+ http:: { HeaderMap , StatusCode } ,
5+ response:: { IntoResponse , Response } ,
36} ;
4-
57use std:: collections:: HashMap ;
68use std:: path:: PathBuf ;
79
810use crate :: loaders;
911
10-
11-
12- /// Handles all incoming HTTP GET requests that do not match any other routes.
13- ///
14- /// Captures the URI of the request and any query parameters, loads the parquet file. It processes the Parquet file
15- /// lazily based on the query parameters and returns the content as a byte stream.
16- ///
17- /// # Arguments
18- /// * `uri` - The original URI of the request, used to determine the file to be accessed.
19- /// * `params` - A hash map containing query parameters which may affect how the Parquet file is processed.
20- ///
21- /// # Returns
22- /// This function returns a byte stream that can be directly used as an HTTP response body.
23- pub async fn entry_route ( uri : OriginalUri , Query ( params) : Query < HashMap < String , String > > ) -> impl IntoResponse {
12+ /// Handles HTTP GET requests with Range support for Parquet files.
13+ pub async fn entry_route (
14+ uri : OriginalUri ,
15+ Query ( params) : Query < HashMap < String , String > > ,
16+ headers : HeaderMap ,
17+ ) -> impl IntoResponse {
2418 let path = uri. 0 . path ( ) . trim_start_matches ( "/" ) ;
2519 let base_path = PathBuf :: from ( "/storage2/splus" ) ;
26-
2720 let file_path = base_path. join ( path) ;
28- let bytes = loaders:: parquet:: parquet:: process_and_return_parquet_file ( & file_path. to_str ( ) . unwrap ( ) , & params) . await . unwrap ( ) ;
2921
30- Bytes :: from ( bytes)
22+ // println!("Processing file: {:?}", file_path);
23+
24+ // Check for Range header
25+ if let Some ( range_header) = headers. get ( "Range" ) {
26+ // println!("Range header: {:?}", range_header);
27+ if let Ok ( range_str) = range_header. to_str ( ) {
28+ let full_bytes = match loaders:: parquet:: parquet:: process_and_return_parquet_file ( & file_path. to_str ( ) . unwrap ( ) , & params) . await {
29+ Ok ( data) => data,
30+ Err ( _) => return ( StatusCode :: INTERNAL_SERVER_ERROR , "Failed to load file" ) . into_response ( ) ,
31+ } ;
32+
33+ let file_size = full_bytes. len ( ) as u64 ;
34+
35+ if let Some ( ( start, end) ) = parse_range ( range_str, file_size) {
36+ let end = end. unwrap_or ( file_size - 1 ) ;
37+ if start >= file_size || end >= file_size {
38+ return (
39+ StatusCode :: RANGE_NOT_SATISFIABLE ,
40+ [ ( "Content-Range" , format ! ( "bytes */{}" , file_size) ) ] ,
41+ )
42+ . into_response ( ) ;
43+ }
44+
45+ let chunk = & full_bytes[ start as usize ..=end as usize ] ;
46+ let content_range = format ! ( "bytes {}-{}/{}" , start, end, file_size) ;
47+
48+ return (
49+ StatusCode :: PARTIAL_CONTENT ,
50+ [
51+ ( "Content-Range" , content_range) ,
52+ ( "Accept-Ranges" , "bytes" . to_string ( ) ) ,
53+ ( "Content-Length" , chunk. len ( ) . to_string ( ) ) ,
54+ ] ,
55+ Bytes :: from ( chunk. to_vec ( ) ) ,
56+ )
57+ . into_response ( ) ;
58+ }
59+ }
60+ }
61+
62+ // No Range header: Process and return the full Parquet file as before
63+ match loaders:: parquet:: parquet:: process_and_return_parquet_file ( & file_path. to_str ( ) . unwrap ( ) , & params) . await {
64+ Ok ( bytes) => Bytes :: from ( bytes) . into_response ( ) ,
65+ Err ( _) => ( StatusCode :: INTERNAL_SERVER_ERROR , "Failed to load file" ) . into_response ( ) ,
66+ }
3167}
3268
/// Parses a single-part `Range` header value into `(start, end)` byte positions.
///
/// Supported forms (positions are inclusive, zero-based):
/// * `bytes=A-B` -> `Some((A, Some(B)))`, provided `A <= B`
/// * `bytes=A-`  -> `Some((A, None))` (open-ended; the caller picks the last byte)
/// * `bytes=-N`  -> the final `N` bytes of a `file_size`-byte body, returned as explicit
///   positions; `None` if `N` is zero or the body is empty
///
/// Returns `None` for anything else: a unit other than `bytes`, multi-part ranges,
/// non-digit positions, or `A > B`. Positions are not checked against `file_size`
/// (except to resolve a suffix range); the caller is responsible for bounds handling.
fn parse_range(range: &str, file_size: u64) -> Option<(u64, Option<u64>)> {
    // The grammar allows digits only; `u64::from_str` on its own would accept "+5".
    fn parse_pos(text: &str) -> Option<u64> {
        if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        text.parse().ok()
    }

    let spec = range.strip_prefix("bytes=")?;
    let (first, last) = spec.split_once('-')?;
    // More than one '-' means a multi-part or malformed spec.
    if last.contains('-') {
        return None;
    }

    match (first.is_empty(), last.is_empty()) {
        // "bytes=-" carries no information.
        (true, true) => None,
        // Suffix range: the last N bytes, clamped to the whole body.
        (true, false) => {
            let suffix_len = parse_pos(last)?;
            if suffix_len == 0 || file_size == 0 {
                return None;
            }
            Some((file_size.saturating_sub(suffix_len), Some(file_size - 1)))
        }
        // Open-ended range: only a genuinely empty end counts, not an unparsable one.
        (false, true) => parse_pos(first).map(|start| (start, None)),
        // Fully specified range.
        (false, false) => {
            let start = parse_pos(first)?;
            let end = parse_pos(last)?;
            if start <= end {
                Some((start, Some(end)))
            } else {
                None
            }
        }
    }
}
0 commit comments