Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dbt/adapters/dremio/api/rest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def delete_tags(self, dataset_id: str, version: str):


def get_reflections(self, dataset_id):
url = UrlBuilder.get_reflection_url(self._parameters, dataset_id)
url = UrlBuilder.get_reflections_from_dataset_url(self._parameters, dataset_id)
return _get(
url,
self._parameters.authentication.get_headers(),
Expand All @@ -231,4 +231,11 @@ def update_reflection(self, reflection_id, payload):
json=payload,
ssl_verify=self._parameters.authentication.verify_ssl,
)


def get_reflection(self, reflection_id):
    """Fetch a single reflection by its id.

    Builds the reflection URL for ``reflection_id`` from the client's
    parameters and issues the request through ``_get`` using the
    authentication headers and SSL verification setting held by the
    client. Returns whatever ``_get`` returns.
    """
    reflection_url = UrlBuilder.get_reflection_url(self._parameters, reflection_id)
    auth = self._parameters.authentication
    request_headers = auth.get_headers()
    return _get(reflection_url, request_headers, ssl_verify=auth.verify_ssl)
15 changes: 14 additions & 1 deletion dbt/adapters/dremio/api/rest/url_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def update_reflection_url(cls, parameters: Parameters, dataset_id):
return url_path + endpoint

@classmethod
def get_reflection_url(cls, parameters: Parameters, dataset_id):
def get_reflections_from_dataset_url(cls, parameters: Parameters, dataset_id):
url_path = parameters.base_url
if type(parameters) is CloudParameters:
url_path += UrlBuilder.CLOUD_DATASET_ENDPOINT.format(
Expand All @@ -196,3 +196,16 @@ def get_reflection_url(cls, parameters: Parameters, dataset_id):

endpoint = "/{}/reflection".format(dataset_id)
return url_path + endpoint

@classmethod
def get_reflection_url(cls, parameters: Parameters, reflection_id):
    """Return the URL addressing one reflection, identified by ``reflection_id``.

    The URL is ``parameters.base_url`` followed by the reflections
    endpoint and ``/<reflection_id>``. For ``CloudParameters`` the cloud
    endpoint template is filled in with ``parameters.cloud_project_id``;
    any other parameters type uses the software endpoint.
    """
    base = parameters.base_url
    if type(parameters) is CloudParameters:
        reflections_path = UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format(
            parameters.cloud_project_id
        )
    else:
        reflections_path = UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT

    return base + reflections_path + "/{}".format(reflection_id)
34 changes: 31 additions & 3 deletions dbt/adapters/dremio/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelat
date_dimensions: List[str], measures: List[str],
computations: List[str], partition_by: List[str], partition_transform: List[str],
partition_method: str, distribute_by: List[str], localsort_by: List[str],
arrow_cache: bool) -> None:
arrow_cache: bool, reflection_strategy: str, max_wait_time: int, check_interval: int) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the connector check for constraints on the values provided here? For instance, a negative or 0 value of check_interval might lead to errors or accidental rate limiting by Dremio. (See DC rate limits here)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be adding a > 0 constraint to this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we define the default values here or in the template? What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are being defined in the reflection materialization, reflection.sql has the following lines:

 {%- set max_wait_time = config.get('max_wait_time', validator=validation.any[int]) or 30 -%}
 {%- set check_interval = config.get('check_interval', validator=validation.any[int]) or 5 -%}

This is on par with how we do it in other situations

thread_connection = self.get_thread_connection()
connection = self.open(thread_connection)
rest_client = connection.handle.get_client()
Expand Down Expand Up @@ -357,13 +357,41 @@ def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelat
if reflection.get("name") == name:
logger.debug(f"Reflection {name} already exists. Updating it")
payload["tag"] = reflection.get("tag")
rest_client.update_reflection(reflection.get("id"), payload)
created_reflection = rest_client.update_reflection(reflection.get("id"), payload)
updated = True
break

if not updated:
logger.debug(f"Reflection {name} does not exist. Creating it")
rest_client.create_reflection(payload)
created_reflection = rest_client.create_reflection(payload)

if reflection_strategy == "wait":
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use match pattern here. And put all possible values somewhere in enum

Updated. I saw a validation in the Jinja, let's add match here at least. If/elif seems like a bad design pattern

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly sure if I'm seeing what you're suggesting here?

reflection_id = created_reflection["id"]
start_time = time.time()
end_time = start_time + max_wait_time

while time.time() < end_time:
reflection_info = rest_client.get_reflection(reflection_id)
status = reflection_info["status"]["availability"]

if status == "AVAILABLE":
return

time.sleep(check_interval)

logger.info(f"Reflection {name} did not become available within {max_wait_time} seconds, skipping wait")

