Skip to content

Commit 1e86f83

Browse files
committed
Add depend strategy for reflections
1 parent eaf6ae8 commit 1e86f83

File tree

7 files changed

+98
-12
lines changed

7 files changed

+98
-12
lines changed

dbt/adapters/dremio/api/rest/client.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def delete_tags(self, dataset_id: str, version: str):
207207

208208

209209
def get_reflections(self, dataset_id):
210-
url = UrlBuilder.get_reflection_url(self._parameters, dataset_id)
210+
url = UrlBuilder.get_reflections_from_dataset_url(self._parameters, dataset_id)
211211
return _get(
212212
url,
213213
self._parameters.authentication.get_headers(),
@@ -231,4 +231,11 @@ def update_reflection(self, reflection_id, payload):
231231
json=payload,
232232
ssl_verify=self._parameters.authentication.verify_ssl,
233233
)
234-
234+
235+
def get_reflection(self, reflection_id):
236+
url = UrlBuilder.get_reflection_url(self._parameters, reflection_id)
237+
return _get(
238+
url,
239+
self._parameters.authentication.get_headers(),
240+
ssl_verify=self._parameters.authentication.verify_ssl,
241+
)

dbt/adapters/dremio/api/rest/url_builder.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def update_reflection_url(cls, parameters: Parameters, dataset_id):
185185
return url_path + endpoint
186186

187187
@classmethod
188-
def get_reflection_url(cls, parameters: Parameters, dataset_id):
188+
def get_reflections_from_dataset_url(cls, parameters: Parameters, dataset_id):
189189
url_path = parameters.base_url
190190
if type(parameters) is CloudParameters:
191191
url_path += UrlBuilder.CLOUD_DATASET_ENDPOINT.format(
@@ -196,3 +196,16 @@ def get_reflection_url(cls, parameters: Parameters, dataset_id):
196196

197197
endpoint = "/{}/reflection".format(dataset_id)
198198
return url_path + endpoint
199+
200+
@classmethod
201+
def get_reflection_url(cls, parameters: Parameters, reflection_id):
202+
url_path = parameters.base_url
203+
if type(parameters) is CloudParameters:
204+
url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format(
205+
parameters.cloud_project_id
206+
)
207+
else:
208+
url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT
209+
210+
endpoint = "/{}".format(reflection_id)
211+
return url_path + endpoint

dbt/adapters/dremio/connections.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelat
326326
date_dimensions: List[str], measures: List[str],
327327
computations: List[str], partition_by: List[str], partition_transform: List[str],
328328
partition_method: str, distribute_by: List[str], localsort_by: List[str],
329-
arrow_cache: bool) -> None:
329+
arrow_cache: bool, reflection_strategy: str, max_wait_time: int, check_interval: int) -> None:
330330
thread_connection = self.get_thread_connection()
331331
connection = self.open(thread_connection)
332332
rest_client = connection.handle.get_client()
@@ -357,13 +357,41 @@ def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelat
357357
if reflection.get("name") == name:
358358
logger.debug(f"Reflection {name} already exists. Updating it")
359359
payload["tag"] = reflection.get("tag")
360-
rest_client.update_reflection(reflection.get("id"), payload)
360+
created_reflection = rest_client.update_reflection(reflection.get("id"), payload)
361361
updated = True
362362
break
363363

364364
if not updated:
365365
logger.debug(f"Reflection {name} does not exist. Creating it")
366-
rest_client.create_reflection(payload)
366+
created_reflection = rest_client.create_reflection(payload)
367+
368+
if reflection_strategy == "wait":
369+
reflection_id = created_reflection["id"]
370+
start_time = time.time()
371+
end_time = start_time + max_wait_time
372+
373+
while time.time() < end_time:
374+
reflection_info = rest_client.get_reflection(reflection_id)
375+
status = reflection_info["status"]["availability"]
376+
377+
if status == "AVAILABLE":
378+
return
379+
380+
time.sleep(check_interval)
381+
382+
logger.info(f"Reflection {name} did not become available within {max_wait_time} seconds, skipping wait")
383+
384+
elif reflection_strategy == "depend":
385+
reflection_id = created_reflection["id"]
386+
387+
while True:
388+
reflection_info = rest_client.get_reflection(reflection_id)
389+
status = reflection_info["status"]["availability"]
390+
391+
if status == "AVAILABLE":
392+
return
393+
394+
time.sleep(check_interval)
367395

