Skip to content

Commit 44b9fa4

Browse files
committed
Add support for pg_replication_slots
Add support for pg_replication_slots to the catalog, reducing the number of failures in the pg_admin startup queries.
1 parent 298e06d commit 44b9fa4

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

datafusion-postgres/src/pg_catalog.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod pg_class;
3030
pub mod pg_database;
3131
pub mod pg_get_expr_udf;
3232
pub mod pg_namespace;
33+
pub mod pg_replication_slot;
3334
pub mod pg_settings;
3435
pub mod pg_tables;
3536
pub mod pg_views;
@@ -100,6 +101,7 @@ const PG_CATALOG_VIEW_PG_VIEWS: &str = "pg_views";
100101
const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
101102
const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
102103
const PG_CATALOG_VIEW_PG_STAT_USER_TABELS: &str = "pg_stat_user_tables";
104+
const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots";
103105

104106
pub const PG_CATALOG_TABLES: &[&str] = &[
105107
PG_CATALOG_TABLE_PG_AGGREGATE,
@@ -167,6 +169,7 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
167169
PG_CATALOG_VIEW_PG_VIEWS,
168170
PG_CATALOG_VIEW_PG_MATVIEWS,
169171
PG_CATALOG_VIEW_PG_STAT_USER_TABELS,
172+
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS,
170173
];
171174

172175
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
@@ -352,7 +355,10 @@ impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
352355
PG_CATALOG_VIEW_PG_STAT_USER_TABELS => {
353356
Ok(Some(Arc::new(pg_views::pg_stat_user_tables()?)))
354357
}
355-
358+
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS => {
359+
let table = pg_replication_slot::pg_replication_slots()?;
360+
Ok(Some(table))
361+
}
356362
_ => Ok(None),
357363
}
358364
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use crate::pg_catalog::empty_table::EmptyTable;
2+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
3+
use datafusion::catalog::TableProvider;
4+
use datafusion::error::Result;
5+
use std::sync::Arc;
6+
7+
pub(crate) fn pg_replication_slots() -> Result<Arc<dyn TableProvider>> {
8+
let schema = Arc::new(Schema::new(vec![
9+
Field::new("slot_name", DataType::Utf8, true),
10+
Field::new("plugin", DataType::Utf8, true),
11+
Field::new("slot_type", DataType::Utf8, true),
12+
Field::new("datoid", DataType::Int32, true),
13+
Field::new("database", DataType::Utf8, true),
14+
Field::new("temporary", DataType::Boolean, false),
15+
Field::new("active", DataType::Boolean, false),
16+
Field::new("active_pid", DataType::Int32, true),
17+
Field::new("xmin", DataType::Int32, true),
18+
Field::new("catalog_xmin", DataType::Int32, true),
19+
Field::new("restart_lsn", DataType::Utf8, true), // TODO: is this the correct type to use?
20+
Field::new("confirmed_flush_lsn", DataType::Utf8, true), // TODO: is this the correct type to use?
21+
Field::new("wal_status", DataType::Utf8, true),
22+
Field::new("safe_wal_size", DataType::Int64, true),
23+
Field::new("two_phase", DataType::Boolean, false),
24+
Field::new("conflicting", DataType::Boolean, false),
25+
]));
26+
27+
let table = EmptyTable::new(schema).try_into_memtable()?;
28+
29+
Ok(Arc::new(table))
30+
}

0 commit comments

Comments
 (0)