Skip to content

Commit 26b2413

Browse files
authored
feat: use exported data for pg_catalog (#133)
* feat: use exported data for pg_catalog * chore: remove analyze script * refactor: move some table definition to dedicated sources * refactor: use cache table oid for pg_attributes * feat: made generated table public api
1 parent cb261e5 commit 26b2413

File tree

70 files changed

+2086
-1648
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+2086
-1648
lines changed

datafusion-postgres/src/pg_catalog.rs

Lines changed: 761 additions & 1648 deletions
Large diffs are not rendered by default.
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
use std::collections::HashMap;
2+
use std::sync::atomic::{AtomicU32, Ordering};
3+
use std::sync::Arc;
4+
5+
use datafusion::arrow::array::{
6+
ArrayRef, BooleanArray, Int16Array, Int32Array, RecordBatch, StringArray,
7+
};
8+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
9+
use datafusion::catalog::CatalogProviderList;
10+
use datafusion::error::Result;
11+
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
12+
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
13+
use datafusion::physical_plan::streaming::PartitionStream;
14+
use postgres_types::Oid;
15+
use tokio::sync::RwLock;
16+
17+
use super::OidCacheKey;
18+
19+
#[derive(Debug, Clone)]
20+
pub(crate) struct PgAttributeTable {
21+
schema: SchemaRef,
22+
catalog_list: Arc<dyn CatalogProviderList>,
23+
oid_counter: Arc<AtomicU32>,
24+
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
25+
}
26+
27+
impl PgAttributeTable {
28+
pub(crate) fn new(
29+
catalog_list: Arc<dyn CatalogProviderList>,
30+
oid_counter: Arc<AtomicU32>,
31+
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
32+
) -> Self {
33+
// Define the schema for pg_attribute
34+
// This matches PostgreSQL's pg_attribute table columns
35+
let schema = Arc::new(Schema::new(vec![
36+
Field::new("attrelid", DataType::Int32, false), // OID of the relation this column belongs to
37+
Field::new("attname", DataType::Utf8, false), // Column name
38+
Field::new("atttypid", DataType::Int32, false), // OID of the column data type
39+
Field::new("attstattarget", DataType::Int32, false), // Statistics target
40+
Field::new("attlen", DataType::Int16, false), // Length of the type
41+
Field::new("attnum", DataType::Int16, false), // Column number (positive for regular columns)
42+
Field::new("attndims", DataType::Int32, false), // Number of dimensions for array types
43+
Field::new("attcacheoff", DataType::Int32, false), // Cache offset
44+
Field::new("atttypmod", DataType::Int32, false), // Type-specific modifier
45+
Field::new("attbyval", DataType::Boolean, false), // True if the type is pass-by-value
46+
Field::new("attalign", DataType::Utf8, false), // Type alignment
47+
Field::new("attstorage", DataType::Utf8, false), // Storage type
48+
Field::new("attcompression", DataType::Utf8, true), // Compression method
49+
Field::new("attnotnull", DataType::Boolean, false), // True if column cannot be null
50+
Field::new("atthasdef", DataType::Boolean, false), // True if column has a default value
51+
Field::new("atthasmissing", DataType::Boolean, false), // True if column has missing values
52+
Field::new("attidentity", DataType::Utf8, false), // Identity column type
53+
Field::new("attgenerated", DataType::Utf8, false), // Generated column type
54+
Field::new("attisdropped", DataType::Boolean, false), // True if column has been dropped
55+
Field::new("attislocal", DataType::Boolean, false), // True if column is local to this relation
56+
Field::new("attinhcount", DataType::Int32, false), // Number of direct inheritance ancestors
57+
Field::new("attcollation", DataType::Int32, false), // OID of collation
58+
Field::new("attacl", DataType::Utf8, true), // Access privileges
59+
Field::new("attoptions", DataType::Utf8, true), // Attribute-level options
60+
Field::new("attfdwoptions", DataType::Utf8, true), // Foreign data wrapper options
61+
Field::new("attmissingval", DataType::Utf8, true), // Missing value for added columns
62+
]));
63+
64+
Self {
65+
schema,
66+
catalog_list,
67+
oid_counter,
68+
oid_cache,
69+
}
70+
}
71+
72+
/// Generate record batches based on the current state of the catalog
73+
async fn get_data(this: Self) -> Result<RecordBatch> {
74+
// Vectors to store column data
75+
let mut attrelids = Vec::new();
76+
let mut attnames = Vec::new();
77+
let mut atttypids = Vec::new();
78+
let mut attstattargets = Vec::new();
79+
let mut attlens = Vec::new();
80+
let mut attnums = Vec::new();
81+
let mut attndimss = Vec::new();
82+
let mut attcacheoffs = Vec::new();
83+
let mut atttymods = Vec::new();
84+
let mut attbyvals = Vec::new();
85+
let mut attaligns = Vec::new();
86+
let mut attstorages = Vec::new();
87+
let mut attcompressions: Vec<Option<String>> = Vec::new();
88+
let mut attnotnulls = Vec::new();
89+
let mut atthasdefs = Vec::new();
90+
let mut atthasmissings = Vec::new();
91+
let mut attidentitys = Vec::new();
92+
let mut attgenerateds = Vec::new();
93+
let mut attisdroppeds = Vec::new();
94+
let mut attislocals = Vec::new();
95+
let mut attinhcounts = Vec::new();
96+
let mut attcollations = Vec::new();
97+
let mut attacls: Vec<Option<String>> = Vec::new();
98+
let mut attoptions: Vec<Option<String>> = Vec::new();
99+
let mut attfdwoptions: Vec<Option<String>> = Vec::new();
100+
let mut attmissingvals: Vec<Option<String>> = Vec::new();
101+
102+
let mut oid_cache = this.oid_cache.write().await;
103+
// Every time when call pg_catalog we generate a new cache and drop the
104+
// original one in case that schemas or tables were dropped.
105+
let mut swap_cache = HashMap::new();
106+
107+
for catalog_name in this.catalog_list.catalog_names() {
108+
if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
109+
for schema_name in catalog.schema_names() {
110+
if let Some(schema_provider) = catalog.schema(&schema_name) {
111+
// Process all tables in this schema
112+
for table_name in schema_provider.table_names() {
113+
let cache_key = OidCacheKey::Table(
114+
catalog_name.clone(),
115+
schema_name.clone(),
116+
table_name.clone(),
117+
);
118+
let table_oid = if let Some(oid) = oid_cache.get(&cache_key) {
119+
*oid
120+
} else {
121+
this.oid_counter.fetch_add(1, Ordering::Relaxed)
122+
};
123+
swap_cache.insert(cache_key, table_oid);
124+
125+
if let Some(table) = schema_provider.table(&table_name).await? {
126+
let table_schema = table.schema();
127+
128+
// Add column entries for this table
129+
for (column_idx, field) in table_schema.fields().iter().enumerate()
130+
{
131+
let attnum = (column_idx + 1) as i16; // PostgreSQL column numbers start at 1
132+
let (pg_type_oid, type_len, by_val, align, storage) =
133+
Self::datafusion_to_pg_type(field.data_type());
134+
135+
attrelids.push(table_oid as i32);
136+
attnames.push(field.name().clone());
137+
atttypids.push(pg_type_oid);
138+
attstattargets.push(-1); // Default statistics target
139+
attlens.push(type_len);
140+
attnums.push(attnum);
141+
attndimss.push(0); // No array support for now
142+
attcacheoffs.push(-1); // Not cached
143+
atttymods.push(-1); // No type modifiers
144+
attbyvals.push(by_val);
145+
attaligns.push(align.to_string());
146+
attstorages.push(storage.to_string());
147+
attcompressions.push(None); // No compression
148+
attnotnulls.push(!field.is_nullable());
149+
atthasdefs.push(false); // No default values
150+
atthasmissings.push(false); // No missing values
151+
attidentitys.push("".to_string()); // No identity columns
152+
attgenerateds.push("".to_string()); // No generated columns
153+
attisdroppeds.push(false); // Not dropped
154+
attislocals.push(true); // Local to this relation
155+
attinhcounts.push(0); // No inheritance
156+
attcollations.push(0); // Default collation
157+
attacls.push(None); // No ACLs
158+
attoptions.push(None); // No options
159+
attfdwoptions.push(None); // No FDW options
160+
attmissingvals.push(None); // No missing values
161+
}
162+
}
163+
}
164+
}
165+
}
166+
}
167+
}
168+
169+
*oid_cache = swap_cache;
170+
171+
// Create Arrow arrays from the collected data
172+
let arrays: Vec<ArrayRef> = vec![
173+
Arc::new(Int32Array::from(attrelids)),
174+
Arc::new(StringArray::from(attnames)),
175+
Arc::new(Int32Array::from(atttypids)),
176+
Arc::new(Int32Array::from(attstattargets)),
177+
Arc::new(Int16Array::from(attlens)),
178+
Arc::new(Int16Array::from(attnums)),
179+
Arc::new(Int32Array::from(attndimss)),
180+
Arc::new(Int32Array::from(attcacheoffs)),
181+
Arc::new(Int32Array::from(atttymods)),
182+
Arc::new(BooleanArray::from(attbyvals)),
183+
Arc::new(StringArray::from(attaligns)),
184+
Arc::new(StringArray::from(attstorages)),
185+
Arc::new(StringArray::from_iter(attcompressions.into_iter())),
186+
Arc::new(BooleanArray::from(attnotnulls)),
187+
Arc::new(BooleanArray::from(atthasdefs)),
188+
Arc::new(BooleanArray::from(atthasmissings)),
189+
Arc::new(StringArray::from(attidentitys)),
190+
Arc::new(StringArray::from(attgenerateds)),
191+
Arc::new(BooleanArray::from(attisdroppeds)),
192+
Arc::new(BooleanArray::from(attislocals)),
193+
Arc::new(Int32Array::from(attinhcounts)),
194+
Arc::new(Int32Array::from(attcollations)),
195+
Arc::new(StringArray::from_iter(attacls.into_iter())),
196+
Arc::new(StringArray::from_iter(attoptions.into_iter())),
197+
Arc::new(StringArray::from_iter(attfdwoptions.into_iter())),
198+
Arc::new(StringArray::from_iter(attmissingvals.into_iter())),
199+
];
200+
201+
// Create a record batch
202+
let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
203+
Ok(batch)
204+
}
205+
206+
/// Map DataFusion data types to PostgreSQL type information
207+
fn datafusion_to_pg_type(data_type: &DataType) -> (i32, i16, bool, &'static str, &'static str) {
208+
match data_type {
209+
DataType::Boolean => (16, 1, true, "c", "p"), // bool
210+
DataType::Int8 => (18, 1, true, "c", "p"), // char
211+
DataType::Int16 => (21, 2, true, "s", "p"), // int2
212+
DataType::Int32 => (23, 4, true, "i", "p"), // int4
213+
DataType::Int64 => (20, 8, true, "d", "p"), // int8
214+
DataType::UInt8 => (21, 2, true, "s", "p"), // Treat as int2
215+
DataType::UInt16 => (23, 4, true, "i", "p"), // Treat as int4
216+
DataType::UInt32 => (20, 8, true, "d", "p"), // Treat as int8
217+
DataType::UInt64 => (1700, -1, false, "i", "m"), // Treat as numeric
218+
DataType::Float32 => (700, 4, true, "i", "p"), // float4
219+
DataType::Float64 => (701, 8, true, "d", "p"), // float8
220+
DataType::Utf8 => (25, -1, false, "i", "x"), // text
221+
DataType::LargeUtf8 => (25, -1, false, "i", "x"), // text
222+
DataType::Binary => (17, -1, false, "i", "x"), // bytea
223+
DataType::LargeBinary => (17, -1, false, "i", "x"), // bytea
224+
DataType::Date32 => (1082, 4, true, "i", "p"), // date
225+
DataType::Date64 => (1082, 4, true, "i", "p"), // date
226+
DataType::Time32(_) => (1083, 8, true, "d", "p"), // time
227+
DataType::Time64(_) => (1083, 8, true, "d", "p"), // time
228+
DataType::Timestamp(_, _) => (1114, 8, true, "d", "p"), // timestamp
229+
DataType::Decimal128(_, _) => (1700, -1, false, "i", "m"), // numeric
230+
DataType::Decimal256(_, _) => (1700, -1, false, "i", "m"), // numeric
231+
_ => (25, -1, false, "i", "x"), // Default to text for unknown types
232+
}
233+
}
234+
}
235+
236+
impl PartitionStream for PgAttributeTable {
237+
fn schema(&self) -> &SchemaRef {
238+
&self.schema
239+
}
240+
241+
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
242+
let this = self.clone();
243+
Box::pin(RecordBatchStreamAdapter::new(
244+
this.schema.clone(),
245+
futures::stream::once(async move { Self::get_data(this).await }),
246+
))
247+
}
248+
}

0 commit comments

Comments
 (0)