Skip to content

Commit 2a30a94

Browse files
authored
Add catalogs tracking (#12747)
* add tracking for catalog count * add catalog_type tracking * add changie * fix failing ut * fix type hints
1 parent fc2104d commit 2a30a94

File tree

14 files changed

+285
-32
lines changed

14 files changed

+285
-32
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Add catalogs.yml usage tracking
3+
time: 2026-03-30T19:59:59.334539+05:30
4+
custom:
5+
Author: ash2shukla
6+
Issue: "1122"

core/dbt/cli/main.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ def build(ctx, **kwargs):
213213
ctx.obj["flags"],
214214
ctx.obj["runtime_config"],
215215
ctx.obj["manifest"],
216+
catalogs=ctx.obj.get("catalogs"),
216217
)
217218

218219
results = task.run()
@@ -281,6 +282,7 @@ def docs_generate(ctx, **kwargs):
281282
ctx.obj["flags"],
282283
ctx.obj["runtime_config"],
283284
ctx.obj["manifest"],
285+
catalogs=ctx.obj.get("catalogs"),
284286
)
285287

286288
results = task.run()
@@ -352,6 +354,7 @@ def compile(ctx, **kwargs):
352354
ctx.obj["flags"],
353355
ctx.obj["runtime_config"],
354356
ctx.obj["manifest"],
357+
catalogs=ctx.obj.get("catalogs"),
355358
)
356359

357360
results = task.run()
@@ -402,6 +405,7 @@ def show(ctx, **kwargs):
402405
ctx.obj["flags"],
403406
ctx.obj["runtime_config"],
404407
ctx.obj["manifest"],
408+
catalogs=ctx.obj.get("catalogs"),
405409
)
406410

407411
results = task.run()
@@ -527,6 +531,7 @@ def list(ctx, **kwargs):
527531
ctx.obj["flags"],
528532
ctx.obj["runtime_config"],
529533
ctx.obj["manifest"],
534+
catalogs=ctx.obj.get("catalogs"),
530535
)
531536

532537
results = task.run()
@@ -593,6 +598,7 @@ def run(ctx, **kwargs):
593598
ctx.obj["flags"],
594599
ctx.obj["runtime_config"],
595600
ctx.obj["manifest"],
601+
catalogs=ctx.obj.get("catalogs"),
596602
)
597603

598604
results = task.run()
@@ -659,6 +665,7 @@ def clone(ctx, **kwargs):
659665
ctx.obj["flags"],
660666
ctx.obj["runtime_config"],
661667
ctx.obj["manifest"],
668+
catalogs=ctx.obj.get("catalogs"),
662669
)
663670

664671
results = task.run()
@@ -691,6 +698,7 @@ def run_operation(ctx, **kwargs):
691698
ctx.obj["flags"],
692699
ctx.obj["runtime_config"],
693700
ctx.obj["manifest"],
701+
catalogs=ctx.obj.get("catalogs"),
694702
)
695703

696704
results = task.run()
@@ -727,6 +735,7 @@ def seed(ctx, **kwargs):
727735
ctx.obj["flags"],
728736
ctx.obj["runtime_config"],
729737
ctx.obj["manifest"],
738+
catalogs=ctx.obj.get("catalogs"),
730739
)
731740
results = task.run()
732741
success = task.interpret_results(results)
@@ -761,6 +770,7 @@ def snapshot(ctx, **kwargs):
761770
ctx.obj["flags"],
762771
ctx.obj["runtime_config"],
763772
ctx.obj["manifest"],
773+
catalogs=ctx.obj.get("catalogs"),
764774
)
765775

766776
results = task.run()
@@ -804,6 +814,7 @@ def freshness(ctx, **kwargs):
804814
ctx.obj["flags"],
805815
ctx.obj["runtime_config"],
806816
ctx.obj["manifest"],
817+
catalogs=ctx.obj.get("catalogs"),
807818
)
808819

809820
results = task.run()
@@ -847,6 +858,7 @@ def test(ctx, **kwargs):
847858
ctx.obj["flags"],
848859
ctx.obj["runtime_config"],
849860
ctx.obj["manifest"],
861+
catalogs=ctx.obj.get("catalogs"),
850862
)
851863

852864
results = task.run()

