Commit 26fb0db

Implement pg_stat_gssapi & pg_backend_pid() (#185)
Implement the pg_stat_gssapi table, which always contains a single record showing only the current user. Also implement the pg_backend_pid function. Both of these use `BACKEND_PID` as the pid, which is currently hard-coded to 1. These additions help make pgAdmin's startup queries run.
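
In practice, a client can now call pg_backend_pid() and look up its own session in pg_stat_gssapi: a pgAdmin-style probe such as `SELECT * FROM pg_stat_gssapi WHERE pid = pg_backend_pid()` (an illustrative query, not taken from pgAdmin's source) matches exactly one row, because the view's pid column and the function return the same hard-coded `BACKEND_PID`.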
1 parent 32aa5b6 commit 26fb0db

2 files changed: +96 −1 lines changed

datafusion-postgres/src/pg_catalog.rs (31 additions, 1 deletion)
@@ -4,7 +4,8 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use datafusion::arrow::array::{
-    as_boolean_array, ArrayRef, AsArray, BooleanBuilder, RecordBatch, StringArray, StringBuilder,
+    as_boolean_array, ArrayRef, AsArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray,
+    StringBuilder,
 };
 use datafusion::arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
 use datafusion::arrow::ipc::reader::FileReader;
@@ -32,6 +33,7 @@ pub mod pg_get_expr_udf;
 pub mod pg_namespace;
 pub mod pg_replication_slot;
 pub mod pg_settings;
+pub mod pg_stat_gssapi;
 pub mod pg_tables;
 pub mod pg_views;
 
@@ -100,6 +102,7 @@ const PG_CATALOG_VIEW_PG_SETTINGS: &str = "pg_settings";
 const PG_CATALOG_VIEW_PG_VIEWS: &str = "pg_views";
 const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
 const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
+const PG_CATALOG_VIEW_PG_STAT_GSSAPI: &str = "pg_stat_gssapi";
 const PG_CATALOG_VIEW_PG_STAT_USER_TABLES: &str = "pg_stat_user_tables";
 const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots";
 
@@ -339,6 +342,13 @@ impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
                     vec![table],
                 )?)))
             }
+            PG_CATALOG_VIEW_PG_STAT_GSSAPI => {
+                let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new());
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
+            }
             PG_CATALOG_VIEW_PG_TABLES => {
                 let table = Arc::new(pg_tables::PgTablesTable::new(self.catalog_list.clone()));
                 Ok(Some(Arc::new(StreamingTable::try_new(
@@ -1162,6 +1172,25 @@ pub fn create_pg_encoding_to_char_udf() -> ScalarUDF {
     )
 }
 
+pub fn create_pg_backend_pid_udf() -> ScalarUDF {
+    let func = move |_args: &[ColumnarValue]| {
+        let mut builder = Int32Builder::new();
+        builder.append_value(BACKEND_PID);
+        let array: ArrayRef = Arc::new(builder.finish());
+        Ok(ColumnarValue::Array(array))
+    };
+
+    create_udf(
+        "pg_backend_pid",
+        vec![],
+        DataType::Int32,
+        Volatility::Stable,
+        Arc::new(func),
+    )
+}
+
+const BACKEND_PID: i32 = 1;
+
 /// Install pg_catalog and postgres UDFs to current `SessionContext`
 pub fn setup_pg_catalog(
     session_context: &SessionContext,
@@ -1207,6 +1236,7 @@ pub fn setup_pg_catalog(
     session_context.register_udf(create_pg_relation_is_publishable_udf());
     session_context.register_udf(create_pg_get_statisticsobjdef_columns_udf());
     session_context.register_udf(create_pg_encoding_to_char_udf());
+    session_context.register_udf(create_pg_backend_pid_udf());
 
     Ok(())
 }
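
An aside on the function definition: `Volatility::Stable` mirrors upstream Postgres, which declares pg_backend_pid() stable. The zero-argument scalar UDF pattern used by `create_pg_backend_pid_udf` can be reproduced with stock DataFusion alone; below is a minimal self-contained sketch, assuming the same datafusion version as this diff (where `create_udf` takes a bare `DataType`) and a tokio runtime. The function name `demo_backend_pid` is hypothetical.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Builder};
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{ColumnarValue, Volatility};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Same shape as create_pg_backend_pid_udf above: ignore the empty
    // argument list and emit a one-element Int32 array.
    let func = move |_args: &[ColumnarValue]| {
        let mut builder = Int32Builder::new();
        builder.append_value(1); // stand-in for the hard-coded BACKEND_PID
        let array: ArrayRef = Arc::new(builder.finish());
        Ok(ColumnarValue::Array(array))
    };

    let udf = create_udf(
        "demo_backend_pid", // hypothetical name, to avoid any clash
        vec![],
        DataType::Int32,
        Volatility::Stable,
        Arc::new(func),
    );

    let ctx = SessionContext::new();
    ctx.register_udf(udf);
    // Prints a single row containing 1.
    ctx.sql("SELECT demo_backend_pid()").await?.show().await?;
    Ok(())
}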
datafusion-postgres/src/pg_catalog/pg_stat_gssapi.rs (new file: 65 additions, 0 deletions)
@@ -0,0 +1,65 @@
+use datafusion::arrow::array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::error::Result;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream;
+use std::sync::Arc;
+
+use crate::pg_catalog::BACKEND_PID;
+
+#[derive(Debug, Clone)]
+pub(crate) struct PgStatGssApiTable {
+    schema: SchemaRef,
+}
+
+impl PgStatGssApiTable {
+    pub(crate) fn new() -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("pid", DataType::Int32, true),
+            Field::new("gss_authenticated", DataType::Boolean, false),
+            Field::new("principal", DataType::Utf8, true),
+            Field::new("encrypted", DataType::Boolean, false),
+            Field::new("credentials_delegated", DataType::Boolean, false),
+        ]));
+
+        Self { schema }
+    }
+
+    /// Generate record batches based on the current state of the catalog
+    async fn get_data(this: Self) -> Result<RecordBatch> {
+        let pid = vec![BACKEND_PID];
+        let gss_authenticated = vec![false];
+        let principal: Vec<Option<String>> = vec![None];
+        let encrypted = vec![false];
+        let credentials_delegated = vec![false];
+
+        // Create Arrow arrays from the collected data
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int32Array::from(pid)),
+            Arc::new(BooleanArray::from(gss_authenticated)),
+            Arc::new(StringArray::from(principal)),
+            Arc::new(BooleanArray::from(encrypted)),
+            Arc::new(BooleanArray::from(credentials_delegated)),
+        ];
+
+        // Create a record batch
+        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
+
+        Ok(batch)
+    }
+}
+
+impl PartitionStream for PgStatGssApiTable {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        let this = self.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            this.schema.clone(),
+            futures::stream::once(async move { PgStatGssApiTable::get_data(this).await }),
+        ))
+    }
+}
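
One way to sanity-check the one-shot stream is a test like the following. This is a hypothetical sketch, not part of the commit: it would have to live in this module since `PgStatGssApiTable` is `pub(crate)`, and it assumes dev-dependencies on tokio and futures.

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    #[tokio::test]
    async fn yields_one_single_row_batch() {
        let table = PgStatGssApiTable::new();
        let mut stream = table.execute(Arc::new(TaskContext::default()));

        // futures::stream::once produces exactly one batch...
        let batch = stream.next().await.expect("one batch").expect("no error");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 5);

        // ...and the stream is exhausted afterwards.
        assert!(stream.next().await.is_none());
    }
}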
