Skip to content

Commit 0b8d952

Browse files
committed
feat: implment extended query part
1 parent 59a9eea commit 0b8d952

File tree

6 files changed

+96
-30
lines changed

6 files changed

+96
-30
lines changed

datafusion-postgres/src/handlers.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use pgwire::messages::response::TransactionStatus;
2525

2626
use crate::auth::AuthManager;
2727
use crate::client;
28+
use crate::hooks::set_show::SetShowHook;
2829
use crate::hooks::QueryHook;
2930
use arrow_pg::datatypes::df;
3031
use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type};
@@ -43,12 +44,18 @@ pub struct HandlerFactory {
4344
}
4445

4546
impl HandlerFactory {
46-
pub fn new(
47+
pub fn new(session_context: Arc<SessionContext>, auth_manager: Arc<AuthManager>) -> Self {
48+
let session_service =
49+
Arc::new(DfSessionService::new(session_context, auth_manager.clone()));
50+
HandlerFactory { session_service }
51+
}
52+
53+
pub fn new_with_hooks(
4754
session_context: Arc<SessionContext>,
4855
auth_manager: Arc<AuthManager>,
4956
query_hooks: Vec<Arc<dyn QueryHook>>,
5057
) -> Self {
51-
let session_service = Arc::new(DfSessionService::new(
58+
let session_service = Arc::new(DfSessionService::new_with_hooks(
5259
session_context,
5360
auth_manager.clone(),
5461
query_hooks,
@@ -98,6 +105,14 @@ impl DfSessionService {
98105
pub fn new(
99106
session_context: Arc<SessionContext>,
100107
auth_manager: Arc<AuthManager>,
108+
) -> DfSessionService {
109+
let hooks: Vec<Arc<dyn QueryHook>> = vec![Arc::new(SetShowHook)];
110+
Self::new_with_hooks(session_context, auth_manager, hooks)
111+
}
112+
113+
pub fn new_with_hooks(
114+
session_context: Arc<SessionContext>,
115+
auth_manager: Arc<AuthManager>,
101116
query_hooks: Vec<Arc<dyn QueryHook>>,
102117
) -> DfSessionService {
103118
let parser = Arc::new(Parser {
@@ -601,7 +616,6 @@ impl Parser {
601616

602617
// show statement may not be supported by datafusion
603618
if sql_trimmed.starts_with("show") {
604-
// Return a dummy plan for transaction commands - they'll be handled by transaction handler
605619
let show_schema =
606620
Arc::new(Schema::new(vec![Field::new("show", DataType::Utf8, false)]));
607621
let df_schema = show_schema.to_dfschema()?;

datafusion-postgres/src/hooks/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use async_trait::async_trait;
55
use datafusion::common::ParamValues;
66
use datafusion::logical_expr::LogicalPlan;
77
use datafusion::prelude::SessionContext;
8-
use datafusion::sql::sqlparser;
8+
use datafusion::sql::sqlparser::ast::Statement;
99
use pgwire::api::results::Response;
1010
use pgwire::api::ClientInfo;
1111
use pgwire::error::PgWireResult;
@@ -15,23 +15,23 @@ pub trait QueryHook: Send + Sync {
1515
/// called in simple query handler to return response directly
1616
async fn handle_simple_query(
1717
&self,
18-
statement: &sqlparser::ast::Statement,
18+
statement: &Statement,
1919
session_context: &SessionContext,
2020
client: &mut (dyn ClientInfo + Send + Sync),
2121
) -> Option<PgWireResult<Response>>;
2222

2323
/// called at extended query parse phase, for generating `LogicalPlan`from statement
2424
async fn handle_extended_parse_query(
2525
&self,
26-
statement: &sqlparser::ast::Statement,
26+
sql: &Statement,
2727
session_context: &SessionContext,
2828
client: &(dyn ClientInfo + Send + Sync),
2929
) -> Option<PgWireResult<LogicalPlan>>;
3030

3131
/// called at extended query execute phase, for query execution
3232
async fn handle_extended_query(
3333
&self,
34-
statement: &sqlparser::ast::Statement,
34+
statement: &Statement,
3535
logical_plan: &LogicalPlan,
3636
params: &ParamValues,
3737
session_context: &SessionContext,

datafusion-postgres/src/hooks/set_show.rs

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
use std::sync::Arc;
22

33
use async_trait::async_trait;
4-
use datafusion::common::ParamValues;
4+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
5+
use datafusion::common::{ParamValues, ToDFSchema};
56
use datafusion::logical_expr::LogicalPlan;
67
use datafusion::prelude::SessionContext;
78
use datafusion::sql::sqlparser::ast::Statement;
89
use log::{info, warn};
9-
use pgwire::api::results::{
10-
DataRowEncoder, DescribePortalResponse, DescribeResponse, DescribeStatementResponse,
11-
FieldFormat, FieldInfo, QueryResponse, Response, Tag,
12-
};
10+
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
1311
use pgwire::api::ClientInfo;
1412
use pgwire::error::{PgWireError, PgWireResult};
1513
use postgres_types::Type;
@@ -48,22 +46,66 @@ impl QueryHook for SetShowHook {
4846

4947
async fn handle_extended_parse_query(
5048
&self,
51-
statement: &Statement,
52-
session_context: &SessionContext,
53-
client: &(dyn ClientInfo + Send + Sync),
49+
stmt: &Statement,
50+
_session_context: &SessionContext,
51+
_client: &(dyn ClientInfo + Send + Sync),
5452
) -> Option<PgWireResult<LogicalPlan>> {
55-
None
53+
let sql_lower = stmt.to_string().to_lowercase();
54+
let sql_trimmed = sql_lower.trim();
55+
56+
if sql_trimmed.starts_with("show") {
57+
let show_schema =
58+
Arc::new(Schema::new(vec![Field::new("show", DataType::Utf8, false)]));
59+
let result = show_schema
60+
.to_dfschema()
61+
.map(|df_schema| {
62+
LogicalPlan::EmptyRelation(datafusion::logical_expr::EmptyRelation {
63+
produce_one_row: true,
64+
schema: Arc::new(df_schema),
65+
})
66+
})
67+
.map_err(|e| PgWireError::ApiError(Box::new(e)));
68+
Some(result)
69+
} else if sql_trimmed.starts_with("set") {
70+
let show_schema = Arc::new(Schema::new(Vec::<Field>::new()));
71+
let result = show_schema
72+
.to_dfschema()
73+
.map(|df_schema| {
74+
LogicalPlan::EmptyRelation(datafusion::logical_expr::EmptyRelation {
75+
produce_one_row: true,
76+
schema: Arc::new(df_schema),
77+
})
78+
})
79+
.map_err(|e| PgWireError::ApiError(Box::new(e)));
80+
Some(result)
81+
} else {
82+
None
83+
}
5684
}
5785

5886
async fn handle_extended_query(
5987
&self,
6088
statement: &Statement,
61-
logical_plan: &LogicalPlan,
62-
params: &ParamValues,
89+
_logical_plan: &LogicalPlan,
90+
_params: &ParamValues,
6391
session_context: &SessionContext,
6492
client: &mut (dyn ClientInfo + Send + Sync),
6593
) -> Option<PgWireResult<Response>> {
66-
None
94+
match statement {
95+
Statement::Set { .. } => {
96+
let query = statement.to_string();
97+
let query_lower = query.to_lowercase();
98+
99+
try_respond_set_statements(client, &query_lower, session_context).await
100+
}
101+
Statement::ShowVariable { .. } | Statement::ShowStatus { .. } => {
102+
let query = statement.to_string();
103+
let query_lower = query.to_lowercase();
104+
105+
try_respond_show_statements(client, &query_lower, session_context).await
106+
}
107+
_ => None,
108+
}
67109
}
68110
}
69111

datafusion-postgres/src/lib.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,25 @@ pub async fn serve(
8989
auth_manager: Arc<AuthManager>,
9090
) -> Result<(), std::io::Error> {
9191
// Create the handler factory with authentication
92-
let factory = Arc::new(HandlerFactory::new(session_context, auth_manager, vec![]));
92+
let factory = Arc::new(HandlerFactory::new(session_context, auth_manager));
93+
94+
serve_with_handlers(factory, opts).await
95+
}
96+
97+
/// Serve the Datafusion `SessionContext` with Postgres protocol, using custom
98+
/// query processing hooks.
99+
pub async fn serve_with_hooks(
100+
session_context: Arc<SessionContext>,
101+
opts: &ServerOptions,
102+
auth_manager: Arc<AuthManager>,
103+
hooks: Vec<Arc<dyn QueryHook>>,
104+
) -> Result<(), std::io::Error> {
105+
// Create the handler factory with authentication
106+
let factory = Arc::new(HandlerFactory::new_with_hooks(
107+
session_context,
108+
auth_manager,
109+
hooks,
110+
));
93111

94112
serve_with_handlers(factory, opts).await
95113
}

datafusion-postgres/src/testing.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,7 @@ pub fn setup_handlers() -> DfSessionService {
2121
)
2222
.expect("Failed to setup sesession context");
2323

24-
DfSessionService::new(
25-
Arc::new(session_context),
26-
Arc::new(AuthManager::new()),
27-
vec![],
28-
)
24+
DfSessionService::new(Arc::new(session_context), Arc::new(AuthManager::new()))
2925
}
3026

3127
#[derive(Debug, Default)]

datafusion-postgres/tests/common/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,7 @@ pub fn setup_handlers() -> DfSessionService {
2121
)
2222
.expect("Failed to setup sesession context");
2323

24-
DfSessionService::new(
25-
Arc::new(session_context),
26-
Arc::new(AuthManager::new()),
27-
vec![],
28-
)
24+
DfSessionService::new(Arc::new(session_context), Arc::new(AuthManager::new()))
2925
}
3026

3127
#[derive(Debug, Default)]

0 commit comments

Comments
 (0)