@@ -47,7 +47,7 @@ use tonic::{Request, Response, Status, async_trait, transport::Server};
4747
4848use crate :: {
4949 explain:: DistributedExplainExec ,
50- protobuf:: DistributedExplainExecNode as ProtoDistributedExplainExec ,
50+ protobuf:: DistributedExplainExecNode ,
5151 flight:: { FlightSqlHandler , FlightSqlServ } ,
5252 k8s:: get_worker_addresses,
5353 logging:: { debug, info, trace} ,
@@ -110,7 +110,10 @@ impl DfRayProxyHandler {
110110 Self { }
111111 }
112112
113- /// common planning steps shared by both query and its EXPLAIN
113+ /// Common planning steps shared by both query and its EXPLAIN
114+ ///
115+ /// Prepare a query by parsing the SQL, planning it, and distributing the
116+ /// physical plan into stages that can be executed by workers.
114117 async fn prepare_query_base ( & self , sql : & str , query_type : & str ) -> Result < QueryPlanBase > {
115118 debug ! ( "prepare_query_base: {} SQL = {}" , query_type, sql) ;
116119
@@ -162,7 +165,7 @@ impl DfRayProxyHandler {
162165 } )
163166 }
164167
165- /// Prepare an EXPLAIN query - handles parsing, planning, and generating all plan representations
168+ /// Prepare an EXPLAIN query
166169 /// This method only handles EXPLAIN queries (plan only). EXPLAIN ANALYZE queries are handled as regular queries because they need to be executed.
167170 pub async fn prepare_explain ( & self , sql : & str ) -> Result < QueryPlan > {
168171 // Validate that this is actually an EXPLAIN query (not EXPLAIN ANALYZE)
@@ -268,15 +271,15 @@ impl DfRayProxyHandler {
268271 /// EXPLAIN queries return comprehensive plan information including logical, physical,
269272 /// distributed plan, and execution stages for analysis and debugging purposes.
270273 async fn handle_explain_request ( & self , query : & str ) -> Result < Response < FlightInfo > , Status > {
271- let explain_plan = self
274+ let plans = self
272275 . prepare_explain ( query)
273276 . await
274277 . map_err ( |e| Status :: internal ( format ! ( "Could not prepare EXPLAIN query {e:?}" ) ) ) ?;
275278
276- debug ! ( "get flight info: EXPLAIN query id {}" , explain_plan . query_id) ;
279+ debug ! ( "get flight info: EXPLAIN query id {}" , plans . query_id) ;
277280
278- let proto_explain_data = explain_plan . explain_data . map ( |data| {
279- ProtoDistributedExplainExec {
281+ let explain_data = plans . explain_data . map ( |data| {
282+ DistributedExplainExecNode {
280283 schema : data. schema ( ) . as_ref ( ) . try_into ( ) . ok ( ) ,
281284 logical_plan : data. logical_plan ( ) . to_string ( ) ,
282285 physical_plan : data. physical_plan ( ) . to_string ( ) ,
@@ -286,11 +289,11 @@ impl DfRayProxyHandler {
286289 } ) ;
287290
288291 let flight_info = self . create_flight_info_response (
289- explain_plan . query_id ,
290- explain_plan . worker_addresses ,
291- explain_plan . final_stage_id ,
292- explain_plan . schema ,
293- proto_explain_data
292+ plans . query_id ,
293+ plans . worker_addresses ,
294+ plans . final_stage_id ,
295+ plans . schema ,
296+ explain_data
294297 ) ?;
295298
296299 trace ! ( "get_flight_info_statement done for EXPLAIN" ) ;
@@ -626,7 +629,7 @@ mod tests {
626629 /// Create a test handler for testing - bypasses worker discovery initialization
627630 fn create_test_handler ( ) -> DfRayProxyHandler {
628631 // Create the handler directly without calling new() to avoid worker discovery
629- // during test initialization. The init_test_env() function handles mock setup.
632+ // during test initialization.
630633 DfRayProxyHandler { }
631634 }
632635
@@ -642,17 +645,12 @@ mod tests {
642645 addrs
643646 }
644647
645- /// Mock worker addresses for testing - bypasses worker discovery
646- fn setup_mock_worker_addresses ( ) -> Vec < ( String , String ) > {
647- vec ! [
648- ( "mock_worker_1" . to_string( ) , "localhost:9001" . to_string( ) ) ,
649- ( "mock_worker_2" . to_string( ) , "localhost:9002" . to_string( ) ) ,
650- ]
651- }
652-
653648 /// Set up mock worker environment for testing
654649 fn setup_mock_worker_env ( ) {
655- let mock_addrs = setup_mock_worker_addresses ( ) ;
650+ let mock_addrs = vec ! [
651+ ( "mock_worker_1" . to_string( ) , "localhost:9001" . to_string( ) ) ,
652+ ( "mock_worker_2" . to_string( ) , "localhost:9002" . to_string( ) ) ,
653+ ] ;
656654 let mock_env_value = mock_addrs. iter ( )
657655 . map ( |( name, addr) | format ! ( "{}/{}" , name, addr) )
658656 . collect :: < Vec < _ > > ( )
0 commit comments