Skip to content

Commit 4da1988

Browse files
committed
refactor: use cache table oid for pg_attributes
1 parent 4d3fbc7 commit 4da1988

File tree

2 files changed

+43
-20
lines changed

2 files changed

+43
-20
lines changed

datafusion-postgres/src/pg_catalog.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,8 @@ impl SchemaProvider for PgCatalogSchemaProvider {
306306
PG_CATALOG_TABLE_PG_ATTRIBUTE => {
307307
let table = Arc::new(pg_attribute::PgAttributeTable::new(
308308
self.catalog_list.clone(),
309+
self.oid_counter.clone(),
310+
self.oid_cache.clone(),
309311
));
310312
Ok(Some(Arc::new(
311313
StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),

datafusion-postgres/src/pg_catalog/pg_attribute.rs

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
use std::sync::atomic::{AtomicU32, Ordering};
13
use std::sync::Arc;
24

35
use datafusion::arrow::array::{
@@ -9,15 +11,25 @@ use datafusion::error::Result;
911
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1012
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1113
use datafusion::physical_plan::streaming::PartitionStream;
14+
use postgres_types::Oid;
15+
use tokio::sync::RwLock;
1216

13-
#[derive(Debug)]
17+
use super::OidCacheKey;
18+
19+
#[derive(Debug, Clone)]
1420
pub(crate) struct PgAttributeTable {
1521
schema: SchemaRef,
1622
catalog_list: Arc<dyn CatalogProviderList>,
23+
oid_counter: Arc<AtomicU32>,
24+
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
1725
}
1826

1927
impl PgAttributeTable {
20-
pub(crate) fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
28+
pub(crate) fn new(
29+
catalog_list: Arc<dyn CatalogProviderList>,
30+
oid_counter: Arc<AtomicU32>,
31+
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
32+
) -> Self {
2133
// Define the schema for pg_attribute
2234
// This matches PostgreSQL's pg_attribute table columns
2335
let schema = Arc::new(Schema::new(vec![
@@ -52,14 +64,13 @@ impl PgAttributeTable {
5264
Self {
5365
schema,
5466
catalog_list,
67+
oid_counter,
68+
oid_cache,
5569
}
5670
}
5771

5872
/// Generate record batches based on the current state of the catalog
59-
async fn get_data(
60-
schema: SchemaRef,
61-
catalog_list: Arc<dyn CatalogProviderList>,
62-
) -> Result<RecordBatch> {
73+
async fn get_data(this: Self) -> Result<RecordBatch> {
6374
// Vectors to store column data
6475
let mut attrelids = Vec::new();
6576
let mut attnames = Vec::new();
@@ -88,19 +99,28 @@ impl PgAttributeTable {
8899
let mut attfdwoptions: Vec<Option<String>> = Vec::new();
89100
let mut attmissingvals: Vec<Option<String>> = Vec::new();
90101

91-
// Start OID counter (should be consistent with pg_class)
92-
// FIXME: oid
93-
let mut next_oid = 10000;
102+
let mut oid_cache = this.oid_cache.write().await;
103+
// Every time when call pg_catalog we generate a new cache and drop the
104+
// original one in case that schemas or tables were dropped.
105+
let mut swap_cache = HashMap::new();
94106

95-
// Iterate through all catalogs and schemas
96-
for catalog_name in catalog_list.catalog_names() {
97-
if let Some(catalog) = catalog_list.catalog(&catalog_name) {
107+
for catalog_name in this.catalog_list.catalog_names() {
108+
if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
98109
for schema_name in catalog.schema_names() {
99110
if let Some(schema_provider) = catalog.schema(&schema_name) {
100111
// Process all tables in this schema
101112
for table_name in schema_provider.table_names() {
102-
let table_oid = next_oid;
103-
next_oid += 1;
113+
let cache_key = OidCacheKey::Table(
114+
catalog_name.clone(),
115+
schema_name.clone(),
116+
table_name.clone(),
117+
);
118+
let table_oid = if let Some(oid) = oid_cache.get(&cache_key) {
119+
*oid
120+
} else {
121+
this.oid_counter.fetch_add(1, Ordering::Relaxed)
122+
};
123+
swap_cache.insert(cache_key, table_oid);
104124

105125
if let Some(table) = schema_provider.table(&table_name).await? {
106126
let table_schema = table.schema();
@@ -112,7 +132,7 @@ impl PgAttributeTable {
112132
let (pg_type_oid, type_len, by_val, align, storage) =
113133
Self::datafusion_to_pg_type(field.data_type());
114134

115-
attrelids.push(table_oid);
135+
attrelids.push(table_oid as i32);
116136
attnames.push(field.name().clone());
117137
atttypids.push(pg_type_oid);
118138
attstattargets.push(-1); // Default statistics target
@@ -146,6 +166,8 @@ impl PgAttributeTable {
146166
}
147167
}
148168

169+
*oid_cache = swap_cache;
170+
149171
// Create Arrow arrays from the collected data
150172
let arrays: Vec<ArrayRef> = vec![
151173
Arc::new(Int32Array::from(attrelids)),
@@ -177,7 +199,7 @@ impl PgAttributeTable {
177199
];
178200

179201
// Create a record batch
180-
let batch = RecordBatch::try_new(schema.clone(), arrays)?;
202+
let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
181203
Ok(batch)
182204
}
183205

@@ -217,11 +239,10 @@ impl PartitionStream for PgAttributeTable {
217239
}
218240

219241
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
220-
let catalog_list = self.catalog_list.clone();
221-
let schema = Arc::clone(&self.schema);
242+
let this = self.clone();
222243
Box::pin(RecordBatchStreamAdapter::new(
223-
schema.clone(),
224-
futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
244+
this.schema.clone(),
245+
futures::stream::once(async move { Self::get_data(this).await }),
225246
))
226247
}
227248
}

0 commit comments

Comments
 (0)