Skip to content

Commit 4d71aec

Browse files
authored
chore: make all CatalogInfo methods async (#163)
1 parent f2b3671 commit 4d71aec

File tree

6 files changed

+35
-21
lines changed

6 files changed

+35
-21
lines changed

datafusion-postgres/src/pg_catalog/catalog_info.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ use datafusion::{
1010
/// Define the interface for retrieve catalog data for pg_catalog tables
1111
#[async_trait]
1212
pub trait CatalogInfo: Clone + Send + Sync + Debug + 'static {
13-
fn catalog_names(&self) -> Result<Vec<String>, DataFusionError>;
13+
async fn catalog_names(&self) -> Result<Vec<String>, DataFusionError>;
1414

15-
fn schema_names(&self, catalog_name: &str) -> Result<Option<Vec<String>>, DataFusionError>;
15+
async fn schema_names(
16+
&self,
17+
catalog_name: &str,
18+
) -> Result<Option<Vec<String>>, DataFusionError>;
1619

17-
fn table_names(
20+
async fn table_names(
1821
&self,
1922
catalog_name: &str,
2023
schema_name: &str,
@@ -37,15 +40,18 @@ pub trait CatalogInfo: Clone + Send + Sync + Debug + 'static {
3740

3841
#[async_trait]
3942
impl CatalogInfo for Arc<dyn CatalogProviderList> {
40-
fn catalog_names(&self) -> Result<Vec<String>, DataFusionError> {
43+
async fn catalog_names(&self) -> Result<Vec<String>, DataFusionError> {
4144
Ok(CatalogProviderList::catalog_names(self.as_ref()))
4245
}
4346

44-
fn schema_names(&self, catalog_name: &str) -> Result<Option<Vec<String>>, DataFusionError> {
47+
async fn schema_names(
48+
&self,
49+
catalog_name: &str,
50+
) -> Result<Option<Vec<String>>, DataFusionError> {
4551
Ok(self.catalog(catalog_name).map(|c| c.schema_names()))
4652
}
4753

48-
fn table_names(
54+
async fn table_names(
4955
&self,
5056
catalog_name: &str,
5157
schema_name: &str,

datafusion-postgres/src/pg_catalog/pg_attribute.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,13 @@ impl<C: CatalogInfo> PgAttributeTable<C> {
105105
// original one in case that schemas or tables were dropped.
106106
let mut swap_cache = HashMap::new();
107107

108-
for catalog_name in this.catalog_list.catalog_names()? {
109-
if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? {
108+
for catalog_name in this.catalog_list.catalog_names().await? {
109+
if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? {
110110
for schema_name in schema_names {
111-
if let Some(table_names) =
112-
this.catalog_list.table_names(&catalog_name, &schema_name)?
111+
if let Some(table_names) = this
112+
.catalog_list
113+
.table_names(&catalog_name, &schema_name)
114+
.await?
113115
{
114116
// Process all tables in this schema
115117
for table_name in table_names {

datafusion-postgres/src/pg_catalog/pg_class.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl<C: CatalogInfo> PgClassTable<C> {
117117
let mut swap_cache = HashMap::new();
118118

119119
// Iterate through all catalogs and schemas
120-
for catalog_name in this.catalog_list.catalog_names()? {
120+
for catalog_name in this.catalog_list.catalog_names().await? {
121121
let cache_key = OidCacheKey::Catalog(catalog_name.clone());
122122
let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
123123
*oid
@@ -126,7 +126,7 @@ impl<C: CatalogInfo> PgClassTable<C> {
126126
};
127127
swap_cache.insert(cache_key, catalog_oid);
128128

129-
if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? {
129+
if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? {
130130
for schema_name in schema_names {
131131
let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
132132
let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {
@@ -140,8 +140,10 @@ impl<C: CatalogInfo> PgClassTable<C> {
140140
// (In a full implementation, this would go in pg_namespace)
141141

142142
// Now process all tables in this schema
143-
if let Some(table_names) =
144-
this.catalog_list.table_names(&catalog_name, &schema_name)?
143+
if let Some(table_names) = this
144+
.catalog_list
145+
.table_names(&catalog_name, &schema_name)
146+
.await?
145147
{
146148
for table_name in table_names {
147149
let cache_key = OidCacheKey::Table(

datafusion-postgres/src/pg_catalog/pg_database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
8080
let mut oid_cache = this.oid_cache.write().await;
8181

8282
// Add a record for each catalog (treating catalogs as "databases")
83-
for catalog_name in this.catalog_list.catalog_names()? {
83+
for catalog_name in this.catalog_list.catalog_names().await? {
8484
let cache_key = OidCacheKey::Catalog(catalog_name.clone());
8585
let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) {
8686
*oid

datafusion-postgres/src/pg_catalog/pg_namespace.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
6262
let mut oid_cache = this.oid_cache.write().await;
6363

6464
// Now add all schemas from DataFusion catalogs
65-
for catalog_name in this.catalog_list.catalog_names()? {
66-
if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? {
65+
for catalog_name in this.catalog_list.catalog_names().await? {
66+
if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? {
6767
for schema_name in schema_names {
6868
let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone());
6969
let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) {

datafusion-postgres/src/pg_catalog/pg_tables.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,15 @@ impl<C: CatalogInfo> PgTablesTable<C> {
4949
let mut row_security = Vec::new();
5050

5151
// Iterate through all catalogs and schemas
52-
for catalog_name in this.catalog_list.catalog_names()? {
53-
if let Some(catalog_schema_names) = this.catalog_list.schema_names(&catalog_name)? {
52+
for catalog_name in this.catalog_list.catalog_names().await? {
53+
if let Some(catalog_schema_names) =
54+
this.catalog_list.schema_names(&catalog_name).await?
55+
{
5456
for schema_name in catalog_schema_names {
55-
if let Some(catalog_table_names) =
56-
this.catalog_list.table_names(&catalog_name, &schema_name)?
57+
if let Some(catalog_table_names) = this
58+
.catalog_list
59+
.table_names(&catalog_name, &schema_name)
60+
.await?
5761
{
5862
// Now process all tables in this schema
5963
for table_name in catalog_table_names {

0 commit comments

Comments
 (0)