Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit 9614b1c

Browse files
committed
Add an option to use INFORMATION_SCHEMA for partition info retrieval
1 parent eb58c7d commit 9614b1c

File tree

9 files changed

+518
-181
lines changed

9 files changed

+518
-181
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Add an option to use INFORMATION_SCHEMA for partition info retrieval
3+
time: 2023-08-07T23:55:39.31409+02:00
4+
custom:
5+
Author: Kayrnt
6+
Issue: "867"

dbt/adapters/bigquery/connections.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -599,14 +599,11 @@ def _bq_job_link(location, project_id, job_id) -> str:
599599
return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"
600600

601601
def get_partitions_metadata(self, table):
602-
def standard_to_legacy(table):
603-
return table.project + ":" + table.dataset + "." + table.identifier
602+
query_sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'"
604603

605-
legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
606-
607-
sql = self._add_query_comment(legacy_sql)
604+
sql = self._add_query_comment(query_sql)
608605
# auto_begin is ignored on bigquery, and only included for consistency
609-
_, iterator = self.raw_execute(sql, use_legacy_sql=True)
606+
_, iterator = self.raw_execute(sql)
610607
return self.get_table_from_response(iterator)
611608

612609
def copy_bq_table(self, source, destination, write_disposition):

dbt/adapters/bigquery/impl.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class PartitionConfig(dbtClassMixin):
7878
range: Optional[Dict[str, Any]] = None
7979
time_ingestion_partitioning: bool = False
8080
copy_partitions: bool = False
81+
partition_information: str = "model"
8182

8283
PARTITION_DATE = "_PARTITIONDATE"
8384
PARTITION_TIME = "_PARTITIONTIME"

dbt/include/bigquery/macros/etc.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@
66
{% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
77
{% endmacro %}
88

9+
{#
10+
This macro returns the partition metadata for the provided table.
11+
The expected input is a table object (i.e. through a `source` or `ref`).
12+
The output contains the partition information for your input table.
13+
The details of the retrieved columns can be found at https://cloud.google.com/bigquery/docs/managing-partitioned-tables
14+
It leverages the INFORMATION_SCHEMA.PARTITIONS table.
15+
#}
916
{%- macro get_partitions_metadata(table) -%}
1017
{%- if execute -%}
1118
{%- set res = adapter.get_partitions_metadata(table) -%}

dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,68 @@
22

33
{#-- TODO: revisit partitioning with python models --#}
44
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
5+
{%- if partition_by.partition_information == "information_schema" -%}
6+
{{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }}
7+
{%- else -%}
8+
{{ dbt_max_partition_from_model_data_sql(relation, partition_by) }}
9+
{%- endif -%}
510

6-
declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
7-
select max({{ partition_by.field }}) from {{ this }}
11+
{%- endif -%}
12+
13+
{% endmacro %}
14+
15+
{% macro dbt_max_partition_from_model_data_sql(relation, partition_by) %}
16+
declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
17+
select max({{ partition_by.field }}) from {{ relation }}
818
where {{ partition_by.field }} is not null
9-
);
19+
);
20+
{% endmacro %}
21+
22+
{% macro max_partition_wrapper(field) %}
23+
MAX({{ field }}) AS max_partition
24+
{% endmacro %}
25+
26+
{% macro array_distinct_partition_wrapper(field) %}
27+
as struct
28+
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
29+
array_agg(distinct {{ field }} IGNORE NULLS)
30+
{% endmacro %}
31+
32+
{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %}
33+
declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
34+
{{ partition_from_information_schema_data_sql(relation, partition_by, max_partition_wrapper) }}
35+
);
36+
{% endmacro %}
37+
38+
{% macro partition_from_model_data_sql(relation, partition_by, field_function) %}
39+
select {{ field_function(partition_by.render_wrapped()) }}
40+
from {{ relation }}
41+
{% endmacro %}
42+
43+
{% macro partition_from_information_schema_data_sql(relation, partition_by, field_function) %}
1044

45+
{%- set data_type = partition_by.data_type -%}
46+
{%- set granularity = partition_by.granularity -%}
47+
48+
{# Format partition_id to match the declared variable type #}
49+
{%- if data_type | lower in ('date', 'timestamp', 'datetime') -%}
50+
{# Datetime using time partitioning require timestamp #}
51+
{%- if partition_by.time_ingestion_partitioning and partition_by.data_type == 'datetime' -%}
52+
{%- set data_type = 'timestamp' -%}
53+
{%- endif -%}
54+
{%- if granularity == "day" -%}
55+
{%- set format = "%Y%m%d" -%}
56+
{%- else -%}
57+
{%- set format = "%Y%m%d%H" -%}
58+
{%- endif -%}
59+
{%- set field = "parse_" ~ data_type ~ "('" ~ format ~ "', partition_id)" -%}
60+
{%- else -%}
61+
{%- set field = "CAST(partition_id AS INT64)" -%}
1162
{%- endif -%}
1263

64+
SELECT {{ field_function(field) }}
65+
FROM `{{relation.project}}.{{relation.dataset}}.INFORMATION_SCHEMA.PARTITIONS`
66+
WHERE TABLE_NAME = '{{relation.identifier}}'
67+
AND NOT(STARTS_WITH(partition_id, "__"))
68+
1369
{% endmacro %}

dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@
107107
{%- endcall %}
108108
{%- endif -%}
109109
{%- set partitions_sql -%}
110-
select distinct {{ partition_by.render_wrapped() }}
111-
from {{ tmp_relation }}
110+
{{ bq_dynamic_copy_partitions_affected_partitions_sql(tmp_relation, partition_by) }}
112111
{%- endset -%}
113112
{%- set partitions = run_query(partitions_sql).columns[0].values() -%}
114113
{# We copy the partitions #}
@@ -117,6 +116,19 @@
117116
drop table if exists {{ tmp_relation }}
118117
{% endmacro %}
119118

119+
{% macro distinct_partition_wrapper(field) %}
120+
distinct {{ field }} AS partition_ids
121+
{% endmacro %}
122+
123+
{% macro bq_dynamic_copy_partitions_affected_partitions_sql(tmp_relation, partition_by) %}
124+
{% if partition_by.partition_information == "information_schema" %}
125+
{{ partition_from_information_schema_data_sql(tmp_relation, partition_by, distinct_partition_wrapper) }}
126+
{% else %}
127+
select distinct {{ partition_by.render_wrapped() }}
128+
from {{ tmp_relation }}
129+
{% endif %}
130+
{% endmacro %}
131+
120132
{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
121133
{%- if copy_partitions is true %}
122134
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
@@ -149,10 +161,12 @@
149161

150162
-- 2. define partitions to update
151163
set (dbt_partitions_for_replacement) = (
152-
select as struct
153-
-- IGNORE NULLS: this needs to be aligned to _dbt_max_partition, which ignores null
154-
array_agg(distinct {{ partition_field }} IGNORE NULLS)
155-
from {{ tmp_relation }}
164+
{%- if partition_by.partition_information == "information_schema" -%}
165+
{{ partition_from_information_schema_data_sql(tmp_relation, partition_by, array_distinct_partition_wrapper) }}
166+
{%- else -%}
167+
{# TODO fix datetime case to render_wrapped with timestamp #}
168+
{{ partition_from_model_data_sql(tmp_relation, partition_by, array_distinct_partition_wrapper) }}
169+
{%- endif -%}
156170
);
157171

158172
-- 3. run the merge statement

0 commit comments

Comments
 (0)