Skip to content

Commit 07f2193

Browse files
committed
feat: implement dbeaver startup queries one-by-one
1 parent 9c68a78 commit 07f2193

File tree

5 files changed

+55
-11
lines changed

5 files changed

+55
-11
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-postgres/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@ tokio = { version = "1.47", features = ["sync", "net"] }
2828
tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
2929
rustls-pemfile = "2.0"
3030
rustls-pki-types = "1.0"
31+
32+
[dev-dependencies]
33+
env_logger = "0.11"

datafusion-postgres/src/handlers.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use async_trait::async_trait;
66
use datafusion::arrow::datatypes::DataType;
77
use datafusion::logical_expr::LogicalPlan;
88
use datafusion::prelude::*;
9+
use log::warn;
910
use pgwire::api::auth::noop::NoopStartupHandler;
1011
use pgwire::api::auth::StartupHandler;
1112
use pgwire::api::portal::{Format, Portal};
@@ -198,14 +199,12 @@ impl DfSessionService {
198199
}
199200
} else {
200201
// pass SET query to datafusion
201-
let df = self
202-
.session_context
203-
.sql(query_lower)
204-
.await
205-
.map_err(|err| PgWireError::ApiError(Box::new(err)))?;
206-
207-
let resp = df::encode_dataframe(df, &Format::UnifiedText).await?;
208-
Ok(Some(Response::Query(resp)))
202+
if let Err(e) = self.session_context.sql(query_lower).await {
203+
warn!("SET statement {query_lower} is not supported by datafusion, error {e}, statement ignored");
204+
}
205+
206+
// Always return SET success
207+
Ok(Some(Response::Execution(Tag::new("SET"))))
209208
}
210209
} else {
211210
Ok(None)

datafusion-postgres/src/pg_catalog.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,26 @@ pub fn create_format_type_udf() -> ScalarUDF {
869869
)
870870
}
871871

