Skip to content

Commit 287a292

Browse files
authored
fix(ingest/redshift): Making Redshift source more verbose (#8109)
1 parent 764e053 commit 287a292

File tree

3 files changed

+65
-9
lines changed

3 files changed

+65
-9
lines changed

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

Lines changed: 56 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -432,7 +432,7 @@ def process_schemas(self, connection, database):
432432
for schema in RedshiftDataDictionary.get_schemas(
433433
conn=connection, database=database
434434
):
435-
logger.info(f"Schema: {database}.{schema.name}")
435+
logger.info(f"Processing schema: {database}.{schema.name}")
436436
if not self.config.schema_pattern.allowed(schema.name):
437437
self.report.report_dropped(f"{database}.{schema.name}")
438438
continue
@@ -478,11 +478,11 @@ def process_schema(
478478
)
479479

480480
if self.config.include_tables:
481-
logger.info("process tables")
482-
if not self.db_tables[schema.database]:
483-
return
484-
485-
if schema.name in self.db_tables[schema.database]:
481+
logger.info(f"Process tables in schema {database}.{schema.name}")
482+
if (
483+
self.db_tables[schema.database]
484+
and schema.name in self.db_tables[schema.database]
485+
):
486486
for table in self.db_tables[schema.database][schema.name]:
487487
table.columns = schema_columns[schema.name].get(table.name, [])
488488
yield from self._process_table(table, database=database)
@@ -492,10 +492,22 @@ def process_schema(
492492
)
493493
+ 1
494494
)
495+
logger.debug(
496+
f"Table processed: {schema.database}.{schema.name}.{table.name}"
497+
)
498+
else:
499+
logger.info(
500+
f"No tables in cache for {schema.database}.{schema.name}, skipping"
501+
)
502+
else:
503+
logger.info("Table processing disabled, skipping")
495504

496505
if self.config.include_views:
497-
logger.info("process views")
498-
if schema.name in self.db_views[schema.database]:
506+
logger.info(f"Process views in schema {schema.database}.{schema.name}")
507+
if (
508+
self.db_views[schema.database]
509+
and schema.name in self.db_views[schema.database]
510+
):
499511
for view in self.db_views[schema.database][schema.name]:
500512
view.columns = schema_columns[schema.name].get(view.name, [])
501513
yield from self._process_view(
@@ -508,6 +520,15 @@ def process_schema(
508520
)
509521
+ 1
510522
)
523+
logger.debug(
524+
f"Table processed: {schema.database}.{schema.name}.{view.name}"
525+
)
526+
else:
527+
logger.info(
528+
f"No views in cache for {schema.database}.{schema.name}, skipping"
529+
)
530+
else:
531+
logger.info("View processing disabled, skipping")
511532

512533
self.report.metadata_extraction_sec[report_key] = round(
513534
timer.elapsed_seconds(), 2
@@ -752,20 +773,47 @@ def cache_tables_and_views(self, connection, database):
752773
tables, views = RedshiftDataDictionary.get_tables_and_views(conn=connection)
753774
for schema in tables:
754775
if self.config.schema_pattern.allowed(f"{database}.{schema}"):
776+
logging.debug(
777+
f"Not caching tables for schema {database}.{schema} which is not allowed by schema_pattern"
778+
)
755779
self.db_tables[database][schema] = []
756780
for table in tables[schema]:
757781
if self.config.table_pattern.allowed(
758782
f"{database}.{schema}.{table.name}"
759783
):
760784
self.db_tables[database][schema].append(table)
785+
self.report.table_cached[f"{database}.{schema}"] = (
786+
self.report.table_cached.get(f"{database}.{schema}", 0) + 1
787+
)
788+
else:
789+
logging.debug(
790+
f"Table {database}.{schema}.{table.name} is filtered by table_pattern"
791+
)
792+
self.report.table_filtered[f"{database}.{schema}"] = (
793+
self.report.table_filtered.get(f"{database}.{schema}", 0)
794+
+ 1
795+
)
761796
for schema in views:
797+
logging.debug(
798+
f"Not caching views for schema {database}.{schema} which is not allowed by schema_pattern"
799+
)
762800
if self.config.schema_pattern.allowed(f"{database}.{schema}"):
763801
self.db_views[database][schema] = []
764802
for view in views[schema]:
765803
if self.config.view_pattern.allowed(
766804
f"{database}.{schema}.{view.name}"
767805
):
768806
self.db_views[database][schema].append(view)
807+
self.report.view_cached[f"{database}.{schema}"] = (
808+
self.report.view_cached.get(f"{database}.{schema}", 0) + 1
809+
)
810+
else:
811+
logging.debug(
812+
f"View {database}.{schema}.{table.name} is filtered by view_pattern"
813+
)
814+
self.report.view_filtered[f"{database}.{schema}"] = (
815+
self.report.view_filtered.get(f"{database}.{schema}", 0) + 1
816+
)
769817

770818
def get_all_tables(
771819
self,

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -170,7 +170,7 @@ def get_tables_and_views(
170170
cur = RedshiftDataDictionary.get_query_result(conn, RedshiftQuery.list_tables)
171171
field_names = [i[0] for i in cur.description]
172172
db_tables = cur.fetchall()
173-
173+
logger.info(f"Fetched {len(db_tables)} tables/views from Redshift")
174174
for table in db_tables:
175175
schema = table[field_names.index("schema")]
176176
if table[field_names.index("tabletype")] not in [
@@ -241,6 +241,10 @@ def get_tables_and_views(
241241
comment=table[field_names.index("table_description")],
242242
)
243243
)
244+
for schema_key, schema_tables in tables.items():
245+
logger.info(f"In schema: {schema_key} discovered {len(tables)} tables")
246+
for schema_key, schema_views in views.items():
247+
logger.info(f"In schema: {schema_key} discovered {len(views)} views")
244248

245249
return tables, views
246250

metadata-ingestion/src/datahub/ingestion/source/redshift/report.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,11 @@ class RedshiftReport(ProfilingSqlReport):
1414
usage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
1515
lineage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
1616
table_processed: TopKDict[str, int] = field(default_factory=TopKDict)
17+
table_filtered: TopKDict[str, int] = field(default_factory=TopKDict)
18+
view_filtered: TopKDict[str, int] = field(default_factory=TopKDict)
1719
view_processed: TopKDict[str, int] = field(default_factory=TopKDict)
20+
table_cached: TopKDict[str, int] = field(default_factory=TopKDict)
21+
view_cached: TopKDict[str, int] = field(default_factory=TopKDict)
1822
metadata_extraction_sec: TopKDict[str, float] = field(default_factory=TopKDict)
1923
operational_metadata_extraction_sec: TopKDict[str, float] = field(
2024
default_factory=TopKDict

0 commit comments

Comments
 (0)