1+ // sql_middleware.rs
2+
3+ use sqlparser:: {
4+ dialect:: PostgreSqlDialect ,
5+ parser:: Parser ,
6+ } ;
7+ use anyhow:: { Result , anyhow} ;
8+ use datafusion_postgres:: DfSessionService ; // Add this import
9+
10+ /// Splits a multi-statement SQL query into individual statements.
11+ pub fn split_sql_statements ( query : & str ) -> Result < Vec < String > > {
12+ let dialect = PostgreSqlDialect { } ;
13+ let statements = Parser :: parse_sql ( & dialect, query)
14+ . map_err ( |e| anyhow ! ( "Failed to parse SQL query: {:?}" , e) ) ?;
15+
16+ let mut result = Vec :: new ( ) ;
17+ for statement in statements {
18+ result. push ( statement. to_string ( ) ) ;
19+ }
20+ Ok ( result)
21+ }
22+
23+ /// Middleware to handle multi-statement SQL queries by splitting and executing them sequentially.
24+ pub async fn process_multi_statement_query (
25+ query : & str ,
26+ session_service : & DfSessionService ,
27+ ) -> Result < Vec < datafusion:: prelude:: DataFrame > > {
28+ let statements = split_sql_statements ( query) ?;
29+
30+ let mut results = Vec :: new ( ) ;
31+ for statement in statements {
32+ tracing:: info!( "Executing statement: {}" , statement) ;
33+ let df = session_service
34+ . sql ( & statement)
35+ . await
36+ . map_err ( |e| anyhow ! ( "Failed to execute statement '{}': {:?}" , statement, e) ) ?;
37+ results. push ( df) ;
38+ }
39+ Ok ( results)
40+ }
41+
42+ #[ cfg( test) ]
43+ mod tests {
44+ use super :: * ;
45+
46+ #[ test]
47+ fn test_split_sql_statements ( ) {
48+ let query = "SELECT 1; SELECT 2; INSERT INTO telemetry_events (id, projectId, timestamp) VALUES ('1', 'proj1', '2023-01-01T00:00:00Z')" ;
49+ let statements = split_sql_statements ( query) . unwrap ( ) ;
50+ assert_eq ! ( statements. len( ) , 3 ) ;
51+ assert_eq ! ( statements[ 0 ] , "SELECT 1" ) ;
52+ assert_eq ! ( statements[ 1 ] , "SELECT 2" ) ;
53+ assert_eq ! ( statements[ 2 ] , "INSERT INTO telemetry_events (id, projectId, timestamp) VALUES ('1', 'proj1', '2023-01-01T00:00:00Z')" ) ;
54+ }
55+ }
0 commit comments