
Commit e19b1fe

treff7es and hsheth2 authored
fix(ingest/bigquery): Adding way to change api's batch size on schema init (#10255)
Co-authored-by: Harshal Sheth <[email protected]>
1 parent 3d94388 commit e19b1fe

File tree

metadata-ingestion/src/datahub/ingestion/graph/client.py
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py

5 files changed: +53 / -5 lines changed

metadata-ingestion/src/datahub/ingestion/graph/client.py

Lines changed: 6 additions & 1 deletion

@@ -1092,7 +1092,11 @@ def _make_schema_resolver(
         )
 
     def initialize_schema_resolver_from_datahub(
-        self, platform: str, platform_instance: Optional[str], env: str
+        self,
+        platform: str,
+        platform_instance: Optional[str],
+        env: str,
+        batch_size: int = 100,
     ) -> "SchemaResolver":
         logger.info("Initializing schema resolver")
         schema_resolver = self._make_schema_resolver(
@@ -1106,6 +1110,7 @@ def initialize_schema_resolver_from_datahub(
             platform=platform,
             platform_instance=platform_instance,
             env=env,
+            batch_size=batch_size,
         ):
             try:
                 schema_resolver.add_graphql_schema_metadata(urn, schema_info)
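To see the new parameter from the caller's side, here is a minimal sketch of pre-warming a schema resolver with a larger batch size. It assumes a reachable DataHub GMS endpoint; the server URL and token are placeholders, and DataHubGraph / DatahubClientConfig are the client types from this module.

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Placeholder connection details; point these at a real GMS instance.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080", token=None))

# Pull previously-ingested BigQuery schema metadata in batches of 500 tables
# per request instead of the default of 100 introduced by this change.
schema_resolver = graph.initialize_schema_resolver_from_datahub(
    platform="bigquery",
    platform_instance=None,
    env="PROD",
    batch_size=500,
)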

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

Lines changed: 18 additions & 0 deletions

@@ -489,6 +489,7 @@ def _init_schema_resolver(self) -> SchemaResolver:
                 platform=self.platform,
                 platform_instance=self.config.platform_instance,
                 env=self.config.env,
+                batch_size=self.config.schema_resolution_batch_size,
             )
         else:
             logger.warning(
@@ -1367,6 +1368,22 @@ def get_core_table_details(
                     table=table.table_id,
                 )
 
+            if table.table_type == "VIEW":
+                if (
+                    not self.config.include_views
+                    or not self.config.view_pattern.allowed(
+                        table_identifier.raw_table_name()
+                    )
+                ):
+                    self.report.report_dropped(table_identifier.raw_table_name())
+                    continue
+            else:
+                if not self.config.table_pattern.allowed(
+                    table_identifier.raw_table_name()
+                ):
+                    self.report.report_dropped(table_identifier.raw_table_name())
+                    continue
+
             _, shard = BigqueryTableIdentifier.get_table_and_shard(
                 table_identifier.table
             )
@@ -1403,6 +1420,7 @@ def get_core_table_details(
                 continue
 
             table_items[table.table_id] = table
+
         # Adding maximum shards to the list of tables
         table_items.update({value.table_id: value for value in sharded_tables.values()})
 
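The new branch in get_core_table_details drops views and tables whose names fail the source's patterns before any further processing. A self-contained sketch of the same filtering idea, using AllowDenyPattern (the pattern type used throughout DataHub configs) with made-up table names:

from datahub.configuration.common import AllowDenyPattern

# Hypothetical patterns standing in for config.view_pattern / config.table_pattern.
view_pattern = AllowDenyPattern(deny=[".*_tmp$"])
table_pattern = AllowDenyPattern(allow=["analytics\\..*"])
include_views = True

for name, table_type in [
    ("analytics.orders", "TABLE"),
    ("analytics.orders_tmp", "VIEW"),
    ("scratch.events", "TABLE"),
]:
    if table_type == "VIEW":
        kept = include_views and view_pattern.allowed(name)
    else:
        kept = table_pattern.allowed(name)
    print(name, "kept" if kept else "dropped")  # scratch.events and the *_tmp view are dropped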

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py

Lines changed: 6 additions & 0 deletions

@@ -280,6 +280,12 @@ class BigQueryV2Config(
         description="Option to exclude empty projects from being ingested.",
     )
 
+    schema_resolution_batch_size: int = Field(
+        default=100,
+        description="The number of tables to process in a batch when resolving schema from DataHub.",
+        hidden_from_schema=True,
+    )
+
     @root_validator(skip_on_failure=True)
     def profile_default_settings(cls, values: Dict) -> Dict:
         # Extra default SQLAlchemy option for better connection pooling and threading.
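Because schema_resolution_batch_size is an ordinary (if hidden) field on BigQueryV2Config, it can be supplied like any other recipe option. A minimal sketch, assuming the config can be built from a plain dict without connection details (BigQuery credentials fall back to application defaults); only the fields relevant to this change are shown:

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

config = BigQueryV2Config.parse_obj(
    {
        "project_ids": ["my-gcp-project"],    # assumed project selector
        "schema_resolution_batch_size": 500,  # new option added in this commit
    }
)
print(config.schema_resolution_batch_size)  # -> 500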

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py

Lines changed: 8 additions & 1 deletion

@@ -159,7 +159,7 @@ def generate_partition_profiler_query(
     def get_workunits(
         self, project_id: str, tables: Dict[str, List[BigqueryTable]]
     ) -> Iterable[MetadataWorkUnit]:
-        profile_requests = []
+        profile_requests: List[TableProfilerRequest] = []
 
         for dataset in tables:
             for table in tables[dataset]:
@@ -174,10 +174,17 @@
                 )
 
                 # Emit the profile work unit
+                logger.debug(
+                    f"Creating profile request for table {normalized_table_name}"
+                )
                 profile_request = self.get_profile_request(table, dataset, project_id)
                 if profile_request is not None:
                     self.report.report_entity_profiled(profile_request.pretty_name)
                     profile_requests.append(profile_request)
+                else:
+                    logger.debug(
+                        f"Table {normalized_table_name} was not eligible for profiling."
+                    )
 
         if len(profile_requests) == 0:
             return
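The messages added here are emitted at DEBUG level, so they only appear when debug logging is enabled for the run. A minimal sketch of turning that on programmatically; the logger name is an assumption based on the usual logging.getLogger(__name__) convention for this module:

import logging

logging.basicConfig(level=logging.INFO)
# Assumed module-path logger name; adjust if the profiler logs under a different name.
logging.getLogger("datahub.ingestion.source.bigquery_v2.profiler").setLevel(logging.DEBUG)

Alternatively, the DataHub CLI's --debug flag (datahub --debug ingest -c recipe.yml) enables verbose logging for the whole ingestion run.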

metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py

Lines changed: 15 additions & 3 deletions

@@ -158,6 +158,9 @@ def get_profile_request(
             size_in_bytes=table.size_in_bytes,
             rows_count=table.rows_count,
         ):
+            logger.debug(
+                f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes or count of rows limit"
+            )
             # Profile only table level if dataset is filtered from profiling
             # due to size limits alone
             if self.is_dataset_eligible_for_profiling(
@@ -245,6 +248,9 @@ def is_dataset_eligible_for_profiling(
         )
 
         if not self.config.table_pattern.allowed(dataset_name):
+            logger.debug(
+                f"Table {dataset_name} is not allowed for profiling due to table pattern"
+            )
             return False
 
         last_profiled: Optional[int] = None
@@ -267,14 +273,14 @@
                 self.config.profiling.profile_if_updated_since_days
             )
 
-        if not self.config.profile_pattern.allowed(dataset_name):
-            return False
-
         schema_name = dataset_name.rsplit(".", 1)[0]
         if (threshold_time is not None) and (
             last_altered is not None and last_altered < threshold_time
         ):
             self.report.profiling_skipped_not_updated[schema_name] += 1
+            logger.debug(
+                f"Table {dataset_name} was skipped because it was not updated recently enough"
+            )
             return False
 
         if self.config.profiling.profile_table_size_limit is not None and (
@@ -283,13 +289,19 @@
             > self.config.profiling.profile_table_size_limit
         ):
             self.report.profiling_skipped_size_limit[schema_name] += 1
+            logger.debug(
+                f"Table {dataset_name} is not allowed for profiling due to size limit"
+            )
             return False
 
         if self.config.profiling.profile_table_row_limit is not None and (
             rows_count is not None
             and rows_count > self.config.profiling.profile_table_row_limit
        ):
             self.report.profiling_skipped_row_limit[schema_name] += 1
+            logger.debug(
+                f"Table {dataset_name} is not allowed for profiling due to row limit"
+            )
             return False
 
         return True
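For readers tracing the eligibility logic, here is a small self-contained sketch of the size and row-count checks with made-up thresholds. It is an illustration of the decision flow above, not the actual is_dataset_eligible_for_profiling implementation:

from typing import Optional

def eligible_for_profiling(
    size_in_bytes: Optional[int],
    rows_count: Optional[int],
    size_limit: Optional[int] = 5_000_000_000,  # hypothetical 5 GB cap
    row_limit: Optional[int] = 5_000_000,       # hypothetical 5M row cap
) -> bool:
    """Mirror of the size/row-count checks, with debug-style skip reasons."""
    if size_limit is not None and size_in_bytes is not None and size_in_bytes > size_limit:
        print("skipped: size limit")
        return False
    if row_limit is not None and rows_count is not None and rows_count > row_limit:
        print("skipped: row limit")
        return False
    return True

print(eligible_for_profiling(size_in_bytes=10_000_000_000, rows_count=100))  # False (size limit)
print(eligible_for_profiling(size_in_bytes=1_000, rows_count=100))           # True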
