Skip to content

Commit 0006fe1

Browse files
committed
introduce QueryHook
1 parent e9189fb commit 0006fe1

File tree

2 files changed

+35
-7
lines changed

2 files changed

+35
-7
lines changed

datafusion-postgres/src/handlers.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ use tokio::sync::Mutex;
2323
use arrow_pg::datatypes::df;
2424
use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type};
2525

26+
/// Statement type represents a parsed SQL query with its logical plan
27+
pub type Statement = (String, LogicalPlan);
28+
2629
#[async_trait]
2730
pub trait QueryHook: Send + Sync {
2831
async fn handle_query(
2932
&self,
30-
query: &str,
33+
statement: &Statement,
3134
session_context: &SessionContext,
3235
client: &dyn ClientInfo,
3336
) -> Option<PgWireResult<Vec<Response<'static>>>>;
@@ -387,11 +390,25 @@ impl SimpleQueryHandler for DfSessionService {
387390
}
388391
}
389392

390-
// Check query hook first
393+
// Parse query into logical plan for hook
391394
if let Some(hook) = &self.query_hook {
392-
if let Some(result) = hook.handle_query(query, &self.session_context, client).await {
393-
return result;
395+
// Create logical plan from query
396+
let state = self.session_context.state();
397+
let logical_plan_result = state.create_logical_plan(query).await;
398+
399+
if let Ok(logical_plan) = logical_plan_result {
400+
// Optimize the logical plan
401+
let optimized_result = state.optimize(&logical_plan);
402+
403+
if let Ok(optimized) = optimized_result {
404+
// Create Statement tuple and call hook
405+
let statement = (query.to_string(), optimized);
406+
if let Some(result) = hook.handle_query(&statement, &self.session_context, client).await {
407+
return result;
408+
}
409+
}
394410
}
411+
// If parsing or optimization fails, we'll continue with normal processing
395412
}
396413

397414
let df_result = self.session_context.sql(query).await;
@@ -443,7 +460,7 @@ impl SimpleQueryHandler for DfSessionService {
443460

444461
#[async_trait]
445462
impl ExtendedQueryHandler for DfSessionService {
446-
type Statement = (String, LogicalPlan);
463+
type Statement = Statement;
447464
type QueryParser = Parser;
448465

449466
fn query_parser(&self) -> Arc<Self::QueryParser> {
@@ -513,6 +530,17 @@ impl ExtendedQueryHandler for DfSessionService {
513530
.to_string();
514531
log::debug!("Received execute extended query: {}", query); // Log for debugging
515532

533+
// Check query hook first
534+
if let Some(hook) = &self.query_hook {
535+
if let Some(result) = hook.handle_query(&portal.statement.statement, &self.session_context, client).await {
536+
// Convert Vec<Response> to single Response
537+
// For extended query, we expect a single response
538+
if let Some(response) = result?.into_iter().next() {
539+
return Ok(response);
540+
}
541+
}
542+
}
543+
516544
// Check permissions for the query (skip for SET and SHOW statements)
517545
if !query.starts_with("set") && !query.starts_with("show") {
518546
self.check_query_permission(client, &portal.statement.statement.0)
@@ -553,7 +581,7 @@ pub struct Parser {
553581

554582
#[async_trait]
555583
impl QueryParser for Parser {
556-
type Statement = (String, LogicalPlan);
584+
type Statement = Statement;
557585

558586
async fn parse_sql<C>(
559587
&self,

datafusion-postgres/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use tokio_rustls::TlsAcceptor;
2020

2121
use crate::auth::AuthManager;
2222
use handlers::HandlerFactory;
23-
pub use handlers::{DfSessionService, Parser, QueryHook};
23+
pub use handlers::{DfSessionService, Parser, QueryHook, Statement};
2424

2525
/// re-exports
2626
pub use arrow_pg;

0 commit comments

Comments
 (0)