Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250731-162638.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Hook prioritization (on-run-start & on-run-end)
time: 2025-07-31T16:26:38.632262+03:00
custom:
Author: ammarchalifah-bolt
Issue: "10592"
8 changes: 4 additions & 4 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ select =
W
F
ignore =
W503 # makes Flake8 work like black
W503
W504
E203 # makes Flake8 work like black
E704 # makes Flake8 work like black
E203
E704
E741
E501 # long line checking is done in black
E501
exclude = test/
per-file-ignores =
*/__init__.py: F401
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Hook(dbtClassMixin):
sql: str
transaction: bool = True
index: Optional[int] = None
priority: Optional[int] = None


@dataclass
Expand Down
1 change: 0 additions & 1 deletion core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dbt.mp_context import get_mp_context
from dbt_common.events.base_types import EventMsg


@dataclass
class dbtRunnerResult:
"""Contains the result of an invocation of the dbtRunner"""
Expand Down
8 changes: 4 additions & 4 deletions core/dbt/config/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ def create_project(self, rendered: RenderComponents) -> "Project":
vars_value = VarProvider(vars_dict)
# There will never be any project_env_vars when it's first created
project_env_vars: Dict[str, Any] = {}
on_run_start: List[str] = value_or(cfg.on_run_start, [])
on_run_end: List[str] = value_or(cfg.on_run_end, [])
on_run_start: List[Union[str, Dict[str, int]]] = value_or(cfg.on_run_start, [])
on_run_end: List[Union[str, Dict[str, int]]] = value_or(cfg.on_run_end, [])