elif reflection_strategy == "depend":
reflection_id = created_reflection["id"]

while True:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we in this case give some state of the reflection - how much time left? What is the status? Is everything ok?
Because while true without any status update will be painful for customers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could show the status -> combinedStatus that is given by the API response to our call. Docs here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could ask the question to our reflections team about it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reached out to them 👍

reflection_info = rest_client.get_reflection(reflection_id)
status = reflection_info["status"]["availability"]

if status == "AVAILABLE":
return

time.sleep(check_interval)

def _make_new_space_json(self, name) -> json:
python_dict = {"entityType": "space", "name": name}
Expand Down
4 changes: 2 additions & 2 deletions dbt/adapters/dremio/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ def process_tags(self, relation: DremioRelation, tags: list[str]) -> None:
def create_reflection(self, name: str, type: str, anchor: DremioRelation, display: List[str], dimensions: List[str],
date_dimensions: List[str], measures: List[str], computations: List[str],
partition_by: List[str], partition_transform: List[str], partition_method: str,
distribute_by: List[str], localsort_by: List[str], arrow_cache: bool) -> None:
distribute_by: List[str], localsort_by: List[str], arrow_cache: bool, reflection_strategy: str, max_wait_time: int, check_interval: int) -> None:
self.connections.create_reflection(name, type, anchor, display, dimensions, date_dimensions, measures,
computations, partition_by, partition_transform, partition_method,
distribute_by, localsort_by, arrow_cache)
distribute_by, localsort_by, arrow_cache, reflection_strategy, max_wait_time, check_interval)


COLUMNS_EQUAL_SQL = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ USING target
#}

{%- macro create_reflection(reflection_name, reflection_type, anchor,
display=none, dimensions=none, date_dimensions=none, measures=none, computations=none, partition_by=none, partition_transform=none, partition_method=none, distribute_by=none, localsort_by=none, arrow_cache=false) %}
display=none, dimensions=none, date_dimensions=none, measures=none, computations=none, partition_by=none, partition_transform=none, partition_method=none, distribute_by=none, localsort_by=none, arrow_cache=false, reflection_strategy=reflection_strategy, max_wait_time=max_wait_time, check_interval=check_interval) %}

{%- if reflection_type == 'raw' %}
{% set reflection_type = 'RAW' %}
Expand All @@ -48,7 +48,7 @@ USING target
{% do exceptions.CompilationError("invalid reflection type") %}
{%- endif -%}

{% do adapter.create_reflection(reflection_name, reflection_type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache) %}
{% do adapter.create_reflection(reflection_name, reflection_type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache, reflection_strategy, max_wait_time, check_interval) %}

SELECT 1
{% endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ See the License for the specific language governing permissions and
limitations under the License.*/

{% materialization reflection, adapter='dremio' %}
{%- set reflection_strategy = config.get('reflection_strategy', validator=validation.any[basestring]) or 'trigger' -%}

{% if reflection_strategy not in ['trigger', 'wait', 'depend'] %}
{% do exceptions.CompilationError("Invalid reflection strategy. Valid strategies are 'trigger', 'wait' or 'depend'") %}
{%- endif -%}

{% set reflection_name = config.get('name', validator=validation.any[basetring]) or config.get('alias', validator=validation.any[basetring]) or model.name %}
{% set raw_reflection_type = config.get('reflection_type', validator=validation.any[basestring]) or 'raw' %}
Expand All @@ -30,6 +35,8 @@ limitations under the License.*/
{%- set distribute_by = config.get('distribute_by', validator=validation.any[basestring])-%}
{%- set localsort_by = config.get('localsort_by', validator=validation.any[basestring]) -%}
{%- set arrow_cache = config.get('arrow_cache') -%}
{%- set max_wait_time = config.get('max_wait_time', validator=validation.any[int]) or 30 -%}
{%- set check_interval = config.get('check_interval', validator=validation.any[int]) or 5 -%}

{% set relation = this %}

Expand Down Expand Up @@ -115,7 +122,7 @@ limitations under the License.*/
-- build model
{% call statement('main') -%}
{{ create_reflection(reflection_name, reflection_type, anchor,
display=display, dimensions=dimensions, date_dimensions=date_dimensions, measures=measures, computations=computations, partition_by=partition_by, partition_transform=partition_transform, partition_method=partition_method, distribute_by=distribute_by, localsort_by=localsort_by, arrow_cache=arrow_cache) }}
display=display, dimensions=dimensions, date_dimensions=date_dimensions, measures=measures, computations=computations, partition_by=partition_by, partition_transform=partition_transform, partition_method=partition_method, distribute_by=distribute_by, localsort_by=localsort_by, arrow_cache=arrow_cache, reflection_strategy=reflection_strategy, max_wait_time=max_wait_time, check_interval=check_interval) }}
{%- endcall %}

