-
Notifications
You must be signed in to change notification settings - Fork 176
feat(incremental): copy multiple tables in parallel (#1237) #1413
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| kind: Features | ||
| body: copy tables and partitions in parallel | ||
| time: 2024-11-26T00:02:41.54479+01:00 | ||
| custom: | ||
| Author: AxelThevenot | ||
| Issue: "1237" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -409,7 +409,7 @@ def _agate_to_schema( | |
| return bq_schema | ||
|
|
||
| @available.parse(lambda *a, **k: "") | ||
| def copy_table(self, source, destination, materialization): | ||
| def copy_table(self, source, destination, materialization, partition_ids=None): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. dbt internal convo: double-check that there are no issues with the base adapter (we don't think so, but we have to check). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @VersusFacit |
||
| if materialization == "incremental": | ||
| write_disposition = WRITE_APPEND | ||
| elif materialization == "table": | ||
|
|
@@ -421,7 +421,7 @@ def copy_table(self, source, destination, materialization): | |
| f"{materialization}" | ||
| ) | ||
|
|
||
| self.connections.copy_bq_table(source, destination, write_disposition) | ||
| self.connections.copy_bq_table(source, destination, write_disposition, partition_ids) | ||
|
|
||
| return "COPY TABLE with materialization: {}".format(materialization) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,23 +18,27 @@ | |
|
|
||
| {% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. internal dbt convo: we plan to leave the macro itself in place as the user interface (no breaking changes), but we figured this control flow can (and should) be pushed down into Python. |
||
|
|
||
| {% set partition_ids = [] %} | ||
|
|
||
| {% for partition in partitions %} | ||
| {% if partition_by.data_type == 'int64' %} | ||
| {% set partition = partition | as_text %} | ||
| {% elif partition_by.granularity == 'hour' %} | ||
| {% set partition = partition.strftime("%Y%m%d%H") %} | ||
| {% set partition = partition.strftime('%Y%m%d%H') %} | ||
| {% elif partition_by.granularity == 'day' %} | ||
| {% set partition = partition.strftime("%Y%m%d") %} | ||
| {% set partition = partition.strftime('%Y%m%d') %} | ||
| {% elif partition_by.granularity == 'month' %} | ||
| {% set partition = partition.strftime("%Y%m") %} | ||
| {% set partition = partition.strftime('%Y%m') %} | ||
| {% elif partition_by.granularity == 'year' %} | ||
| {% set partition = partition.strftime("%Y") %} | ||
| {% set partition = partition.strftime('%Y') %} | ||
| {% endif %} | ||
| {% set tmp_relation_partitioned = api.Relation.create(database=tmp_relation.database, schema=tmp_relation.schema, identifier=tmp_relation.table ~ '$' ~ partition, type=tmp_relation.type) %} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. note to self: partition FQNs are now handled in Python (see copy_bq_table). |
||
| {% set target_relation_partitioned = api.Relation.create(database=target_relation.database, schema=target_relation.schema, identifier=target_relation.table ~ '$' ~ partition, type=target_relation.type) %} | ||
| {% do adapter.copy_table(tmp_relation_partitioned, target_relation_partitioned, "table") %} | ||
|
|
||
| {% do partition_ids.append(partition) %} | ||
|
|
||
| {% endfor %} | ||
|
|
||
| {% do adapter.copy_table(tmp_relation, target_relation, 'table', partition_ids) %} | ||
|
|
||
| {% endmacro %} | ||
|
|
||
| {% macro bq_insert_overwrite_sql( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.