query_comment = _query_comment_from_cfg(cfg.query_comment)
packages: PackageConfig = package_config_from_data(
Expand Down Expand Up @@ -632,8 +632,8 @@ class Project:
packages_specified_path: str
quoting: Dict[str, Any]
models: Dict[str, Any]
on_run_start: List[str]
on_run_end: List[str]
on_run_start: List[Union[str, Dict[str, int]]]
on_run_end: List[Union[str, Dict[str, int]]]
dispatch: List[Dict[str, Any]]
seeds: Dict[str, Any]
snapshots: Dict[str, Any]
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@
RUN_RESULTS_FILE_NAME = "run_results.json"
CATALOG_FILENAME = "catalog.json"
SOURCE_RESULT_FILE_NAME = "sources.json"

# Hook priority constants
DEFAULT_HOOK_PRIORITY = 50
PROJECT_HOOK_PRIORITY = 100
60 changes: 60 additions & 0 deletions core/dbt/include/jsonschemas/project/0.0.110.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,36 @@
{
"$ref": "#/definitions/StringOrArrayOfStrings"
},
{
"anyOf": [
{
"type": "string"
},
{
"type": "array",
"items": {
"type": "string"
}
},
{
"type": "array",
"items": {
"type": "object",
"properties": {
"sql": {
"type": "string"
},
"priority": {
"type": "integer"
}
},
"required": [
"sql"
]
}
}
]
},
{
"type": "null"
}
Expand All @@ -155,6 +185,36 @@
{
"$ref": "#/definitions/StringOrArrayOfStrings"
},
{
"anyOf": [
{
"type": "string"
},
{
"type": "array",
"items": {
"type": "string"
}
},
{
"type": "array",
"items": {
"type": "object",
"properties": {
"sql": {
"type": "string"
},
"priority": {
"type": "integer"
}
},
"required": [
"sql"
]
}
}
]
},
{
"type": "null"
}
Expand Down
34 changes: 34 additions & 0 deletions core/dbt/include/jsonschemas/project/0.0.85.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,23 @@
},
{
"type": "null"
},
{
"type": "array",
"items": {
"type": "object",
"properties": {
"sql": {
"type": "string"
},
"priority": {
"type": "integer"
}
},
"required": [
"sql"
]
}
}
]
},
Expand All @@ -134,6 +151,23 @@
},
{
"type": "null"
},
{
"type": "array",
"items": {
"type": "object",
"properties": {
"sql": {
"type": "string"
},
"priority": {
"type": "integer"
}
},
"required": [
"sql"
]
}
}
]
},
Expand Down
10 changes: 9 additions & 1 deletion core/dbt/include/jsonschemas/resources/0.0.110.json
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,16 @@
"boolean",
"null"
]
},
"priority": {
"type": [
"integer",
"null"
]
}
}
},
"additionalProperties": false,
"required": ["sql", "transaction"]
},
"Hooks": {
"anyOf": [
Expand Down
10 changes: 9 additions & 1 deletion core/dbt/include/jsonschemas/resources/0.0.85.json
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,16 @@
"boolean",
"null"
]
},
"priority": {
"type": [
"integer",
"null"
]
}
}
},
"additionalProperties": false,
"required": ["sql", "transaction"]
},
"Hooks": {
"anyOf": [
Expand Down
9 changes: 8 additions & 1 deletion core/dbt/include/jsonschemas/resources/latest.json
Original file line number Diff line number Diff line change
Expand Up @@ -2076,9 +2076,16 @@
"boolean",
"null"
]
},
"priority": {
"type": [
"integer",
"null"
]
}
},
"additionalProperties": false
"additionalProperties": false,
"required": ["sql", "transaction"]
},
"Hooks": {
"anyOf": [
Expand Down
53 changes: 47 additions & 6 deletions core/dbt/parser/hooks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Tuple, Union
from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union

from dbt.constants import DEFAULT_HOOK_PRIORITY
from dbt.context.context_config import ContextConfig
from dbt.contracts.files import FilePath
from dbt.contracts.graph.nodes import HookNode
Expand All @@ -17,6 +18,7 @@ class HookBlock(FileBlock):
value: str
index: int
hook_type: RunHookType
priority: int = DEFAULT_HOOK_PRIORITY

@property
def contents(self):
Expand All @@ -33,14 +35,35 @@ def __init__(self, project, source_file, hook_type) -> None:
self.source_file = source_file
self.hook_type = hook_type

def _hook_list(self, hooks: Union[str, List[str], Tuple[str, ...]]) -> List[str]:
def _hook_list(
self, hooks: Union[str, List[Union[str, Dict]], Tuple[Union[str, Dict], ...]]
) -> List[Dict[str, Any]]:
"""Convert hook definitions to a standardized list format."""
if isinstance(hooks, tuple):
hooks = list(hooks)
elif not isinstance(hooks, list):
hooks = [hooks]
return hooks

def get_hook_defs(self) -> List[str]:
# Standardize format - ensure all hooks have a consistent structure
result = []
for hook in hooks:
if isinstance(hook, str):
# Convert string to dict format with default priority
result.append({"sql": hook, "priority": DEFAULT_HOOK_PRIORITY})
elif isinstance(hook, dict) and "sql" in hook:
# Ensure required keys exist with defaults
hook_dict = {
"sql": hook["sql"],
"priority": hook.get("priority", DEFAULT_HOOK_PRIORITY),
}
result.append(hook_dict)
elif isinstance(hook, dict):
# Backward compatibility for any other dict format
result.append({"sql": str(hook), "priority": DEFAULT_HOOK_PRIORITY})

return result

def get_hook_defs(self) -> List[Dict[str, Any]]:
if self.hook_type == RunHookType.Start:
hooks = self.project.on_run_start
elif self.hook_type == RunHookType.End:
Expand All @@ -56,12 +79,22 @@ def get_hook_defs(self) -> List[str]:
def __iter__(self) -> Iterator[HookBlock]:
hooks = self.get_hook_defs()
for index, hook in enumerate(hooks):
# Extract SQL and priority
if isinstance(hook, dict):
sql = hook.get("sql", "")
priority = hook.get("priority", DEFAULT_HOOK_PRIORITY)
else:
# Fallback for any unexpected format
sql = str(hook)
priority = DEFAULT_HOOK_PRIORITY

yield HookBlock(
file=self.source_file,
project=self.project.project_name,
value=hook,
value=sql,
index=index,
hook_type=self.hook_type,
priority=priority,
)


Expand Down Expand Up @@ -98,7 +131,8 @@ def _create_parsetime_node(
**kwargs,
) -> HookNode:

return super()._create_parsetime_node(
# Create the node using the parent method
node = super()._create_parsetime_node(
block=block,
path=path,
config=config,
Expand All @@ -108,6 +142,13 @@ def _create_parsetime_node(
tags=[str(block.hook_type)],
)

# Store the priority in the node's meta
if not node.config.meta:
node.config.meta = {}
node.config.meta["hook_priority"] = block.priority

return node

@property
def resource_type(self) -> NodeType:
return NodeType.Operation
Expand Down
20 changes: 17 additions & 3 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dbt.cli.flags import Flags
from dbt.clients.jinja import MacroGenerator
from dbt.config import RuntimeConfig
from dbt.constants import DEFAULT_HOOK_PRIORITY, PROJECT_HOOK_PRIORITY
from dbt.context.providers import generate_runtime_model_context
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode
Expand Down Expand Up @@ -929,11 +930,24 @@ def _submit_batch(

return relation_exists

def _hook_keyfunc(self, hook: HookNode) -> Tuple[str, Optional[int]]:
def _hook_keyfunc(self, hook: HookNode) -> Tuple[int, str, Optional[int]]:
"""Sort hooks by priority, then package name, then index"""
package_name = hook.package_name
if package_name == self.config.project_name:

# Default priority
priority = DEFAULT_HOOK_PRIORITY

# Get priority from node meta if available
if hook.config and hook.config.meta and "hook_priority" in hook.config.meta:
priority = hook.config.meta["hook_priority"]

# Special case for project hooks - if no explicit priority,
# make them run last (preserving backward compatibility)
if package_name == self.config.project_name and priority == DEFAULT_HOOK_PRIORITY:
package_name = BiggestName("")
return package_name, hook.index
priority = PROJECT_HOOK_PRIORITY

return priority, package_name, hook.index

def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:

Expand Down
Loading
Loading