core/dbt/cli/requires.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,8 @@ def setup_manifest(ctx: Context, write: bool = True, write_perf_info: bool = Fal
411411
catalogs = load_catalogs(flags.PROJECT_DIR, ctx.obj["project"].project_name, flags.VARS)
412412
active_integrations = [get_active_write_integration(catalog) for catalog in catalogs]
413413

414+
ctx.obj["catalogs"] = catalogs
415+
414416
# if a manifest has already been set on the context, don't overwrite it
415417
if ctx.obj.get("manifest") is None:
416418
ctx.obj["manifest"] = parse_manifest(

core/dbt/compilation.py

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,18 @@
33
import os
44
import pickle
55
from collections import defaultdict, deque
6-
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
6+
from typing import (
7+
Any,
8+
Dict,
9+
Iterable,
10+
List,
11+
Literal,
12+
Optional,
13+
Sequence,
14+
Set,
15+
Tuple,
16+
Union,
17+
)
718

819
import networkx as nx # type: ignore
920
import sqlparse
@@ -50,11 +61,18 @@
5061

5162
graph_file_name = "graph.gpickle"
5263

64+
StatsKeyType = Union[NodeType, Literal["catalogs"]]
65+
5366

54-
def print_compile_stats(stats: Dict[NodeType, int]):
67+
def print_compile_stats(stats: Dict[StatsKeyType, int]):
5568
# create tracking event for resource_counts
5669
if dbt.tracking.active_user is not None:
57-
resource_counts = {k.pluralize(): v for k, v in stats.items()}
70+
resource_counts = {}
71+
for k, v in stats.items():
72+
if isinstance(k, NodeType):
73+
resource_counts[k.pluralize()] = v
74+
else:
75+
resource_counts[k] = v
5876
dbt.tracking.track_resource_counts(resource_counts)
5977

6078
# do not include resource types that are not actually defined in the project
@@ -72,8 +90,10 @@ def _node_enabled(node: ManifestNode):
7290
return True
7391

7492

75-
def _generate_stats(manifest: Manifest) -> Dict[NodeType, int]:
76-
stats: Dict[NodeType, int] = defaultdict(int)
93+
def _generate_stats(
94+
manifest: Manifest, catalogs: Optional[Sequence[Any]] = None
95+
) -> Dict[StatsKeyType, int]:
96+
stats: Dict[StatsKeyType, int] = defaultdict(int)
7797
for node in manifest.nodes.values():
7898
if _node_enabled(node):
7999
stats[node.resource_type] += 1
@@ -87,6 +107,8 @@ def _generate_stats(manifest: Manifest) -> Dict[NodeType, int]:
87107
stats[NodeType.SemanticModel] += len(manifest.semantic_models)
88108
stats[NodeType.SavedQuery] += len(manifest.saved_queries)
89109
stats[NodeType.Unit] += len(manifest.unit_tests)
110+
if catalogs is not None:
111+
stats["catalogs"] = len(catalogs)
90112

91113
# TODO: should we be counting dimensions + entities?
92114

@@ -705,7 +727,13 @@ def _compile_relation_for_foreign_key_constraint_to(
705727
# This method doesn't actually "compile" any of the nodes. That is done by the
706728
# "compile_node" method. This creates a Linker and builds the networkx graph,
707729
# writes out the graph.gpickle file, and prints the stats, returning a Graph object.
708-
def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph:
730+
def compile(
731+
self,
732+
manifest: Manifest,
733+
write=True,
734+
add_test_edges=False,
735+
catalogs: Optional[Sequence[Any]] = None,
736+
) -> Graph:
709737
self.initialize()
710738
linker = Linker()
711739
linker.link_graph(manifest)
@@ -737,14 +765,12 @@ def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph
737765
)
738766
)
739767

740-
stats = _generate_stats(manifest)
741-
742768
if write:
743769
self.write_graph_file(linker, manifest)
744770

745771
# Do not print these for list command
746772
if self.config.args.which != "list":
747-
stats = _generate_stats(manifest)
773+
stats = _generate_stats(manifest, catalogs)
748774
print_compile_stats(stats)
749775

750776
return Graph(linker.graph)

core/dbt/task/base.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dbt.exceptions
1212
import dbt_common.exceptions.base
1313
from dbt import tracking
14+
from dbt.artifacts.resources import Catalog
1415
from dbt.artifacts.resources.types import NodeType
1516
from dbt.artifacts.schemas.results import (
1617
NodeStatus,
@@ -119,12 +120,17 @@ def move_to_nearest_project_dir(project_dir: Optional[str]) -> Path:
119120
# holding a manifest, and moving direcories.
120121
class ConfiguredTask(BaseTask):
121122
def __init__(
122-
self, args: Flags, config: RuntimeConfig, manifest: Optional[Manifest] = None
123+
self,
124+
args: Flags,
125+
config: RuntimeConfig,
126+
manifest: Optional[Manifest] = None,
127+
catalogs: Optional[List[Catalog]] = None,
123128
) -> None:
124129
super().__init__(args)
125130
self.config = config
126131
self.graph: Optional[Graph] = None
127132
self.manifest = manifest
133+
self.catalogs = catalogs
128134
self.compiler = Compiler(self.config)
129135

130136
def compile_manifest(self) -> None:
@@ -133,7 +139,7 @@ def compile_manifest(self) -> None:
133139

134140
start_compile_manifest = time.perf_counter()
135141

136-
self.graph = self.compiler.compile(self.manifest)
142+
self.graph = self.compiler.compile(self.manifest, catalogs=self.catalogs)
137143

138144
compile_time = time.perf_counter() - start_compile_manifest
139145
if dbt.tracking.active_user is not None:

core/dbt/task/build.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Dict, Iterable, List, Optional, Set, Type
22

33
from dbt.adapters.base import BaseRelation
4+
from dbt.artifacts.resources import Catalog
45
from dbt.artifacts.schemas.results import NodeStatus
56
from dbt.artifacts.schemas.run import RunResult
67
from dbt.cli.flags import Flags
@@ -49,8 +50,14 @@ class BuildTask(RunTask):
4950
}
5051
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})
5152

