Skip to content

Commit 02ef801

Browse files
authored
feat: metabase startup queries (#160)
* feat: add initial support for metabase startup queries * feat: add pg_views/pg_matviews/pg_tables views * feat: improve support for current_user and privilege functions Signed-off-by: Ning Sun <[email protected]> * feat: pg_stat_user_tables * fix: refactor has_privilege udfs * fix: extended query for unsupported show statements * chore: update README for database clients --------- Signed-off-by: Ning Sun <[email protected]>
1 parent 7f91a89 commit 02ef801

File tree

10 files changed

+570
-112
lines changed

10 files changed

+570
-112
lines changed

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,21 @@ project.
2222
- Permission control
2323
- Built-in `pg_catalog` tables
2424
- Built-in postgres functions for common meta queries
25-
- [x] DBeaver compatibility
26-
- [x] pgcli compatibility
2725
- `datafusion-postgres-cli`: A cli tool starts a postgres compatible server for
2826
datafusion supported file formats, just like python's `SimpleHTTPServer`.
2927
- `arrow-pg`: A data type mapping, encoding/decoding library for arrow and
3028
postgres(pgwire) data types.
3129

3230
See `auth.rs` for complete implementation examples using `DfAuthSource`.
3331

32+
## Supported Database Clients
33+
34+
- Database Clients
35+
- [x] DBeaver
36+
- [x] pgcli
37+
- BI
38+
- [x] Metabase
39+
3440
## Quick Start
3541

3642
### The Library `datafusion-postgres`

datafusion-postgres/src/handlers.rs

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ use std::sync::Arc;
33

44
use crate::auth::{AuthManager, Permission, ResourceType};
55
use crate::sql::{
6-
parse, rewrite, AliasDuplicatedProjectionRewrite, BlacklistSqlRewriter, FixArrayLiteral,
7-
PrependUnqualifiedPgTableName, RemoveTableFunctionQualifier, RemoveUnsupportedTypes,
8-
ResolveUnqualifiedIdentifer, RewriteArrayAnyAllOperation, SqlStatementRewriteRule,
6+
parse, rewrite, AliasDuplicatedProjectionRewrite, BlacklistSqlRewriter,
7+
CurrentUserVariableToSessionUserFunctionCall, FixArrayLiteral, PrependUnqualifiedPgTableName,
8+
RemoveTableFunctionQualifier, RemoveUnsupportedTypes, ResolveUnqualifiedIdentifer,
9+
RewriteArrayAnyAllOperation, SqlStatementRewriteRule,
910
};
1011
use async_trait::async_trait;
11-
use datafusion::arrow::datatypes::DataType;
12+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
13+
use datafusion::common::ToDFSchema;
14+
use datafusion::error::DataFusionError;
1215
use datafusion::logical_expr::LogicalPlan;
1316
use datafusion::prelude::*;
1417
use datafusion::sql::parser::Statement;
@@ -107,6 +110,7 @@ impl DfSessionService {
107110
Arc::new(PrependUnqualifiedPgTableName),
108111
Arc::new(FixArrayLiteral),
109112
Arc::new(RemoveTableFunctionQualifier),
113+
Arc::new(CurrentUserVariableToSessionUserFunctionCall),
110114
];
111115
let parser = Arc::new(Parser {
112116
session_context: session_context.clone(),
@@ -420,13 +424,15 @@ impl DfSessionService {
420424
let resp = Self::mock_show_response("statement_timeout", &timeout_str)?;
421425
Ok(Some(Response::Query(resp)))
422426
}
423-
_ => Err(PgWireError::UserError(Box::new(
424-
pgwire::error::ErrorInfo::new(
425-
"ERROR".to_string(),
426-
"42704".to_string(),
427-
format!("Unrecognized SHOW command: {query_lower}"),
428-
),
429-
))),
427+
"show transaction isolation level" => {
428+
let resp = Self::mock_show_response("transaction_isolation", "read_committed")?;
429+
Ok(Some(Response::Query(resp)))
430+
}
431+
_ => {
432+
info!("Unsupported show statement: {query_lower}");
433+
let resp = Self::mock_show_response("unsupported_show_statement", "")?;
434+
Ok(Some(Response::Query(resp)))
435+
}
430436
}
431437
} else {
432438
Ok(None)
@@ -714,24 +720,15 @@ pub struct Parser {
714720
sql_rewrite_rules: Vec<Arc<dyn SqlStatementRewriteRule>>,
715721
}
716722

717-
#[async_trait]
718-
impl QueryParser for Parser {
719-
type Statement = (String, LogicalPlan);
720-
721-
async fn parse_sql<C>(
722-
&self,
723-
_client: &C,
724-
sql: &str,
725-
_types: &[Type],
726-
) -> PgWireResult<Self::Statement> {
727-
log::debug!("Received parse extended query: {sql}"); // Log for debugging
728-
723+
impl Parser {
724+
fn try_shortcut_parse_plan(&self, sql: &str) -> Result<Option<LogicalPlan>, DataFusionError> {
729725
// Check for transaction commands that shouldn't be parsed by DataFusion
730726
let sql_lower = sql.to_lowercase();
731727
let sql_trimmed = sql_lower.trim();
728+
732729
if matches!(
733730
sql_trimmed,
734-
"begin"
731+
"" | "begin"
735732
| "begin transaction"
736733
| "begin work"
737734
| "start transaction"
@@ -747,13 +744,50 @@ impl QueryParser for Parser {
747744
) {
748745
// Return a dummy plan for transaction commands - they'll be handled by transaction handler
749746
let dummy_schema = datafusion::common::DFSchema::empty();
750-
let dummy_plan = datafusion::logical_expr::LogicalPlan::EmptyRelation(
747+
return Ok(Some(LogicalPlan::EmptyRelation(
751748
datafusion::logical_expr::EmptyRelation {
752749
produce_one_row: false,
753-
schema: std::sync::Arc::new(dummy_schema),
750+
schema: Arc::new(dummy_schema),
754751
},
755-
);
756-
return Ok((sql.to_string(), dummy_plan));
752+
)));
753+
}
754+
755+
// show statement may not be supported by datafusion
756+
if sql_trimmed.starts_with("show") {
757+
// Return a dummy plan for transaction commands - they'll be handled by transaction handler
758+
let show_schema =
759+
Arc::new(Schema::new(vec![Field::new("show", DataType::Utf8, false)]));
760+
let df_schema = show_schema.to_dfschema()?;
761+
return Ok(Some(LogicalPlan::EmptyRelation(
762+
datafusion::logical_expr::EmptyRelation {
763+
produce_one_row: true,
764+
schema: Arc::new(df_schema),
765+
},
766+
)));
767+
}
768+
769+
Ok(None)
770+
}
771+
}
772+
773+
#[async_trait]
774+
impl QueryParser for Parser {
775+
type Statement = (String, LogicalPlan);
776+
777+
async fn parse_sql<C>(
778+
&self,
779+
_client: &C,
780+
sql: &str,
781+
_types: &[Type],
782+
) -> PgWireResult<Self::Statement> {
783+
log::debug!("Received parse extended query: {sql}"); // Log for debugging
784+
785+
// Check for transaction commands that shouldn't be parsed by DataFusion
786+
if let Some(plan) = self
787+
.try_shortcut_parse_plan(sql)
788+
.map_err(|e| PgWireError::ApiError(Box::new(e)))?
789+
{
790+
return Ok((sql.to_string(), plan));
757791
}
758792

759793
let mut statements = parse(sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?;

0 commit comments

Comments
 (0)