Ele 4724 dbt fusion support #1961
Walkthrough
Introduces DbtFusionRunner and a RunnerMethod-based dynamic runner selection; makes the dbt executable path configurable; expands command results to include stderr and logs both stdout and stderr; updates the subprocess runner accordingly; replaces certain macro .call_macro() usages with direct calls and consolidates table-creation macros.
Sequence Diagram(s)
sequenceDiagram
participant User
participant Factory
participant RunnerClass
participant DbtExecutable
User->>Factory: create_dbt_runner(runner_method?)
Factory->>Factory: get_dbt_runner_method()
Factory->>Factory: get_dbt_runner_class(method)
Factory->>RunnerClass: instantiate selected runner
User->>RunnerClass: run_operation(command, capture_output)
RunnerClass->>DbtExecutable: execute (path from _get_dbt_command_name)
DbtExecutable-->>RunnerClass: stdout (output), stderr
RunnerClass->>RunnerClass: parse stdout -> results
alt stderr present & log_errors
RunnerClass->>RunnerClass: log stderr (error-level)
end
RunnerClass-->>User: return DbtCommandResult (stdout/output, stderr, success, result_obj)
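The flow in the diagram can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the PR's code: `CommandResult` and `run_command` are hypothetical stand-ins for `DbtCommandResult` and the runner's execution method, and the Python interpreter stands in for the dbt executable.

```python
import logging
import subprocess
import sys
from dataclasses import dataclass
from typing import List, Optional

logger = logging.getLogger("runner_sketch")


@dataclass
class CommandResult:
    # Hypothetical stand-in for DbtCommandResult: both streams plus a success flag.
    success: bool
    output: Optional[str]
    stderr: Optional[str]


def run_command(args: List[str], log_errors: bool = True) -> CommandResult:
    # Capture stdout and stderr separately so callers can inspect each stream.
    completed = subprocess.run(args, capture_output=True, text=True)
    stderr = completed.stderr or None
    if stderr and log_errors:
        # Mirrors the diagram's "alt" branch: stderr is logged at error level.
        logger.error(stderr.strip())
    return CommandResult(
        success=completed.returncode == 0,
        output=completed.stdout or None,
        stderr=stderr,
    )


if __name__ == "__main__":
    child = "import sys; print('ok'); sys.stderr.write('warn')"
    print(run_command([sys.executable, "-c", child]))
```

Keeping stderr as its own field, rather than merging it into the output, lets the caller parse stdout as structured logs without stray diagnostics in the stream.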
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (1)
elementary/clients/dbt/factory.py (1)
55-56: Fix Yoda condition for better readability.
Apply this diff to improve readability:
- if DBT_VERSION >= version.Version("1.5.0"):
+ if version.Version("1.5.0") <= DBT_VERSION:
      return RunnerMethod.API
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- elementary/clients/dbt/command_line_dbt_runner.py (3 hunks)
- elementary/clients/dbt/dbt_fusion_runner.py (1 hunks)
- elementary/clients/dbt/factory.py (3 hunks)
- elementary/clients/dbt/subprocess_dbt_runner.py (2 hunks)
- tests/tests_with_db/dbt_project/macros/materializations.sql (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
elementary/clients/dbt/dbt_fusion_runner.py (1)
- elementary/clients/dbt/subprocess_dbt_runner.py (2): SubprocessDbtRunner (18-80), _get_dbt_command_name (54-55)
elementary/clients/dbt/subprocess_dbt_runner.py (2)
- elementary/clients/dbt/dbt_fusion_runner.py (1): _get_dbt_command_name (7-8)
- elementary/clients/dbt/command_line_dbt_runner.py (2): run (217-242), DbtCommandResult (25-28)
elementary/clients/dbt/factory.py (4)
- elementary/clients/dbt/command_line_dbt_runner.py (1): CommandLineDbtRunner (31-331)
- elementary/clients/dbt/dbt_fusion_runner.py (1): DbtFusionRunner (6-8)
- elementary/clients/dbt/subprocess_dbt_runner.py (1): SubprocessDbtRunner (18-80)
- elementary/clients/dbt/api_dbt_runner.py (1): APIDbtRunner (26-66)
elementary/clients/dbt/command_line_dbt_runner.py (1)
- elementary/clients/dbt/dbt_log.py (1): parse_dbt_output (32-40)
🪛 Ruff (0.12.2)
elementary/clients/dbt/factory.py
55-55: Yoda condition detected
Rewrite as version.Version("1.5.0") <= DBT_VERSION
(SIM300)
🔇 Additional comments (4)
tests/tests_with_db/dbt_project/macros/materializations.sql (1)
3-16: Good simplification of macro invocations.
The removal of .call_macro() method calls makes the code more readable and idiomatic. This is a clean refactor that maintains the same functionality.
elementary/clients/dbt/subprocess_dbt_runner.py (1)
30-56: Good improvements for extensibility and error handling.
The introduction of the _get_dbt_command_name() method and stderr capture are valuable improvements:
- Enables subclasses like DbtFusionRunner to customize the dbt executable path
- Capturing stderr provides better error diagnostics
elementary/clients/dbt/command_line_dbt_runner.py (1)
28-28: Good enhancement of error handling with stderr support.
The addition of stderr support to DbtCommandResult and the separate handling of stdout/stderr streams improves error diagnostics while maintaining proper separation of concerns.
Also applies to: 197-213
elementary/clients/dbt/factory.py (1)
15-71: Excellent refactoring of the runner selection logic.
The introduction of the RunnerMethod enum and the separation of concerns into get_dbt_runner_method() and get_dbt_runner_class() functions improves:
- Code modularity and testability
- Extensibility for adding new runner types
- Type safety with the enum
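The enum-plus-lookup split praised above can be sketched as follows. This is an illustration only: the enum string values, the `SUBPROCESS` member name, and the placeholder runner classes are assumptions, since the review shows just the names `RunnerMethod.API` and `RunnerMethod.FUSION`.

```python
from enum import Enum
from typing import Dict


class RunnerMethod(Enum):
    # String values are assumed; only the names API and FUSION appear in the review.
    SUBPROCESS = "subprocess"
    API = "api"
    FUSION = "fusion"


class SubprocessRunner:
    # Placeholder for SubprocessDbtRunner.
    command_name = "dbt"


class FusionRunner(SubprocessRunner):
    # Placeholder for DbtFusionRunner: same behaviour, different executable.
    command_name = "~/.local/bin/dbt"


class ApiRunner:
    # Placeholder for APIDbtRunner.
    pass


_RUNNER_CLASSES: Dict[RunnerMethod, type] = {
    RunnerMethod.SUBPROCESS: SubprocessRunner,
    RunnerMethod.API: ApiRunner,
    RunnerMethod.FUSION: FusionRunner,
}


def get_runner_class(method: RunnerMethod) -> type:
    # One lookup keeps selection separate from construction, so each part
    # can be tested on its own.
    try:
        return _RUNNER_CLASSES[method]
    except KeyError:
        raise ValueError(f"Invalid runner method: {method}") from None
```

Adding a new runner type then means adding one enum member and one mapping entry, without touching the code that instantiates the runner.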
def _get_dbt_command_name(self) -> str:
    return os.path.expanduser("~/.local/bin/dbt")
🛠️ Refactor suggestion
Consider making the dbt path configurable for better portability.
The hardcoded path ~/.local/bin/dbt may not exist on all systems. Consider making this configurable via an environment variable or configuration parameter to improve portability across different environments.
Apply this diff to make the path configurable:
 class DbtFusionRunner(SubprocessDbtRunner):
     def _get_dbt_command_name(self) -> str:
-        return os.path.expanduser("~/.local/bin/dbt")
+        dbt_fusion_path = os.getenv("DBT_FUSION_PATH", "~/.local/bin/dbt")
+        return os.path.expanduser(dbt_fusion_path)
Additionally, consider validating that the executable exists:
def _get_dbt_command_name(self) -> str:
    dbt_fusion_path = os.getenv("DBT_FUSION_PATH", "~/.local/bin/dbt")
    expanded_path = os.path.expanduser(dbt_fusion_path)
    if not os.path.isfile(expanded_path) or not os.access(expanded_path, os.X_OK):
        raise FileNotFoundError(f"dbt executable not found or not executable at: {expanded_path}")
    return expanded_path
🤖 Prompt for AI Agents
In elementary/clients/dbt/dbt_fusion_runner.py around lines 7 to 8, the dbt
executable path is hardcoded as "~/.local/bin/dbt", which reduces portability.
Modify the method to read the path from an environment variable
"DBT_FUSION_PATH" with a default fallback to "~/.local/bin/dbt". Expand the user
path and add a check to verify that the file exists and is executable, raising a
FileNotFoundError if not. Return the validated expanded path.
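A standalone version of the suggestion above, written so it can be exercised without the runner class. `resolve_dbt_fusion_path` is a hypothetical helper name, and passing the environment as a mapping is an assumption made here for testability; the `DBT_FUSION_PATH` variable and the default path come from the review comment.

```python
import os
import sys
from typing import Mapping, Optional

DEFAULT_DBT_FUSION_PATH = "~/.local/bin/dbt"


def resolve_dbt_fusion_path(env: Optional[Mapping[str, str]] = None) -> str:
    # Read the override from the given mapping, defaulting to the process environment.
    env = os.environ if env is None else env
    raw_path = env.get("DBT_FUSION_PATH", DEFAULT_DBT_FUSION_PATH)
    expanded_path = os.path.expanduser(raw_path)
    # Fail early with a clear message instead of a late subprocess error.
    if not os.path.isfile(expanded_path) or not os.access(expanded_path, os.X_OK):
        raise FileNotFoundError(
            f"dbt executable not found or not executable at: {expanded_path}"
        )
    return expanded_path


if __name__ == "__main__":
    # The Python interpreter stands in for a real dbt binary here.
    print(resolve_dbt_fusion_path({"DBT_FUSION_PATH": sys.executable}))
```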
    log_format: str,
) -> DbtCommandResult:
    try:
        print([self._get_dbt_command_name()] + dbt_command_args)
Replace print statement with proper logging.
The print statement should use the logger and respect the quiet flag for consistency with the rest of the codebase.
Apply this diff to use proper logging:
- print([self._get_dbt_command_name()] + dbt_command_args)
+ command_to_run = [self._get_dbt_command_name()] + dbt_command_args
+ if not quiet:
+     logger.info(f"Executing command: {' '.join(command_to_run)}")
+ else:
+     logger.debug(f"Executing command: {' '.join(command_to_run)}")
🤖 Prompt for AI Agents
In elementary/clients/dbt/subprocess_dbt_runner.py at line 28, replace the print
statement with a logger call that respects the quiet flag. Use the existing
logger to log the command at an appropriate level (e.g., debug or info) only if
quiet mode is not enabled, ensuring consistency with the rest of the codebase's
logging practices.
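A small self-contained sketch of the logging approach described above. `log_command` is a hypothetical helper, and `shlex.join` is swapped in for the `' '.join` used in the suggestion so that arguments containing spaces stay unambiguous in the log line.

```python
import logging
import shlex
from typing import List

logger = logging.getLogger("dbt_runner_sketch")


def log_command(command: List[str], quiet: bool) -> int:
    # Quiet mode demotes the message to debug level instead of dropping it.
    level = logging.DEBUG if quiet else logging.INFO
    # Lazy %-style formatting avoids building the string when the level is disabled.
    logger.log(level, "Executing command: %s", shlex.join(command))
    return level


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    log_command(["dbt", "run", "--select", "my model"], quiet=False)
```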
Actionable comments posted: 0
🔭 Outside diff range comments (1)
tests/tests_with_db/dbt_project/macros/create_all_types_table.sql (1)
29-34: BigQuery macro: trailing comma will break compilation
CURRENT_TIMESTAMP() as timestamp_col, (line 32) leaves a dangling comma before the closing {% endset %}.
BigQuery does not accept a trailing comma in the SELECT list, so the model will fail to compile / run.
- CURRENT_TIMESTAMP() as timestamp_col,
+ CURRENT_TIMESTAMP() as timestamp_col
🧹 Nitpick comments (1)
tests/tests_with_db/dbt_project/macros/create_all_types_table.sql (1)
5-8: High duplication across warehouse-specific macros
The only varying pieces across the 4 warehouse implementations are:
- the SQL body,
- the adapter dispatch prefix.
Everything else (database/schema lookup, relation creation, edr_create_table_as invocation) is duplicated.
A tiny wrapper could eliminate that noise and keep each macro focused solely on its column definitions.
Example sketch:
{% macro _create_all_types(relation_name, sql_query) %}
    {% set database_name, schema_name = elementary.get_package_database_and_schema('elementary') %}
    {% set _, relation = dbt.get_or_create_relation(database=database_name, schema=schema_name, identifier=relation_name, type='table') %}
    {% do elementary.edr_create_table_as(True, relation, sql_query) %}
{% endmacro %}
Each warehouse macro would then call the helper with its custom sql_query.
This cuts ~20 lines per macro and centralises any future tweaks.
Also applies to: 37-41, 85-89, 125-129
📜 Review details
📒 Files selected for processing (1)
tests/tests_with_db/dbt_project/macros/create_all_types_table.sql(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: code-quality
🔇 Additional comments (1)
tests/tests_with_db/dbt_project/macros/create_all_types_table.sql (1)
34-34: Verify edr_create_table_as argument order
All four macros call elementary.edr_create_table_as(false, relation, sql_query).
Please double-check that the helper’s signature is (replace, relation, sql) in that exact order.
If the first boolean controls “if_exists → replace”, setting it to false will keep a stale table from previous test runs, which defeats the point of deterministic integration tests.
If the intent is “always recreate”, pass true (or the constant True for better readability in Jinja).
Also applies to: 82-82, 122-122, 180-180
653f183 to 6804551
6804551 to 04cabab
Actionable comments posted: 1
🧹 Nitpick comments (10)
tests/tests_with_db/dbt_project/macros/materializations.sql (2)
3-6: Direct macro calls: ensure existence check uses 'is defined'.
When switching from .call_macro to direct calls, guard for missing macros to avoid UndefinedError at render time.
Apply this diff:
-{% do return(dbt.materialization_test_default()) %}
+{% do return(dbt.materialization_test_default()) %}
And for the Snowflake block below, update the existence check (see next comment).
11-16: Fix macro existence check for direct calls.
Use Jinja’s “is defined” when testing for optional macros.
- {% if dbt.materialization_test_snowflake %}
+ {% if dbt.materialization_test_snowflake is defined %}
      {% do return(dbt.materialization_test_snowflake()) %}
  {% else %}
      {% do return(dbt.materialization_test_default()) %}
  {% endif %}
elementary/clients/dbt/subprocess_dbt_runner.py (3)
28-39: Prefer text mode, robust decoding, and RUF005 fix.
Use text=True to avoid manual decoding and adopt star expansion per Ruff.
- result = subprocess.run(
-     [self._get_dbt_command_name()] + dbt_command_args,
+ result = subprocess.run(
+     [self._get_dbt_command_name(), *dbt_command_args],
      check=self.raise_on_failure,
-     capture_output=capture_output or quiet,
+     capture_output=capture_output or quiet,
+     text=True,
+     encoding="utf-8",
+     errors="replace",
      env=self._get_command_env(),
      cwd=self.project_dir,
  )
  success = result.returncode == 0
- output = result.stdout.decode() if result.stdout else None
- stderr = result.stderr.decode() if result.stderr else None
+ output = result.stdout or None
+ stderr = result.stderr or None
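To show what text mode with errors="replace" buys, here is a small sketch in which a child process emits an invalid UTF-8 byte. `run_text` is a hypothetical helper and the Python interpreter stands in for dbt; only the `subprocess.run` keyword arguments come from the suggestion above.

```python
import subprocess
import sys
from typing import List, Optional, Tuple


def run_text(args: List[str]) -> Tuple[Optional[str], Optional[str]]:
    # text=True makes stdout/stderr str instead of bytes; errors="replace"
    # turns undecodable bytes into U+FFFD rather than raising UnicodeDecodeError.
    completed = subprocess.run(
        args,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    # Empty strings are normalised to None, as in the suggested diff.
    return completed.stdout or None, completed.stderr or None


if __name__ == "__main__":
    child = "import sys; sys.stdout.buffer.write(b'ok\\xff')"
    print(run_text([sys.executable, "-c", child]))
```

With manual `.decode()` on the raw bytes, the same child output would raise instead of degrading to a replacement character.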
41-49: Also parse and surface stderr on failure.
On CalledProcessError, include stderr logs for better diagnostics.
- logs = (
-     list(parse_dbt_output(err.output.decode(), log_format))
-     if err.output
-     else []
- )
+ logs = []
+ if getattr(err, "output", None):
+     logs.extend(list(parse_dbt_output(err.output, log_format)))
+ if getattr(err, "stderr", None):
+     logs.extend(list(parse_dbt_output(err.stderr, log_format)))
- if capture_output and (log_output or is_debug()):
+ if (capture_output or quiet) and (log_output or is_debug()):
      for log in logs:
          logger.info(log.msg)
Note: relies on text=True above; drop .decode().
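The failure path can be illustrated in isolation. In this sketch `collect_lines` is a hypothetical helper that replaces the project's `parse_dbt_output` with a plain line split, so it shows only how both streams are read off `subprocess.CalledProcessError`.

```python
import subprocess
import sys
from typing import List


def collect_lines(args: List[str]) -> List[str]:
    try:
        completed = subprocess.run(args, check=True, capture_output=True, text=True)
        return completed.stdout.splitlines()
    except subprocess.CalledProcessError as err:
        # With capture_output=True the exception carries both streams.
        lines: List[str] = []
        if err.output:
            lines.extend(err.output.splitlines())
        if err.stderr:
            lines.extend(err.stderr.splitlines())
        return lines


if __name__ == "__main__":
    child = "import sys; print('out'); sys.stderr.write('err'); sys.exit(1)"
    print(collect_lines([sys.executable, "-c", child]))
```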
53-55: Make CLI name overridable via env for flexibility.
Allow overriding “dbt” path without subclassing.
- def _get_dbt_command_name(self) -> str:
-     return "dbt"
+ def _get_dbt_command_name(self) -> str:
+     return os.getenv("EDR_DBT_CLI", "dbt")
elementary/clients/dbt/factory.py (5)
50-57: Normalize and validate DBT_RUNNER_METHOD robustly.
Handle case-insensitivity and invalid values with a safe fallback and warning.
-def get_dbt_runner_method() -> RunnerMethod:
-    runner_method = os.getenv("DBT_RUNNER_METHOD")
-    if runner_method:
-        return RunnerMethod(runner_method)
+def get_dbt_runner_method() -> RunnerMethod:
+    runner_method = os.getenv("DBT_RUNNER_METHOD")
+    if runner_method:
+        try:
+            return RunnerMethod(runner_method.strip().lower())
+        except ValueError:
+            # Fall through to default resolution
+            from elementary.utils.log import get_logger
+            get_logger(__name__).warning(
+                f"Invalid DBT_RUNNER_METHOD='{runner_method}', falling back to default"
+            )
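A runnable sketch of the normalise-then-fall-back behaviour. The enum values and the `parse_runner_method` helper are assumptions for illustration; the real `get_dbt_runner_method()` reads the environment itself and derives its default from the dbt version.

```python
import logging
from enum import Enum
from typing import Optional

logger = logging.getLogger("factory_sketch")


class RunnerMethod(Enum):
    # Assumed values; only the member names API and FUSION appear in the review.
    SUBPROCESS = "subprocess"
    API = "api"
    FUSION = "fusion"


def parse_runner_method(raw: Optional[str], default: RunnerMethod) -> RunnerMethod:
    # Unset or empty means "use the default resolution".
    if not raw:
        return default
    try:
        # Tolerate surrounding whitespace and mixed case in the env value.
        return RunnerMethod(raw.strip().lower())
    except ValueError:
        logger.warning("Invalid DBT_RUNNER_METHOD=%r, falling back to default", raw)
        return default
```

For example, `parse_runner_method(" Fusion ", RunnerMethod.API)` resolves to `RunnerMethod.FUSION`, while an unknown value logs a warning and yields the default.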
61-66: Guard API runner import for dbt<1.5 with clearer error.
Provide an explicit message instead of a raw ImportError.
- if runner_method == RunnerMethod.API:
-     # Import it internally since it will fail if the dbt version is below 1.5.0
-     from elementary.clients.dbt.api_dbt_runner import APIDbtRunner
-
-     return APIDbtRunner
+ if runner_method == RunnerMethod.API:
+     # Import internally; dbt<1.5 lacks the CLI API
+     try:
+         from elementary.clients.dbt.api_dbt_runner import APIDbtRunner
+     except Exception as e:
+         raise RuntimeError(
+             f"API runner is unavailable for dbt version {DBT_VERSION}"
+         ) from e
+     return APIDbtRunner
34-47: Don’t run deps for Fusion runner even if force_dbt_deps=True.
Fusion runner no-ops _run_deps_if_needed; forcing deps would invoke unsupported deps. Ignore the flag for Fusion.
- runner_class = get_dbt_runner_class(runner_method)
- return runner_class(
+ runner_class = get_dbt_runner_class(runner_method)
+ force_flag = False if runner_method == RunnerMethod.FUSION else force_dbt_deps
+ return runner_class(
      project_dir=project_dir,
      profiles_dir=profiles_dir,
      target=target,
      raise_on_failure=raise_on_failure,
      env_vars=env_vars,
      vars=vars,
      secret_vars=secret_vars,
      allow_macros_without_package_prefix=allow_macros_without_package_prefix,
      run_deps_if_needed=run_deps_if_needed,
-     force_dbt_deps=force_dbt_deps,
+     force_dbt_deps=force_flag,
  )
71-71: Minor: tighten ValueError message (TRY003).
Keep the message compact or use a dedicated exception type. Not blocking.
- raise ValueError(f"Invalid runner method: {runner_method}")
+ raise ValueError(f"Invalid runner method: {runner_method!s}")
15-19: Enum looks good; consider documenting intended defaults.
Optional: brief docstring noting default resolution (API for dbt>=1.5).
📜 Review details
📒 Files selected for processing (7)
- elementary/clients/dbt/api_dbt_runner.py (1 hunks)
- elementary/clients/dbt/command_line_dbt_runner.py (2 hunks)
- elementary/clients/dbt/dbt_fusion_runner.py (1 hunks)
- elementary/clients/dbt/factory.py (3 hunks)
- elementary/clients/dbt/subprocess_dbt_runner.py (2 hunks)
- tests/tests_with_db/dbt_project/macros/create_all_types_table.sql (4 hunks)
- tests/tests_with_db/dbt_project/macros/materializations.sql (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- elementary/clients/dbt/command_line_dbt_runner.py
- elementary/clients/dbt/dbt_fusion_runner.py
- tests/tests_with_db/dbt_project/macros/create_all_types_table.sql
- elementary/clients/dbt/api_dbt_runner.py
🧰 Additional context used
🧬 Code graph analysis (2)
elementary/clients/dbt/factory.py (4)
- elementary/clients/dbt/command_line_dbt_runner.py (1): CommandLineDbtRunner (31-325)
- elementary/clients/dbt/dbt_fusion_runner.py (1): DbtFusionRunner (8-14)
- elementary/clients/dbt/subprocess_dbt_runner.py (1): SubprocessDbtRunner (18-79)
- elementary/clients/dbt/api_dbt_runner.py (1): APIDbtRunner (26-68)
elementary/clients/dbt/subprocess_dbt_runner.py (2)
- elementary/clients/dbt/dbt_fusion_runner.py (1): _get_dbt_command_name (9-10)
- elementary/clients/dbt/command_line_dbt_runner.py (1): DbtCommandResult (25-28)
🪛 Ruff (0.13.1)
elementary/clients/dbt/factory.py
71-71: Avoid specifying long messages outside the exception class
(TRY003)
elementary/clients/dbt/subprocess_dbt_runner.py
29-29: Consider [self._get_dbt_command_name(), *dbt_command_args] instead of concatenation
Replace with [self._get_dbt_command_name(), *dbt_command_args]
(RUF005)
🔇 Additional comments (1)
elementary/clients/dbt/subprocess_dbt_runner.py (1)
28-30: (FYI) Prior “print vs logger” issue is resolved.
No stray print remains; logging is handled in the base runner.
  result = subprocess.run(
-     ["dbt"] + dbt_command_args,
+     [self._get_dbt_command_name()] + dbt_command_args,
      check=self.raise_on_failure,
      capture_output=capture_output or quiet,
      env=self._get_command_env(),
      cwd=self.project_dir,
  )
  success = result.returncode == 0
Add an execution timeout to avoid hung subprocesses.
Long/hung dbt processes will block indefinitely; add a configurable timeout.
- result = subprocess.run(
+ result = subprocess.run(
      [self._get_dbt_command_name(), *dbt_command_args],
      check=self.raise_on_failure,
      capture_output=capture_output or quiet,
      text=True,
      encoding="utf-8",
      errors="replace",
      env=self._get_command_env(),
      cwd=self.project_dir,
+     timeout=float(os.getenv("EDR_DBT_SUBPROCESS_TIMEOUT_SEC", "0")) or None,
  )
Optionally document EDR_DBT_SUBPROCESS_TIMEOUT_SEC in README.
🧰 Tools
🪛 Ruff (0.13.1)
28-28: subprocess call: check for execution of untrusted input
(S603)
29-29: Consider [self._get_dbt_command_name(), *dbt_command_args] instead of concatenation
Replace with [self._get_dbt_command_name(), *dbt_command_args]
(RUF005)
🤖 Prompt for AI Agents
In elementary/clients/dbt/subprocess_dbt_runner.py around lines 28-35, the
subprocess.run call can hang indefinitely; add a configurable timeout by reading
EDR_DBT_SUBPROCESS_TIMEOUT_SEC from the environment (fallback to a sensible
default, e.g., 300s), convert to int, and pass it as the timeout argument to
subprocess.run; handle subprocess.TimeoutExpired by logging/raising a clear
error or marking the run as failed and ensuring the child process is cleaned up;
update any callers/tests accordingly and optionally document the new
EDR_DBT_SUBPROCESS_TIMEOUT_SEC setting in the README.
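A minimal sketch of the timeout handling described in the prompt above. The helper names are hypothetical, the 300-second default follows the prompt's example, and treating a value of 0 as "no timeout" is an assumption borrowed from the diff's `or None` idiom.

```python
import subprocess
import sys
from typing import List, Mapping, Optional

TIMEOUT_ENV_VAR = "EDR_DBT_SUBPROCESS_TIMEOUT_SEC"


def read_timeout(env: Mapping[str, str], default: float = 300.0) -> Optional[float]:
    # Missing variable: use the default. Zero or negative: disable the timeout.
    raw = env.get(TIMEOUT_ENV_VAR)
    if raw is None:
        return default
    value = float(raw)
    return value if value > 0 else None


def run_with_timeout(args: List[str], timeout: Optional[float]) -> bool:
    try:
        completed = subprocess.run(
            args, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child and waits for it before re-raising,
        # so no orphan process is left behind; report the run as failed.
        return False
    return completed.returncode == 0


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(5)"]
    print(run_with_timeout(slow, timeout=0.5))
```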
Summary by CodeRabbit
New Features
Improvements
Refactor