872+
pub fn create_session_user_udf() -> ScalarUDF {
873+
let func = move |_args: &[ColumnarValue]| {
874+
let mut builder = StringBuilder::new();
875+
// TODO: return real user
876+
builder.append_value("postgres");
877+
878+
let array: ArrayRef = Arc::new(builder.finish());
879+
880+
Ok(ColumnarValue::Array(array))
881+
};
882+
883+
create_udf(
884+
"session_user",
885+
vec![],
886+
DataType::Utf8,
887+
Volatility::Stable,
888+
Arc::new(func),
889+
)
890+
}
891+
872892
/// Install pg_catalog and postgres UDFs to current `SessionContext`
873893
pub fn setup_pg_catalog(
874894
session_context: &SessionContext,
@@ -892,6 +912,7 @@ pub fn setup_pg_catalog(
892912
session_context.register_udf(create_has_table_privilege_2param_udf());
893913
session_context.register_udf(create_pg_table_is_visible());
894914
session_context.register_udf(create_format_type_udf());
915+
session_context.register_udf(create_session_user_udf());
895916

896917
Ok(())
897918
}

datafusion-postgres/tests/dbeaver.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,32 @@ mod common;
33
use common::*;
44
use pgwire::api::query::SimpleQueryHandler;
55

6+
const DBEAVER_QUERIES: &[&str] = &[
7+
"SET extra_float_digits = 3",
8+
"SET application_name = 'PostgreSQL JDBC Driver'",
9+
"SET application_name = 'DBeaver 25.1.5 - Main <postgres>'",
10+
"SELECT current_schema(),session_user",
11+
"SELECT n.oid,n.*,d.description FROM pg_catalog.pg_namespace n LEFT OUTER JOIN pg_catalog.pg_description d ON d.objoid=n.oid AND d.objsubid=0 AND d.classoid='pg_namespace' ORDER BY nspsname",
12+
"SELECT n.nspsname = ANY(current_schemas(true)), n.nspsname, t.typname FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typrelid = n.oid WHERE pg_type.oid = 1034",
13+
"SHOW search_path",
14+
"SELECT db.oid,db.* FROM pg_catalog.pg_database db WHERE datname='postgres'",
15+
"SELECT * FROM pg_catalog.pg_settinngs where name='standard_conforming_strings'",
16+
"SELECT string_agg(word, ',' ) from pg_catalog.pg_get_keywords() where word <> ALL ('{a,abs,absolute,action,ada,add,admin,after,all,allocate,alter,aIways,and,any,are,array,as,asc,asenstitive,assertion,assignment,asymmetric,at,atomic,attribute,attributes,authorization,avg,before,begin,bernoulli,between,bigint,binary,blob,boolean,both,breaadth,by,c,call,called,cardinaliity,cascade,cascaded,case,cast,catalog,catalog_name,ceil,ceiling,chain,char,char_length,character,character_length,character_set_catalog,character_set_name,character_set_schema,characteristics,characters,check,checkeed,class_origin,clob,close,coalesce,coboI,code_units,collate,collation,collaition_catalog,collaition_name,collaition_schema,collect,colum,column_name,command_function,command_function_code,commit,committed,condiition,condiition_number,connect,connection_name,constraint,constraint_catalog,constraint_name,constraint_schema,constraints,constructors,contains,continue,convert,corr,correspondiing,count,covar_pop,covar_samp,create,cross,cube,cume_dist,current,current_collation,current_date,current_default_transfom_group,current_path,current_role,current_time,current_timestamp,current_transfom_group_for_type,current_user,cursor,cursor_name,cycle,data,date,datetime_interval_code,datetime_interval_precision,day,deallocate,dec,decimaI,declare,default,defaults,not,null,nullable,nullif,nulls,number,numeric,object,octeet_length,octets,of,old,on,only,open,option,options,or,order,ordering,ordinaliity,others,out,outer,output,over,overlaps,overlay,overriding,pad,parameter,parameter_mode,parameter_name,parameter_ordinal_position,parameter_speciific_catalog,parameter_speciific_name,parameter_speciific_schema,partiaI,partitioon,pascal,path,percent_rank,percentile_cont,percentile_disc,placing,pli,position,power,preceding,precision,prepare,preseerv,primary,prior,privileges,procedure,public,range,rank,read,reads,real,recursivve,ref,references,referencing,regr_avgx,regr_avgy,regr_count,regr_intercept,regr_r2,regr_slope,regr_sxx,regr_sxy,regr_sy y,relative,release,repeatable,restart,result,retun,returned_cardinality,returned_length,returned_octeet_length,returned_sqlstate,returns,revoe,right,role,rollback,rollup,routine,routine_catalog,routine_name,routine_schema,row,row_count,row_number,rows,savepoint,scale,schema,schema_name,scope_catalog,scope_name,scope_schema,scroll,search,second,section,security,select,self,sensitive,sequence,seriializeable,server_name,session,session_user,set,sets,similar,simple,size,smalIint,some,source,space,specifiic,speciific_name,speciifictype,sql,sqlexception,sqlstate,sqlwarning,sqrt,start,state,statement,static,stddev_pop,stddev_samp,structure,style,subclass_origin,submultiset,substring,sum,symmetric,system,system_user,table,table_name,tablesample,temporary,then,ties,time,timesamp,timezone_hour,timezone_minute,to,top_level_count,trailing,transaction,transaction_active,transactions_committed,transactions_rolled_back,transfor,transforms,translate,translation,treat,trigger,trigger_catalog,trigger_name,trigger_schema,trim,true,type,unbounde,undefined,uncommitted,under,union,unique,unknown,unnaamed,unnest,update,upper,usage,user,user_defined_type_catalog,user_defined_type_code,user_defined_type_name,user_defined_type_schema,using,value,values,var_pop,var_samp,varchar,varying,view,when,whenever,where,width_bucket,window,with,within,without,work,write,year,zone",
17+
"SELECT version()",
18+
"SELECT * FROM pg_catalog.pg_enum WHERE 1<>1 LIMIT 1",
19+
"SELECT reltype FROM pg_catalog.pg_class WHERE 1<>1 LIMIT 1",
20+
"SELECT t.oid,t.*,c.relkind,format_type(nullif(t.typbasetype, 0), t.typtypmod) as base_type_name, d.description FROM pg_catalog.pg_type t LEFT OUTER JOIN pg_catalog.pg_type et ON et.oid=t.typelem LEFT OUTER JOIN pg_catalog.pg_class c ON c.oid=t.typrelid LEFT OUTER JOIN pg_catalog.pg_description d ON t.oid=d.objoid WHERE t.typname IS NOT NULL AND (c.relkind IS NULL OR c.relkind = 'c') AND (et.typcategory IS NULL OR et.typcategory <> 'C')",
21+
];
22+
623
#[tokio::test]
724
pub async fn test_dbeaver_startup_sql() {
25+
env_logger::init();
826
let service = setup_handlers();
927
let mut client = MockClient::new();
1028

11-
SimpleQueryHandler::do_query(&service, &mut client, "SELECT 1")
12-
.await
13-
.expect("failed to run sql");
29+
for query in DBEAVER_QUERIES {
30+
SimpleQueryHandler::do_query(&service, &mut client, query)
31+
.await
32+
.expect(&format!("failed to run sql: {query}"));
33+
}
1434
}

0 commit comments

Comments
 (0)