Skip to content

Commit af148a4

Browse files
committed
feat: implement oid cache for catalog/database
1 parent 5e6a050 commit af148a4

File tree

1 file changed

+84
-34
lines changed

1 file changed

+84
-34
lines changed

datafusion-postgres/src/pg_catalog.rs

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,10 @@ impl PgTypesData {
214214

215215
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
216216
enum OidCacheKey {
217-
Schema(String),
217+
Catalog(String),
218+
Schema(String, String),
218219
/// Table by schema and table name
219-
Table(String, String),
220+
Table(String, String, String),
220221
}
221222

222223
// Create custom schema provider for pg_catalog
@@ -262,7 +263,11 @@ impl SchemaProvider for PgCatalogSchemaProvider {
262263
)))
263264
}
264265
PG_CATALOG_TABLE_PG_DATABASE => {
265-
let table = Arc::new(PgDatabaseTable::new(self.catalog_list.clone()));
266+
let table = Arc::new(PgDatabaseTable::new(
267+
self.catalog_list.clone(),
268+
self.oid_counter.clone(),
269+
self.oid_cache.clone(),
270+
));
266271
Ok(Some(Arc::new(
267272
StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
268273
)))
@@ -1156,10 +1161,19 @@ impl PgClassTable {
11561161

11571162
// Iterate through all catalogs and schemas
11581163
for catalog_name in this.catalog_list.catalog_names() {
1164+
let cache_key = OidCacheKey::Catalog(catalog_name.clone());
1165+
let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
1166+
*oid
1167+
} else {
1168+
this.oid_counter.fetch_add(1, Ordering::Relaxed)
1169+
};
1170+
swap_cache.insert(cache_key, catalog_oid);
1171+
11591172
if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
11601173
for schema_name in catalog.schema_names() {
11611174
if let Some(schema) = catalog.schema(&schema_name) {
1162-
let cache_key = OidCacheKey::Schema(schema_name.clone());
1175+
let cache_key =
1176+
OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
11631177
let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
11641178
*oid
11651179
} else {
@@ -1172,8 +1186,11 @@ impl PgClassTable {
11721186

11731187
// Now process all tables in this schema
11741188
for table_name in schema.table_names() {
1175-
let cache_key =
1176-
OidCacheKey::Table(schema_name.clone(), table_name.clone());
1189+
let cache_key = OidCacheKey::Table(
1190+
catalog_name.clone(),
1191+
schema_name.clone(),
1192+
table_name.clone(),
1193+
);
11771194
let table_oid = if let Some(oid) = oid_cache.get(&cache_key) {
11781195
*oid
11791196
} else {
@@ -1334,7 +1351,7 @@ impl PgNamespaceTable {
13341351
for catalog_name in this.catalog_list.catalog_names() {
13351352
if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
13361353
for schema_name in catalog.schema_names() {
1337-
let cache_key = OidCacheKey::Schema(schema_name.clone());
1354+
let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
13381355
let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
13391356
*oid
13401357
} else {
@@ -1353,10 +1370,10 @@ impl PgNamespaceTable {
13531370

13541371
// remove cached schema entries, and cached table entries whose schema no longer exists
13551372
oid_cache.retain(|key, _| match key {
1356-
OidCacheKey::Schema(_) => false,
1357-
OidCacheKey::Table(schema_name, _) => {
1358-
schema_oid_cache.contains_key(&OidCacheKey::Schema(schema_name.clone()))
1359-
}
1373+
OidCacheKey::Catalog(..) => true,
1374+
OidCacheKey::Schema(..) => false,
1375+
OidCacheKey::Table(catalog, schema_name, _) => schema_oid_cache
1376+
.contains_key(&OidCacheKey::Schema(catalog.clone(), schema_name.clone())),
13601377
});
13611378
// add new schema cache
13621379
oid_cache.extend(schema_oid_cache);
@@ -1391,14 +1408,20 @@ impl PartitionStream for PgNamespaceTable {
13911408
}
13921409
}
13931410

1394-
#[derive(Debug)]
1411+
#[derive(Debug, Clone)]
13951412
struct PgDatabaseTable {
13961413
schema: SchemaRef,
13971414
catalog_list: Arc<dyn CatalogProviderList>,
1415+
oid_counter: Arc<AtomicU32>,
1416+
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
13981417
}
13991418

14001419
impl PgDatabaseTable {
1401-
pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
1420+
pub fn new(
1421+
catalog_list: Arc<dyn CatalogProviderList>,
1422+
oid_counter: Arc<AtomicU32>,
1423+
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
1424+
) -> Self {
14021425
// Define the schema for pg_database
14031426
// This matches PostgreSQL's pg_database table columns
14041427
let schema = Arc::new(Schema::new(vec![
@@ -1421,14 +1444,13 @@ impl PgDatabaseTable {
14211444
Self {
14221445
schema,
14231446
catalog_list,
1447+
oid_counter,
1448+
oid_cache,
14241449
}
14251450
}
14261451

14271452
/// Generate record batches based on the current state of the catalog
1428-
async fn get_data(
1429-
schema: SchemaRef,
1430-
catalog_list: Arc<dyn CatalogProviderList>,
1431-
) -> Result<RecordBatch> {
1453+
async fn get_data(this: PgDatabaseTable) -> Result<RecordBatch> {
14321454
// Vectors to store column data
14331455
let mut oids = Vec::new();
14341456
let mut datnames = Vec::new();
@@ -1445,15 +1467,22 @@ impl PgDatabaseTable {
14451467
let mut dattablespaces = Vec::new();
14461468
let mut datacles: Vec<Option<String>> = Vec::new();
14471469

1448-
// Start OID counter (this is simplistic and would need to be more robust in practice)
1449-
let mut next_oid = 16384; // Standard PostgreSQL starting OID for user databases
1470+
// temporarily store the catalog-to-oid mapping before merging it into the global oid cache
1471+
let mut catalog_oid_cache = HashMap::new();
14501472

1451-
// Add a record for each catalog (treating catalogs as "databases")
1452-
for catalog_name in catalog_list.catalog_names() {
1453-
let oid = next_oid;
1454-
next_oid += 1;
1473+
let mut oid_cache = this.oid_cache.write().await;
14551474

1456-
oids.push(oid);
1475+
// Add a record for each catalog (treating catalogs as "databases")
1476+
for catalog_name in this.catalog_list.catalog_names() {
1477+
let cache_key = OidCacheKey::Catalog(catalog_name.clone());
1478+
let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
1479+
*oid
1480+
} else {
1481+
this.oid_counter.fetch_add(1, Ordering::Relaxed)
1482+
};
1483+
catalog_oid_cache.insert(cache_key, catalog_oid);
1484+
1485+
oids.push(catalog_oid as i32);
14571486
datnames.push(catalog_name.clone());
14581487
datdbas.push(10); // Default owner (assuming 10 = postgres user)
14591488
encodings.push(6); // 6 = UTF8 in PostgreSQL
@@ -1471,11 +1500,18 @@ impl PgDatabaseTable {
14711500

14721501
// Always include a "postgres" database entry if not already present
14731502
// (This is for compatibility with tools that expect it)
1474-
if !datnames.contains(&"postgres".to_string()) {
1475-
let oid = next_oid;
1476-
1477-
oids.push(oid);
1478-
datnames.push("postgres".to_string());
1503+
let default_datname = "postgres".to_string();
1504+
if !datnames.contains(&default_datname) {
1505+
let cache_key = OidCacheKey::Catalog(default_datname.clone());
1506+
let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
1507+
*oid
1508+
} else {
1509+
this.oid_counter.fetch_add(1, Ordering::Relaxed)
1510+
};
1511+
catalog_oid_cache.insert(cache_key, catalog_oid);
1512+
1513+
oids.push(catalog_oid as i32);
1514+
datnames.push(default_datname);
14791515
datdbas.push(10);
14801516
encodings.push(6);
14811517
datcollates.push("en_US.UTF-8".to_string());
@@ -1509,7 +1545,22 @@ impl PgDatabaseTable {
15091545
];
15101546

15111547
// Create a full record batch
1512-
let full_batch = RecordBatch::try_new(schema.clone(), arrays)?;
1548+
let full_batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
1549+
1550+
// update cache
1551+
// remove cached catalog entries, and cached schema/table entries whose catalog no longer exists
1552+
oid_cache.retain(|key, _| match key {
1553+
OidCacheKey::Catalog(..) => false,
1554+
OidCacheKey::Schema(catalog, ..) => {
1555+
catalog_oid_cache.contains_key(&OidCacheKey::Catalog(catalog.clone()))
1556+
}
1557+
OidCacheKey::Table(catalog, ..) => {
1558+
catalog_oid_cache.contains_key(&OidCacheKey::Catalog(catalog.clone()))
1559+
}
1560+
});
1561+
// merge the freshly built catalog oid entries back into the cache
1562+
oid_cache.extend(catalog_oid_cache);
1563+
15131564
Ok(full_batch)
15141565
}
15151566
}
@@ -1520,11 +1571,10 @@ impl PartitionStream for PgDatabaseTable {
15201571
}
15211572

15221573
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1523-
let catalog_list = self.catalog_list.clone();
1524-
let schema = Arc::clone(&self.schema);
1574+
let this = self.clone();
15251575
Box::pin(RecordBatchStreamAdapter::new(
1526-
schema.clone(),
1527-
futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
1576+
this.schema.clone(),
1577+
futures::stream::once(async move { Self::get_data(this).await }),
15281578
))
15291579
}
15301580
}

0 commit comments

Comments
 (0)