52-
def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> None:
53-
super().__init__(args, config, manifest)
53+
def __init__(
54+
self,
55+
args: Flags,
56+
config: RuntimeConfig,
57+
manifest: Manifest,
58+
catalogs: Optional[List[Catalog]] = None,
59+
) -> None:
60+
super().__init__(args, config, manifest, catalogs=catalogs)
5461
self.selected_unit_tests: Set = set()
5562
self.model_to_unit_test_map: Dict[str, List] = {}
5663

@@ -214,4 +221,6 @@ def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
214221
def compile_manifest(self) -> None:
215222
if self.manifest is None:
216223
raise DbtInternalError("compile_manifest called before manifest was loaded")
217-
self.graph: Graph = self.compiler.compile(self.manifest, add_test_edges=True)
224+
self.graph: Graph = self.compiler.compile(
225+
self.manifest, add_test_edges=True, catalogs=self.catalogs
226+
)

core/dbt/task/freshness.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ def node_is_match(self, node):
197197

198198

199199
class FreshnessTask(RunTask):
200-
def __init__(self, args, config, manifest) -> None:
201-
super().__init__(args, config, manifest)
200+
def __init__(self, args, config, manifest, catalogs) -> None:
201+
super().__init__(args, config, manifest, catalogs)
202202

203203
if self.args.output:
204204
deprecations.warn(

core/dbt/task/list.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
2-
from typing import Iterator, List
2+
from typing import Iterator, List, Optional
33

4+
from dbt.artifacts.resources import Catalog
45
from dbt.cli.flags import Flags
56
from dbt.config.runtime import RuntimeConfig
67
from dbt.contracts.graph.manifest import Manifest
@@ -56,8 +57,14 @@ class ListTask(GraphRunnableTask):
5657
)
5758
)
5859

59-
def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> None:
60-
super().__init__(args, config, manifest)
60+
def __init__(
61+
self,
62+
args: Flags,
63+
config: RuntimeConfig,
64+
manifest: Manifest,
65+
catalogs: Optional[List[Catalog]] = None,
66+
) -> None:
67+
super().__init__(args, config, manifest, catalogs=catalogs)
6168
if self.args.models:
6269
if self.args.select:
6370
raise DbtRuntimeError('"models" and "select" are mutually exclusive arguments')

core/dbt/task/retry.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dbt.cli.flags import Flags
88
from dbt.cli.types import Command as CliCommand
99
from dbt.config import RuntimeConfig
10+
from dbt.config.catalogs import load_catalogs
1011
from dbt.constants import RUN_RESULTS_FILE_NAME
1112
from dbt.contracts.state import load_result_state
1213
from dbt.flags import get_flags, set_flags
@@ -118,7 +119,12 @@ def __init__(self, args: Flags, config: RuntimeConfig) -> None:
118119

119120
# Parse manifest using resolved config/flags
120121
manifest = parse_manifest(retry_config, False, True, retry_flags.write_json, []) # type: ignore
121-
super().__init__(args, retry_config, manifest)
122+
catalogs = load_catalogs(
123+
str(retry_config.project_root),
124+
retry_config.project_name,
125+
retry_config.cli_vars,
126+
)
127+
super().__init__(args, retry_config, manifest, catalogs=catalogs)
122128
self.task_class = TASK_DICT.get(self.previous_command_name) # type: ignore
123129

124130
def run(self):
@@ -169,6 +175,7 @@ def get_graph_queue(self):
169175
get_flags(),
170176
self.config,
171177
self.manifest,
178+
catalogs=self.catalogs,
172179
)
173180

174181
if self.task_class == RunTask or self.task_class == BuildTask:

0 commit comments

Comments
 (0)