This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit b5336ae

feat(incremental): copy multiple tables in parallel (#1237)

1 parent 0995665 commit b5336ae

File tree: 4 files changed, +48 −23 lines
Changelog entry (new file)

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+kind: Features
+body: copy tables and partitions in parallel
+time: 2024-11-26T00:02:41.54479+01:00
+custom:
+  Author: AxelThevenot
+  Issue: "1237"

dbt/adapters/bigquery/connections.py

Lines changed: 29 additions & 14 deletions
@@ -402,17 +402,14 @@ def standard_to_legacy(table):
         _, iterator = self.raw_execute(sql, use_legacy_sql=True)
         return self.get_table_from_response(iterator)
 
-    def copy_bq_table(self, source, destination, write_disposition) -> None:
+    def copy_bq_table(self, source, destination, write_disposition, partition_ids=None) -> None:
         conn = self.get_thread_connection()
         client: Client = conn.handle
 
         # -------------------------------------------------------------------------------
-        # BigQuery allows to use copy API using two different formats:
-        # 1. client.copy_table(source_table_id, destination_table_id)
-        #    where source_table_id = "your-project.source_dataset.source_table"
-        # 2. client.copy_table(source_table_ids, destination_table_id)
-        #    where source_table_ids = ["your-project.your_dataset.your_table_name", ...]
-        # Let's use uniform function call and always pass list there
+        # BigQuery allows the copy API to run against the same table in parallel,
+        # so each source (and each partition of each source, if given) is copied
+        # into the destination table concurrently.
         # -------------------------------------------------------------------------------
         if type(source) is not list:
             source = [source]
@@ -436,14 +433,32 @@ def copy_bq_table(self, source, destination, write_disposition) -> None:
             ", ".join(source_ref.path for source_ref in source_ref_array),
             destination_ref.path,
         )
+
         with self.exception_handler(msg):
-            copy_job = client.copy_table(
-                source_ref_array,
-                destination_ref,
-                job_config=CopyJobConfig(write_disposition=write_disposition),
-                retry=self._retry.create_reopen_with_deadline(conn),
-            )
-            copy_job.result(timeout=self._retry.create_job_execution_timeout(fallback=300))
+
+            copy_jobs = []
+
+            # Runs all the copy jobs in parallel
+            for source_ref in source_ref_array:
+
+                for partition_id in partition_ids or [None]:
+                    source_ref_partition = (
+                        f"{source_ref}${partition_id}" if partition_id else source_ref
+                    )
+                    destination_ref_partition = (
+                        f"{destination_ref}${partition_id}" if partition_id else destination_ref
+                    )
+                    copy_job = client.copy_table(
+                        source_ref_partition,
+                        destination_ref_partition,
+                        job_config=CopyJobConfig(write_disposition=write_disposition),
+                        retry=self._retry.create_reopen_with_deadline(conn),
+                    )
+                    copy_jobs.append(copy_job)
+
+            # Waits for the jobs to finish
+            for copy_job in copy_jobs:
+                copy_job.result(timeout=self._retry.create_job_execution_timeout(fallback=300))
 
     def write_dataframe_to_table(
         self,
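
The parallelism comes from the fact that client.copy_table only submits an asynchronous CopyJob and returns immediately; .result() is what blocks. Submitting every job first and only then waiting lets BigQuery run all the copies concurrently. A minimal standalone sketch of that fan-out/fan-in pattern with the google-cloud-bigquery client, using hypothetical project, dataset, and table names:

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default credentials and project

    source = "my-project.my_dataset.tmp_table"            # hypothetical names
    destination = "my-project.my_dataset.target_table"
    partition_ids = ["20241124", "20241125", "20241126"]  # day-granularity ids

    # Fan out: each call submits an asynchronous copy job and returns at once.
    jobs = [
        client.copy_table(
            f"{source}${pid}",
            f"{destination}${pid}",
            job_config=bigquery.CopyJobConfig(
                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
            ),
        )
        for pid in partition_ids
    ]

    # Fan in: block until every job has finished (raises on the first failed job).
    for job in jobs:
        job.result(timeout=300)

Using the `table$20241126` partition decorator with WRITE_TRUNCATE overwrites only that partition, which is what the insert_overwrite strategy relies on.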

dbt/adapters/bigquery/impl.py

Lines changed: 2 additions & 2 deletions
@@ -409,7 +409,7 @@ def _agate_to_schema(
         return bq_schema
 
     @available.parse(lambda *a, **k: "")
-    def copy_table(self, source, destination, materialization):
+    def copy_table(self, source, destination, materialization, partition_ids=None):
         if materialization == "incremental":
             write_disposition = WRITE_APPEND
         elif materialization == "table":

@@ -421,7 +421,7 @@ def copy_table(self, source, destination, materialization):
                 f"{materialization}"
             )
 
-        self.connections.copy_bq_table(source, destination, write_disposition)
+        self.connections.copy_bq_table(source, destination, write_disposition, partition_ids)
 
         return "COPY TABLE with materialization: {}".format(materialization)
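
copy_table itself is only a thin dispatcher: it maps the materialization name to a BigQuery write disposition and now forwards partition_ids unchanged. A short sketch of that mapping; note the diff shows only the WRITE_APPEND branch, so the "table" value below is an assumption, and ValueError stands in for the adapter's own error:

    from google.cloud.bigquery import WriteDisposition

    # materialization name -> write disposition (the "table" entry is assumed)
    DISPOSITIONS = {
        "incremental": WriteDisposition.WRITE_APPEND,  # append to the destination
        "table": WriteDisposition.WRITE_TRUNCATE,      # assumed: overwrite it
    }

    def resolve_write_disposition(materialization: str) -> str:
        try:
            return DISPOSITIONS[materialization]
        except KeyError:
            # stand-in for the adapter's error for unsupported materializations
            raise ValueError(f"Copy table materialization: {materialization}")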

dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql

Lines changed: 11 additions & 7 deletions
@@ -18,23 +18,27 @@
 
 {% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
 
+  {% set partition_ids = [] %}
+
   {% for partition in partitions %}
     {% if partition_by.data_type == 'int64' %}
       {% set partition = partition | as_text %}
     {% elif partition_by.granularity == 'hour' %}
-      {% set partition = partition.strftime("%Y%m%d%H") %}
+      {% set partition = partition.strftime('%Y%m%d%H') %}
     {% elif partition_by.granularity == 'day' %}
-      {% set partition = partition.strftime("%Y%m%d") %}
+      {% set partition = partition.strftime('%Y%m%d') %}
     {% elif partition_by.granularity == 'month' %}
-      {% set partition = partition.strftime("%Y%m") %}
+      {% set partition = partition.strftime('%Y%m') %}
     {% elif partition_by.granularity == 'year' %}
-      {% set partition = partition.strftime("%Y") %}
+      {% set partition = partition.strftime('%Y') %}
     {% endif %}
-    {% set tmp_relation_partitioned = api.Relation.create(database=tmp_relation.database, schema=tmp_relation.schema, identifier=tmp_relation.table ~ '$' ~ partition, type=tmp_relation.type) %}
-    {% set target_relation_partitioned = api.Relation.create(database=target_relation.database, schema=target_relation.schema, identifier=target_relation.table ~ '$' ~ partition, type=target_relation.type) %}
-    {% do adapter.copy_table(tmp_relation_partitioned, target_relation_partitioned, "table") %}
+
+    {% do partition_ids.append(partition) %}
+
   {% endfor %}
 
+  {% do adapter.copy_table(tmp_relation, target_relation, 'table', partition_ids) %}
+
 {% endmacro %}
 
 {% macro bq_insert_overwrite_sql(
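
With the loop reduced to collecting ids, the macro makes a single adapter.copy_table call for the whole partition list, and the `$` decorator suffixes are built down in copy_bq_table. A minimal Python sketch of the same id formatting, mirroring the macro's granularity-to-strftime branches (names here are illustrative):

    from datetime import datetime

    # strftime pattern per partition granularity, as in the macro's branches
    GRANULARITY_FORMATS = {
        "hour": "%Y%m%d%H",
        "day": "%Y%m%d",
        "month": "%Y%m",
        "year": "%Y",
    }

    def partition_id(partition, granularity=None):
        """Render a partition value as the id used in a `table$<id>` decorator."""
        if isinstance(partition, int):  # int64 range partitioning
            return str(partition)
        return partition.strftime(GRANULARITY_FORMATS[granularity])

    assert partition_id(datetime(2024, 11, 26), "day") == "20241126"  # my_table$20241126
    assert partition_id(42) == "42"                                   # my_table$42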
