Skip to content

Commit bf4472d

Browse files
committed
Add function current_schema and also tests for current_catalog and etc.
1 parent 0cd0ce6 commit bf4472d

File tree

3 files changed

+54
-0
lines changed

3 files changed

+54
-0
lines changed

src/sql/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,5 +256,19 @@ mod tests {
256256
assert_eq!(column_id.value(3), 2);
257257
assert_eq!(column_count.value(3), 3);
258258
assert_eq!(column_description.value(3), "NNNNNN");
259+
260+
let mut stream = executor
261+
.execute_sql("select current_catalog, current_database() as database, current_schema(), inet_client_port()")
262+
.await
263+
.unwrap();
264+
let record = stream.next().await.unwrap().unwrap();
265+
let column_catalog = record.column(0).as_any().downcast_ref::<StringArray>().unwrap();
266+
let column_database = record.column(1).as_any().downcast_ref::<StringArray>().unwrap();
267+
let column_schema = record.column(2).as_any().downcast_ref::<StringArray>().unwrap();
268+
let column_client_port = record.column(3).as_any().downcast_ref::<Int32Array>().unwrap();
269+
assert_eq!(column_catalog.value(0), "test1");
270+
assert_eq!(column_database.value(0), "test1");
271+
assert_eq!(column_schema.value(0), "public");
272+
assert_eq!(column_client_port.value(0), 0);
259273
}
260274
}

src/sql/postgresql/functions/mod.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,45 @@ use datafusion::logical_expr::{
2828

2929
use crate::sql::context::SqlContext;
3030

31+
#[derive(Debug)]
32+
pub struct CurrentSchema {
33+
context: Arc<SqlContext>,
34+
signature: Signature,
35+
aliases: Vec<String>,
36+
}
37+
38+
impl CurrentSchema {
39+
pub fn new(context: Arc<SqlContext>) -> Self {
40+
Self { context, signature: Signature::new(TypeSignature::Nullary, Volatility::Stable), aliases: vec![] }
41+
}
42+
}
43+
44+
impl ScalarUDFImpl for CurrentSchema {
45+
fn as_any(&self) -> &(dyn Any + 'static) {
46+
self
47+
}
48+
49+
fn name(&self) -> &str {
50+
"current_schema"
51+
}
52+
53+
fn signature(&self) -> &Signature {
54+
&self.signature
55+
}
56+
57+
fn aliases(&self) -> &[String] {
58+
&self.aliases
59+
}
60+
61+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
62+
Ok(DataType::Utf8)
63+
}
64+
65+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue, DataFusionError> {
66+
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(self.context.current_schema().to_string().into())))
67+
}
68+
}
69+
3170
#[derive(Debug)]
3271
pub struct CurrentCatalog {
3372
context: Arc<SqlContext>,

src/sql/postgresql/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ use crate::tablet::TabletClient;
7474
pub fn create_all_scalar_functions(context: &Arc<SqlContext>) -> Vec<Arc<ScalarUDF>> {
7575
vec![
7676
Arc::new(ScalarUDF::new_from_impl(self::functions::CurrentCatalog::new(context.clone()))),
77+
Arc::new(ScalarUDF::new_from_impl(self::functions::CurrentSchema::new(context.clone()))),
7778
Arc::new(ScalarUDF::new_from_impl(self::functions::CurrentUser::new(context.clone()))),
7879
Arc::new(ScalarUDF::new_from_impl(self::functions::InetClientPort::new(context.clone()))),
7980
]

0 commit comments

Comments
 (0)