Skip to content
This repository was archived by the owner on Sep 2, 2025. It is now read-only.

Commit 3510f76

Browse files
nickozilladbeatty10colin-rogers-dbtmikealfare
authored
Swap dataproc batch_id declaration to model config (#804)
* swap batch_id declaration to model config * address changie req, fix python submission * Update bug that is being resolved * implement dbeatty's suggestion * Update .changes/unreleased/Fixes-20230630-092618.yaml Co-authored-by: Doug Beatty <[email protected]> * Add 2 tests --------- Co-authored-by: Doug Beatty <[email protected]> Co-authored-by: colin-rogers-dbt <[email protected]> Co-authored-by: Mike Alfare <[email protected]>
1 parent dc56130 commit 3510f76

File tree

3 files changed

+98
-0
lines changed

3 files changed

+98
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Change batch_id to model override
3+
time: 2023-06-30T09:26:18.854492+01:00
4+
custom:
5+
Author: nickozilla
6+
Issue: "671"

dbt/adapters/bigquery/python_submissions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,17 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
115115
client_options=self.client_options, credentials=self.GoogleCredentials
116116
)
117117

118+
def _get_batch_id(self) -> str:
119+
return self.parsed_model["config"].get("batch_id")
120+
118121
def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
119122
batch = self._configure_batch()
120123
parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
121124

122125
request = dataproc_v1.CreateBatchRequest(
123126
parent=parent,
124127
batch=batch,
128+
batch_id=self._get_batch_id(),
125129
)
126130
# make the request
127131
operation = self.job_client.create_batch(request=request) # type: ignore

tests/functional/adapter/test_python_model.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import pytest
3+
import time
34
from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
45
import dbt.tests.adapter.python_model.test_python_model as dbt_tests
56

@@ -64,6 +65,93 @@ def model(dbt, spark):
6465
return spark.createDataFrame(data, schema=['test1', 'test3'])
6566
"""
6667

68+
models__python_array_batch_id_python = """
69+
import pandas
70+
71+
def model(dbt, spark):
72+
random_array = [
73+
[9001.3985362160208, -157.9871329592354],
74+
[-817.8786101352823, -528.9769041860632],
75+
[-886.6488625065194, 941.0504221837489],
76+
[6.69525238666165, 919.5903586746183],
77+
[754.3718741592056, -121.25678519054622],
78+
[-352.3158889341157, 254.9985130814921],
79+
[563.0633042715097, 833.2963094260072],
80+
]
81+
82+
df = pd.DataFrame(random_array, columns=["A", "B"])
83+
84+
df["C"] = df["A"] * df["B"]
85+
86+
final_df = df[["A", "B", "C"]]
87+
88+
return final_df
89+
"""
90+
91+
models__python_array_batch_id_yaml = """
92+
models:
93+
- name: python_array_batch_id
94+
description: A random table with a calculated column defined in python.
95+
config:
96+
batch_id: '{{ run_started_at.strftime("%Y-%m-%d-%H-%M-%S") }}-python-array'
97+
columns:
98+
- name: A
99+
description: Column A
100+
- name: B
101+
description: Column B
102+
- name: C
103+
description: Column C
104+
"""
105+
106+
custom_ts_id = str("custom-" + str(time.time()).replace(".", "-"))
107+
108+
models__bad_python_array_batch_id_yaml = f"""
109+
models:
110+
- name: python_array_batch_id
111+
description: A random table with a calculated column defined in python.
112+
config:
113+
batch_id: {custom_ts_id}-python-array
114+
columns:
115+
- name: A
116+
description: Column A
117+
- name: B
118+
description: Column B
119+
- name: C
120+
description: Column C
121+
"""
122+
123+
124+
class TestPythonBatchIdModels:
125+
@pytest.fixture(scope="class")
126+
def models(self):
127+
return {
128+
"python_array_batch_id.py": models__python_array_batch_id_python,
129+
"python_array_batch_id.yml": models__python_array_batch_id_yaml,
130+
}
131+
132+
def test_multiple_named_python_models(self, project):
133+
result, output = run_dbt_and_capture(["run"], expect_pass=True)
134+
time.sleep(5) # In case both runs are submitted simultaneously
135+
result_two, output_two = run_dbt_and_capture(["run"], expect_pass=True)
136+
assert len(result) == 1
137+
assert len(result_two) == 1
138+
139+
140+
class TestPythonDuplicateBatchIdModels:
141+
@pytest.fixture(scope="class")
142+
def models(self):
143+
return {
144+
"python_array_batch_id.py": models__python_array_batch_id_python,
145+
"python_array_batch_id.yml": models__bad_python_array_batch_id_yaml,
146+
}
147+
148+
def test_multiple_python_models_fixed_id(self, project):
149+
result, output = run_dbt_and_capture(["run"], expect_pass=True)
150+
result_two, output_two = run_dbt_and_capture(["run"], expect_pass=False)
151+
assert result_two[0].message.startswith("409 Already exists: Failed to create batch:")
152+
assert len(result) == 1
153+
assert len(result_two) == 1
154+
67155

68156
@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
69157
class TestChangingSchemaDataproc:

0 commit comments

Comments
 (0)