
Commit 247c787

feat: keep a cache of generated oids (#114)
1 parent e655d53 commit 247c787

File tree

1 file changed: +108 -73 lines

datafusion-postgres/src/pg_catalog.rs

Lines changed: 108 additions & 73 deletions
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -16,6 +18,8 @@ use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::streaming::PartitionStream;
 use datafusion::prelude::{create_udf, SessionContext};
+use postgres_types::Oid;
+use tokio::sync::RwLock;
 
 const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
 const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
@@ -208,10 +212,19 @@ impl PgTypesData {
     }
 }
 
+#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
+enum OidCacheKey {
+    Schema(String),
+    /// Table by schema and table name
+    Table(String, String),
+}
+
 // Create custom schema provider for pg_catalog
 #[derive(Debug)]
 pub struct PgCatalogSchemaProvider {
     catalog_list: Arc<dyn CatalogProviderList>,
+    oid_counter: Arc<AtomicU32>,
+    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
 }
 
 #[async_trait]
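
The new OidCacheKey enum serves as a composite HashMap key, which is why it derives Hash, Eq, and PartialEq. A minimal standalone sketch of the lookup behavior (the cache contents below are illustrative, not taken from the commit):

use std::collections::HashMap;

#[derive(Debug, Hash, Eq, PartialEq)]
enum OidCacheKey {
    Schema(String),
    Table(String, String), // (schema name, table name)
}

fn main() {
    let mut cache: HashMap<OidCacheKey, u32> = HashMap::new();
    cache.insert(OidCacheKey::Schema("public".into()), 1);
    cache.insert(OidCacheKey::Table("public".into(), "users".into()), 2);
    // Equal keys hash identically, so a later scan that rebuilds the same
    // (schema, table) key finds the OID assigned on an earlier scan.
    assert_eq!(
        cache.get(&OidCacheKey::Table("public".into(), "users".into())),
        Some(&2)
    );
}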
@@ -229,13 +242,21 @@ impl SchemaProvider for PgCatalogSchemaProvider {
             PG_CATALOG_TABLE_PG_TYPE => Ok(Some(self.create_pg_type_table())),
             PG_CATALOG_TABLE_PG_AM => Ok(Some(self.create_pg_am_table())),
             PG_CATALOG_TABLE_PG_CLASS => {
-                let table = Arc::new(PgClassTable::new(self.catalog_list.clone()));
+                let table = Arc::new(PgClassTable::new(
+                    self.catalog_list.clone(),
+                    self.oid_counter.clone(),
+                    self.oid_cache.clone(),
+                ));
                 Ok(Some(Arc::new(
                     StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
                 )))
             }
             PG_CATALOG_TABLE_PG_NAMESPACE => {
-                let table = Arc::new(PgNamespaceTable::new(self.catalog_list.clone()));
+                let table = Arc::new(PgNamespaceTable::new(
+                    self.catalog_list.clone(),
+                    self.oid_counter.clone(),
+                    self.oid_cache.clone(),
+                ));
                 Ok(Some(Arc::new(
                     StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
                 )))
@@ -266,7 +287,11 @@ impl SchemaProvider for PgCatalogSchemaProvider {
 
 impl PgCatalogSchemaProvider {
     pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider {
-        Self { catalog_list }
+        Self {
+            catalog_list,
+            oid_counter: Arc::new(AtomicU32::new(0)),
+            oid_cache: Arc::new(RwLock::new(HashMap::new())),
+        }
     }
 
     /// Create a populated pg_type table with standard PostgreSQL data types
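
The provider creates the counter and the cache once and passes clones of the same Arcs to both catalog tables, so pg_class and pg_namespace draw OIDs from a single sequence. A small sketch of why AtomicU32::fetch_add hands out distinct values even under concurrency (the thread setup here is illustrative):

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    let counter = Arc::new(AtomicU32::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&counter);
            // fetch_add returns the value *before* the increment, so every
            // caller observes a unique OID even when calls race.
            std::thread::spawn(move || c.fetch_add(1, Ordering::Relaxed))
        })
        .collect();
    let mut oids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    oids.sort_unstable();
    assert_eq!(oids, vec![0, 1, 2, 3]);
}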
@@ -1033,14 +1058,20 @@ impl PgProcData {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct PgClassTable {
     schema: SchemaRef,
     catalog_list: Arc<dyn CatalogProviderList>,
+    oid_counter: Arc<AtomicU32>,
+    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
 }
 
 impl PgClassTable {
-    fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgClassTable {
+    fn new(
+        catalog_list: Arc<dyn CatalogProviderList>,
+        oid_counter: Arc<AtomicU32>,
+        oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
+    ) -> PgClassTable {
         // Define the schema for pg_class
         // This matches key columns from PostgreSQL's pg_class
         let schema = Arc::new(Schema::new(vec![
@@ -1079,14 +1110,13 @@ impl PgClassTable {
         Self {
             schema,
             catalog_list,
+            oid_counter,
+            oid_cache,
         }
     }
 
     /// Generate record batches based on the current state of the catalog
-    async fn get_data(
-        schema: SchemaRef,
-        catalog_list: Arc<dyn CatalogProviderList>,
-    ) -> Result<RecordBatch> {
+    async fn get_data(this: PgClassTable) -> Result<RecordBatch> {
         // Vectors to store column data
         let mut oids = Vec::new();
         let mut relnames = Vec::new();
@@ -1119,24 +1149,37 @@ impl PgClassTable {
         let mut relfrozenxids = Vec::new();
         let mut relminmxids = Vec::new();
 
-        // Start OID counter (this is simplistic and would need to be more robust in practice)
-        let mut next_oid = 10000;
+        let mut oid_cache = this.oid_cache.write().await;
+        // Each pg_catalog call builds a fresh cache and drops the original
+        // one, so entries for dropped schemas or tables do not linger.
+        let mut swap_cache = HashMap::new();
 
         // Iterate through all catalogs and schemas
-        for catalog_name in catalog_list.catalog_names() {
-            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
+        for catalog_name in this.catalog_list.catalog_names() {
+            if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
                 for schema_name in catalog.schema_names() {
                     if let Some(schema) = catalog.schema(&schema_name) {
-                        let schema_oid = next_oid;
-                        next_oid += 1;
+                        let cache_key = OidCacheKey::Schema(schema_name.clone());
+                        let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
+                            *oid
+                        } else {
+                            this.oid_counter.fetch_add(1, Ordering::Relaxed)
+                        };
+                        swap_cache.insert(cache_key, schema_oid);
 
                         // Add an entry for the schema itself (as a namespace)
                         // (In a full implementation, this would go in pg_namespace)
 
                         // Now process all tables in this schema
                         for table_name in schema.table_names() {
-                            let table_oid = next_oid;
-                            next_oid += 1;
+                            let cache_key =
+                                OidCacheKey::Table(schema_name.clone(), table_name.clone());
+                            let table_oid = if let Some(oid) = oid_cache.get(&cache_key) {
+                                *oid
+                            } else {
+                                this.oid_counter.fetch_add(1, Ordering::Relaxed)
+                            };
+                            swap_cache.insert(cache_key, table_oid);
 
                             if let Some(table) = schema.table(&table_name).await? {
                                 // Determine the correct table type based on the table provider and context
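
The heart of the change is the lookup-or-allocate step above: reuse the OID cached for a key when the object was seen before, draw a fresh one otherwise, and record the result in a new map so that swapping the maps discards entries for dropped objects. Reduced to its essentials (a sketch; the plain String key stands in for OidCacheKey):

use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};

fn scan(old: &HashMap<String, u32>, counter: &AtomicU32, live: &[&str]) -> HashMap<String, u32> {
    let mut fresh = HashMap::new();
    for key in live {
        // Reuse a previously assigned OID when possible; allocate otherwise.
        let oid = old
            .get(*key)
            .copied()
            .unwrap_or_else(|| counter.fetch_add(1, Ordering::Relaxed));
        fresh.insert(key.to_string(), oid);
    }
    // The caller replaces the old cache with `fresh`, dropping entries for
    // objects that no longer exist while keeping stable OIDs for survivors.
    fresh
}

fn main() {
    let counter = AtomicU32::new(0);
    let first = scan(&HashMap::new(), &counter, &["public.users", "public.orders"]);
    let second = scan(&first, &counter, &["public.users"]);
    assert_eq!(first["public.users"], second["public.users"]); // stable OID
    assert!(!second.contains_key("public.orders")); // dropped table pruned
}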
@@ -1147,14 +1190,14 @@ impl PgClassTable {
                                 let column_count = table.schema().fields().len() as i16;
 
                                 // Add table entry
-                                oids.push(table_oid);
+                                oids.push(table_oid as i32);
                                 relnames.push(table_name.clone());
-                                relnamespaces.push(schema_oid);
+                                relnamespaces.push(schema_oid as i32);
                                 reltypes.push(0); // Simplified: we're not tracking data types
                                 reloftypes.push(None);
                                 relowners.push(0); // Simplified: no owner tracking
                                 relams.push(0); // Default access method
-                                relfilenodes.push(table_oid); // Use OID as filenode
+                                relfilenodes.push(table_oid as i32); // Use OID as filenode
                                 reltablespaces.push(0); // Default tablespace
                                 relpages.push(1); // Default page count
                                 reltuples.push(0.0); // No row count stats
@@ -1184,6 +1227,8 @@
             }
         }
 
+        *oid_cache = swap_cache;
+
         // Create Arrow arrays from the collected data
         let arrays: Vec<ArrayRef> = vec![
             Arc::new(Int32Array::from(oids)),
@@ -1219,7 +1264,7 @@
         ];
 
         // Create a record batch
-        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
 
         Ok(batch)
     }
@@ -1231,23 +1276,28 @@ impl PartitionStream for PgClassTable {
     }
 
     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        let catalog_list = self.catalog_list.clone();
-        let schema = Arc::clone(&self.schema);
+        let this = self.clone();
         Box::pin(RecordBatchStreamAdapter::new(
-            schema.clone(),
-            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
+            this.schema.clone(),
+            futures::stream::once(async move { PgClassTable::get_data(this).await }),
         ))
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct PgNamespaceTable {
     schema: SchemaRef,
     catalog_list: Arc<dyn CatalogProviderList>,
+    oid_counter: Arc<AtomicU32>,
+    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
 }
 
 impl PgNamespaceTable {
-    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
+    pub fn new(
+        catalog_list: Arc<dyn CatalogProviderList>,
+        oid_counter: Arc<AtomicU32>,
+        oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
+    ) -> Self {
         // Define the schema for pg_namespace
         // This matches the columns from PostgreSQL's pg_namespace
         let schema = Arc::new(Schema::new(vec![
@@ -1261,62 +1311,38 @@ impl PgNamespaceTable {
         Self {
             schema,
             catalog_list,
+            oid_counter,
+            oid_cache,
         }
     }
 
     /// Generate record batches based on the current state of the catalog
-    async fn get_data(
-        schema: SchemaRef,
-        catalog_list: Arc<dyn CatalogProviderList>,
-    ) -> Result<RecordBatch> {
+    async fn get_data(this: PgNamespaceTable) -> Result<RecordBatch> {
         // Vectors to store column data
         let mut oids = Vec::new();
         let mut nspnames = Vec::new();
        let mut nspowners = Vec::new();
        let mut nspacls: Vec<Option<String>> = Vec::new();
        let mut options: Vec<Option<String>> = Vec::new();
 
-        // Start OID counter (should be consistent with the values used in pg_class)
-        let mut next_oid = 10000;
+        // Temporarily holds each schema-to-OID mapping before it is merged into the global cache
+        let mut schema_oid_cache = HashMap::new();
 
-        // Add standard PostgreSQL system schemas
-        // pg_catalog schema (OID 11)
-        oids.push(11);
-        nspnames.push("pg_catalog".to_string());
-        nspowners.push(10); // Default superuser
-        nspacls.push(None);
-        options.push(None);
-
-        // public schema (OID 2200)
-        oids.push(2200);
-        nspnames.push("public".to_string());
-        nspowners.push(10); // Default superuser
-        nspacls.push(None);
-        options.push(None);
-
-        // information_schema (OID 12)
-        oids.push(12);
-        nspnames.push("information_schema".to_string());
-        nspowners.push(10); // Default superuser
-        nspacls.push(None);
-        options.push(None);
+        let mut oid_cache = this.oid_cache.write().await;
 
         // Now add all schemas from DataFusion catalogs
-        for catalog_name in catalog_list.catalog_names() {
-            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
+        for catalog_name in this.catalog_list.catalog_names() {
+            if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
                 for schema_name in catalog.schema_names() {
-                    // Skip schemas we've already added as system schemas
-                    if schema_name == "pg_catalog"
-                        || schema_name == "public"
-                        || schema_name == "information_schema"
-                    {
-                        continue;
-                    }
-
-                    let schema_oid = next_oid;
-                    next_oid += 1;
-
-                    oids.push(schema_oid);
+                    let cache_key = OidCacheKey::Schema(schema_name.clone());
+                    let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
+                        *oid
+                    } else {
+                        this.oid_counter.fetch_add(1, Ordering::Relaxed)
+                    };
+                    schema_oid_cache.insert(cache_key, schema_oid);
+
+                    oids.push(schema_oid as i32);
                     nspnames.push(schema_name.clone());
                     nspowners.push(10); // Default owner
                     nspacls.push(None);
@@ -1325,6 +1351,16 @@ impl PgNamespaceTable {
             }
         }
 
+        // Remove schema entries, plus any table entries whose schema no longer exists
+        oid_cache.retain(|key, _| match key {
+            OidCacheKey::Schema(_) => false,
+            OidCacheKey::Table(schema_name, _) => {
+                schema_oid_cache.contains_key(&OidCacheKey::Schema(schema_name.clone()))
+            }
+        });
+        // Add the schemas found on this scan back into the cache
+        oid_cache.extend(schema_oid_cache);
+
         // Create Arrow arrays from the collected data
         let arrays: Vec<ArrayRef> = vec![
             Arc::new(Int32Array::from(oids)),
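
pg_namespace prunes the shared cache differently from pg_class's wholesale swap: schema entries are always rebuilt from the current scan, while table entries survive only as long as their schema does. A reduced sketch of that retain-then-extend pass (simplified value types, illustrative names):

use std::collections::HashMap;

#[derive(Debug, Hash, Eq, PartialEq)]
enum OidCacheKey {
    Schema(String),
    Table(String, String),
}

fn prune(cache: &mut HashMap<OidCacheKey, u32>, seen_schemas: HashMap<OidCacheKey, u32>) {
    cache.retain(|key, _| match key {
        // Schema entries are dropped unconditionally; `seen_schemas`
        // re-adds the live ones below.
        OidCacheKey::Schema(_) => false,
        // Table entries survive only while their schema is still live,
        // so pg_class keeps reporting the same table OIDs on its next scan.
        OidCacheKey::Table(schema, _) => {
            seen_schemas.contains_key(&OidCacheKey::Schema(schema.clone()))
        }
    });
    cache.extend(seen_schemas);
}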
@@ -1335,7 +1371,7 @@
         ];
 
         // Create a full record batch
-        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
 
         Ok(batch)
     }
@@ -1347,11 +1383,10 @@ impl PartitionStream for PgNamespaceTable {
     }
 
     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        let catalog_list = self.catalog_list.clone();
-        let schema = Arc::clone(&self.schema);
+        let this = self.clone();
         Box::pin(RecordBatchStreamAdapter::new(
-            schema.clone(),
-            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
+            this.schema.clone(),
+            futures::stream::once(async move { Self::get_data(this).await }),
         ))
     }
 }
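
Deriving Clone on both table structs is what enables the simplified execute(): the stream future needs 'static ownership of everything it touches, and cloning the whole struct replaces the previous per-field clones. A minimal analogue of the pattern (struct and field names here are illustrative):

#[derive(Clone)]
struct CatalogTable {
    name: String,
}

impl CatalogTable {
    // Taking the table by value lets the async body own its data outright.
    async fn get_data(this: CatalogTable) -> String {
        this.name
    }

    fn execute(&self) -> impl std::future::Future<Output = String> + 'static {
        // One clone of self replaces a clone per field; the returned future
        // holds no borrows, which satisfies the 'static bound.
        let this = self.clone();
        async move { CatalogTable::get_data(this).await }
    }
}

fn main() {
    let table = CatalogTable { name: "pg_class".into() };
    let _fut = table.execute(); // poll with any async executor to get "pg_class"
}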