{{ run_hooks(post_hooks) }}
Expand Down
32 changes: 31 additions & 1 deletion tests/functional/adapter/dremio_specific/test_reflections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from dbt.adapters.dremio import DremioCredentials
from dbt.adapters.dremio.api.parameters import ParametersBuilder
from dbt.adapters.dremio.api.authentication import DremioAuthentication
from dbt.tests.util import run_dbt
from dbt.tests.util import run_dbt, run_dbt_and_capture
from pydantic.experimental.pipeline import transform


view1_model = """
SELECT IncidntNum, Category, Descript, DayOfWeek, TO_DATE("SF_incidents2016.json"."Date", 'YYYY-MM-DD', 1) AS "Date", "SF_incidents2016.json"."Time" AS "Time", PdDistrict, Resolution, Address, X, Y, Location, PdId
FROM Samples."samples.dremio.com"."SF_incidents2016.json" AS "SF_incidents2016.json";
Expand Down Expand Up @@ -141,6 +142,25 @@
-- depends_on: {{ ref('view1') }}
"""

# Reflection model configured with reflection_strategy='wait' and a
# max_wait_time of 1 second; selected by testWaitStrategyTimeoutReflection,
# which expects the "did not become available" log line.
wait_strategy_timeout_reflection = """
{{ config(alias='This will mock a timeout when using wait',
materialized='reflection',
reflection_strategy='wait',
max_wait_time=1,
display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'],
reflection_type='raw')}}
-- depends_on: {{ ref('view1') }}
"""

# Same configuration but with reflection_strategy='trigger'; selected by
# testTriggerStrategyTimeoutReflection, which expects that the timeout
# log line is NOT emitted.
trigger_strategy_timeout_reflection = """
{{ config(alias='This will mock a timeout when using trigger',
materialized='reflection',
reflection_strategy='trigger',
max_wait_time=1,
display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'],
reflection_type='raw')}}
-- depends_on: {{ ref('view1') }}
"""

class TestReflectionsDremio:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -170,6 +190,8 @@ def models(self):
"default_displays_model.sql": default_displays_model,
"name_reflection_from_alias.sql": name_reflection_from_alias_model,
"name_reflection_from_filename.sql": name_reflection_from_filename_model,
"wait_strategy_timeout_reflection.sql": wait_strategy_timeout_reflection,
"trigger_strategy_timeout_reflection.sql": trigger_strategy_timeout_reflection,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize testing depend is not that easy, but there must be a way. Is it possible to create a reflection that will never finish materializing and then try to depend on it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of depend is that it will be stuck at the job running that reflection until it is complete. Even if we could make a reflection that would never materialize we'd just be stuck waiting for it forever

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea would've been to assert that the function hasn't returned after a few seconds and then kill the thread. But I don't know how complex we want this to get

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with Simon that we should have guarantees that it will end. Waiting without a proof for a green result may be frustrating for customers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be the wait strategy where the customer would set the maximum time they'd want to wait. Should we remove depend in that case? Or have it act as wait but throw an error instead of just skipping?

}

def _create_path_list(self, database, schema):
Expand Down Expand Up @@ -407,3 +429,11 @@ def testNameReflectionFromFilename(self, project, client):
assert "partitionFields" not in reflection
assert "sortFields" not in reflection
assert reflection["partitionDistributionStrategy"] == "STRIPED"

def testWaitStrategyTimeoutReflection(self, project):
    """Run the 'wait' strategy model and expect the timeout message in the log.

    The selected model sets max_wait_time=1, so the captured dbt output
    is expected to contain the "skipping wait" line.
    """
    dbt_args = ["run", "--select", "view1", "wait_strategy_timeout_reflection"]
    _results, captured_log = run_dbt_and_capture(dbt_args)
    expected_message = "did not become available within 1 seconds, skipping wait"
    assert expected_message in captured_log
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a positive test case for the wait strategy as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could but I was avoiding adding it as a reflection going through is already tested by all the other reflection tests + it has the potential to become flaky if it takes longer than expected to be materialized

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without proper mocks, these tests could all eventually become flaky


def testTriggerStrategyTimeoutReflection(self, project):
    """Run the 'trigger' strategy model and expect no timeout message in the log.

    The captured dbt output must not contain the "skipping wait" line
    that the 'wait' strategy logs on timeout.
    """
    dbt_args = ["run", "--select", "view1", "trigger_strategy_timeout_reflection"]
    _results, captured_log = run_dbt_and_capture(dbt_args)
    timeout_message = "did not become available within 1 seconds, skipping wait"
    assert timeout_message not in captured_log