368396
def _make_new_space_json(self, name) -> json:
369397
python_dict = {"entityType": "space", "name": name}

dbt/adapters/dremio/impl.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,10 @@ def process_tags(self, relation: DremioRelation, tags: list[str]) -> None:
207207
def create_reflection(self, name: str, type: str, anchor: DremioRelation, display: List[str], dimensions: List[str],
208208
date_dimensions: List[str], measures: List[str], computations: List[str],
209209
partition_by: List[str], partition_transform: List[str], partition_method: str,
210-
distribute_by: List[str], localsort_by: List[str], arrow_cache: bool) -> None:
210+
distribute_by: List[str], localsort_by: List[str], arrow_cache: bool, reflection_strategy: str, max_wait_time: int, check_interval: int) -> None:
211211
self.connections.create_reflection(name, type, anchor, display, dimensions, date_dimensions, measures,
212212
computations, partition_by, partition_transform, partition_method,
213-
distribute_by, localsort_by, arrow_cache)
213+
distribute_by, localsort_by, arrow_cache, reflection_strategy, max_wait_time, check_interval)
214214

215215

216216
COLUMNS_EQUAL_SQL = """

dbt/include/dremio/macros/materializations/reflection/create_reflection.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ USING target
3838
#}
3939

4040
{%- macro create_reflection(reflection_name, reflection_type, anchor,
41-
display=none, dimensions=none, date_dimensions=none, measures=none, computations=none, partition_by=none, partition_transform=none, partition_method=none, distribute_by=none, localsort_by=none, arrow_cache=false) %}
41+
display=none, dimensions=none, date_dimensions=none, measures=none, computations=none, partition_by=none, partition_transform=none, partition_method=none, distribute_by=none, localsort_by=none, arrow_cache=false, reflection_strategy=reflection_strategy, max_wait_time=max_wait_time, check_interval=check_interval) %}
4242

4343
{%- if reflection_type == 'raw' %}
4444
{% set reflection_type = 'RAW' %}
@@ -48,7 +48,7 @@ USING target
4848
{% do exceptions.CompilationError("invalid reflection type") %}
4949
{%- endif -%}
5050

51-
{% do adapter.create_reflection(reflection_name, reflection_type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache) %}
51+
{% do adapter.create_reflection(reflection_name, reflection_type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache, reflection_strategy, max_wait_time, check_interval) %}
5252

5353
SELECT 1
5454
{% endmacro -%}

dbt/include/dremio/macros/materializations/reflection/reflection.sql

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ See the License for the specific language governing permissions and
1313
limitations under the License.*/
1414

1515
{% materialization reflection, adapter='dremio' %}
16+
{%- set reflection_strategy = config.get('reflection_strategy', validator=validation.any[basestring]) or 'trigger' -%}
17+
18+
{% if reflection_strategy not in ['depend', 'trigger'] %}
19+
{% do exceptions.CompilationError("Invalid reflection strategy. Valid strategies are 'trigger', 'wait' or 'depend'") %}
20+
{%- endif -%}
1621

1722
{% set reflection_name = config.get('name', validator=validation.any[basetring]) or config.get('alias', validator=validation.any[basetring]) or model.name %}
1823
{% set raw_reflection_type = config.get('reflection_type', validator=validation.any[basestring]) or 'raw' %}
@@ -30,6 +35,8 @@ limitations under the License.*/
3035
{%- set distribute_by = config.get('distribute_by', validator=validation.any[basestring])-%}
3136
{%- set localsort_by = config.get('localsort_by', validator=validation.any[basestring]) -%}
3237
{%- set arrow_cache = config.get('arrow_cache') -%}
38+
{%- set max_wait_time = config.get('max_wait_time', validator=validation.any[int]) or 30 -%}
39+
{%- set check_interval = config.get('check_interval', validator=validation.any[int]) or 5 -%}
3340

3441
{% set relation = this %}
3542

@@ -115,7 +122,7 @@ limitations under the License.*/
115122
-- build model
116123
{% call statement('main') -%}
117124
{{ create_reflection(reflection_name, reflection_type, anchor,
118-
display=display, dimensions=dimensions, date_dimensions=date_dimensions, measures=measures, computations=computations, partition_by=partition_by, partition_transform=partition_transform, partition_method=partition_method, distribute_by=distribute_by, localsort_by=localsort_by, arrow_cache=arrow_cache) }}
125+
display=display, dimensions=dimensions, date_dimensions=date_dimensions, measures=measures, computations=computations, partition_by=partition_by, partition_transform=partition_transform, partition_method=partition_method, distribute_by=distribute_by, localsort_by=localsort_by, arrow_cache=arrow_cache, reflection_strategy=reflection_strategy, max_wait_time=max_wait_time, check_interval=check_interval) }}
119126
{%- endcall %}
120127

121128
{{ run_hooks(post_hooks) }}

tests/functional/adapter/dremio_specific/test_reflections.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
from dbt.adapters.dremio import DremioCredentials
44
from dbt.adapters.dremio.api.parameters import ParametersBuilder
55
from dbt.adapters.dremio.api.authentication import DremioAuthentication
6-
from dbt.tests.util import run_dbt
6+
from dbt.tests.util import run_dbt, run_dbt_and_capture
77
from pydantic.experimental.pipeline import transform
88

9+
from tests.utils.util import BUCKET
10+
911
view1_model = """
1012
SELECT IncidntNum, Category, Descript, DayOfWeek, TO_DATE("SF_incidents2016.json"."Date", 'YYYY-MM-DD', 1) AS "Date", "SF_incidents2016.json"."Time" AS "Time", PdDistrict, Resolution, Address, X, Y, Location, PdId
1113
FROM Samples."samples.dremio.com"."SF_incidents2016.json" AS "SF_incidents2016.json";
@@ -141,6 +143,25 @@
141143
-- depends_on: {{ ref('view1') }}
142144
"""
143145

146+
depend_strategy_timeout_reflection = """
147+
{{ config(alias='This will mock a timeout when using depend',
148+
materialized='reflection',
149+
reflection_strategy='depend',
150+
max_wait_time=1,
151+
display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'],
152+
reflection_type='raw')}}
153+
-- depends_on: {{ ref('view1') }}
154+
"""
155+
156+
trigger_strategy_timeout_reflection = """
157+
{{ config(alias='This will mock a timeout when using trigger',
158+
materialized='reflection',
159+
reflection_strategy='trigger',
160+
max_wait_time=1,
161+
display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'],
162+
reflection_type='raw')}}
163+
-- depends_on: {{ ref('view1') }}
164+
"""
144165

145166
class TestReflectionsDremio:
146167
@pytest.fixture(scope="class")
@@ -170,6 +191,8 @@ def models(self):
170191
"default_displays_model.sql": default_displays_model,
171192
"name_reflection_from_alias.sql": name_reflection_from_alias_model,
172193
"name_reflection_from_filename.sql": name_reflection_from_filename_model,
194+
"depend_strategy_timeout_reflection.sql": depend_strategy_timeout_reflection,
195+
"trigger_strategy_timeout_reflection.sql": trigger_strategy_timeout_reflection,
173196
}
174197

175198
def _create_path_list(self, database, schema):
@@ -407,3 +430,11 @@ def testNameReflectionFromFilename(self, project, client):
407430
assert "partitionFields" not in reflection
408431
assert "sortFields" not in reflection
409432
assert reflection["partitionDistributionStrategy"] == "STRIPED"
433+
434+
def testDependStrategyTimeoutReflection(self, project):
435+
(results, log_output) = run_dbt_and_capture(["run", "--select", "view1", "depend_strategy_timeout_reflection"])
436+
assert "did not become available within 1 seconds, skipping wait" in log_output
437+
438+
def testTriggerStrategyTimeoutReflection(self, project):
439+
(results, log_output) = run_dbt_and_capture(["run", "--select", "view1", "trigger_strategy_timeout_reflection"])
440+
assert "did not become available within 1 seconds, skipping wait" not in log_output

0 commit comments

Comments
 (0)