Skip to content

Commit 3693c2f

Browse files
authored
chore: initial telemetry impl (#118)
Initial telemetry impl for anonymous inference events
1 parent 6e6efc0 commit 3693c2f

File tree

7 files changed

+675
-8
lines changed

7 files changed

+675
-8
lines changed

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,37 @@ data-designer config list # View current settings
117117

118118
---
119119

120+
## Telemetry
121+
122+
Data Designer collects telemetry to help us improve the library for developers. We collect:
123+
124+
* The names of models used
125+
* The count of input tokens
126+
* The count of output tokens
127+
128+
**No user or device information is collected.** This data is not used to track any individual user's behavior. It is used only in aggregate, to understand which models are most popular for synthetic data generation (SDG). We will share this usage data with the community.
129+
130+
Specifically, the model name defined in a `ModelConfig` object is what will be collected. In the example config below:
131+
132+
```python
133+
ModelConfig(
134+
alias="nv-reasoning",
135+
model="openai/gpt-oss-20b",
136+
provider="nvidia",
137+
inference_parameters=InferenceParameters(
138+
temperature=0.3,
139+
top_p=0.9,
140+
max_tokens=4096,
141+
),
142+
)
143+
```
144+
145+
The value `openai/gpt-oss-20b` would be collected.
146+
147+
To disable telemetry capture, set `NEMO_TELEMETRY_ENABLED=false`.
148+
149+
---
150+
120151
## License
121152

122153
Apache License 2.0 – see [LICENSE](LICENSE) for details.

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ env = [
9898
"DISABLE_DATA_DESIGNER_PLUGINS=true",
9999
]
100100

101+
[tool.coverage.run]
102+
omit = [
103+
"src/data_designer/engine/models/telemetry.py",
104+
]
105+
101106
[tool.uv]
102107
package = true
103108
required-version = ">=0.7.10"

src/data_designer/engine/dataset_builders/column_wise_builder.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
from __future__ import annotations
34

45
import functools
6+
import importlib.metadata
57
import json
68
import logging
79
import time
10+
import uuid
811
from pathlib import Path
9-
from typing import Callable
12+
from typing import TYPE_CHECKING, Callable
1013

1114
import pandas as pd
1215

@@ -35,14 +38,21 @@
3538
from data_designer.engine.dataset_builders.utils.dataset_batch_manager import (
3639
DatasetBatchManager,
3740
)
41+
from data_designer.engine.models.telemetry import InferenceEvent, NemoSourceEnum, TaskStatusEnum, TelemetryHandler
3842
from data_designer.engine.processing.processors.base import Processor
3943
from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
4044
from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry
4145
from data_designer.engine.resources.resource_provider import ResourceProvider
4246

47+
if TYPE_CHECKING:
48+
from data_designer.engine.models.usage import ModelUsageStats
49+
4350
logger = logging.getLogger(__name__)
4451

4552

53+
_CLIENT_VERSION: str = importlib.metadata.version("data_designer")
54+
55+
4656
class ColumnWiseDatasetBuilder:
4757
def __init__(
4858
self,
@@ -89,11 +99,12 @@ def build(
8999

90100
generators = self._initialize_generators()
91101
start_time = time.perf_counter()
102+
group_id = uuid.uuid4().hex
92103

93104
self.batch_manager.start(num_records=num_records, buffer_size=buffer_size)
94105
for batch_idx in range(self.batch_manager.num_batches):
95106
logger.info(f"⏳ Processing batch {batch_idx + 1} of {self.batch_manager.num_batches}")
96-
self._run_batch(generators)
107+
self._run_batch(generators, batch_mode="batch", group_id=group_id)
97108
df_batch = self._run_processors(
98109
stage=BuildStage.POST_BATCH,
99110
dataframe=self.batch_manager.get_current_batch(as_dataframe=True),
@@ -114,10 +125,10 @@ def build_preview(self, *, num_records: int) -> pd.DataFrame:
114125
self._run_model_health_check_if_needed()
115126

116127
generators = self._initialize_generators()
117-
128+
group_id = uuid.uuid4().hex
118129
start_time = time.perf_counter()
119130
self.batch_manager.start(num_records=num_records, buffer_size=num_records)
120-
self._run_batch(generators, save_partial_results=False)
131+
self._run_batch(generators, batch_mode="preview", save_partial_results=False, group_id=group_id)
121132
dataset = self.batch_manager.get_current_batch(as_dataframe=True)
122133
self.batch_manager.reset()
123134

@@ -143,7 +154,10 @@ def _initialize_generators(self) -> list[ColumnGenerator]:
143154
for config in self._column_configs
144155
]
145156

146-
def _run_batch(self, generators: list[ColumnGenerator], *, save_partial_results: bool = True) -> None:
157+
def _run_batch(
158+
self, generators: list[ColumnGenerator], *, batch_mode: str, save_partial_results: bool = True, group_id: str
159+
) -> None:
160+
pre_batch_snapshot = self._resource_provider.model_registry.get_model_usage_snapshot()
147161
for generator in generators:
148162
generator.log_pre_generation()
149163
try:
@@ -166,6 +180,12 @@ def _run_batch(self, generators: list[ColumnGenerator], *, save_partial_results:
166180
)
167181
raise DatasetGenerationError(f"🛑 Failed to process {column_error_str}:\n{e}")
168182

183+
try:
184+
usage_deltas = self._resource_provider.model_registry.get_usage_deltas(pre_batch_snapshot)
185+
self._emit_batch_inference_events(batch_mode, usage_deltas, group_id)
186+
except Exception:
187+
pass
188+
169189
def _run_from_scratch_column_generator(self, generator: ColumnGenerator) -> None:
170190
df = generator.generate_from_scratch(self.batch_manager.num_records_batch)
171191
self.batch_manager.add_records(df.to_dict(orient="records"))
@@ -289,3 +309,25 @@ def _write_configs(self) -> None:
289309
json_file_name="model_configs.json",
290310
configs=self._resource_provider.model_registry.model_configs.values(),
291311
)
312+
313+
def _emit_batch_inference_events(
    self, batch_mode: str, usage_deltas: dict[str, ModelUsageStats], group_id: str
) -> None:
    """Emit one anonymous inference event per model that accrued usage this batch.

    Args:
        batch_mode: Which build path produced the usage ("batch" or "preview").
        usage_deltas: Per-model usage accumulated during the batch, keyed by model name.
        group_id: Identifier that groups all events emitted by one build run.
    """
    if not usage_deltas:
        return

    pending = []
    for name, usage in usage_deltas.items():
        pending.append(
            InferenceEvent(
                nemo_source=NemoSourceEnum.DATADESIGNER,
                task=batch_mode,
                task_status=TaskStatusEnum.SUCCESS,
                model=name,
                input_tokens=usage.token_usage.input_tokens,
                output_tokens=usage.token_usage.output_tokens,
            )
        )

    # NOTE(review): handler is used as a context manager — presumably it flushes
    # queued events on exit; confirm against TelemetryHandler's implementation.
    with TelemetryHandler(source_client_version=_CLIENT_VERSION, session_id=group_id) as handler:
        for event in pending:
            handler.enqueue(event)

src/data_designer/engine/models/registry.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from data_designer.engine.model_provider import ModelProvider, ModelProviderRegistry
1010
from data_designer.engine.models.facade import ModelFacade
1111
from data_designer.engine.models.litellm_overrides import apply_litellm_patches
12+
from data_designer.engine.models.usage import ModelUsageStats, RequestUsageStats, TokenUsageStats
1213
from data_designer.engine.secret_resolver import SecretResolver
1314

1415
logger = logging.getLogger(__name__)
@@ -25,7 +26,7 @@ def __init__(
2526
self._secret_resolver = secret_resolver
2627
self._model_provider_registry = model_provider_registry
2728
self._model_configs = {}
28-
self._models = {}
29+
self._models: dict[str, ModelFacade] = {}
2930
self._set_model_configs(model_configs)
3031

3132
@property
@@ -69,6 +70,31 @@ def get_model_usage_stats(self, total_time_elapsed: float) -> dict[str, dict]:
6970
if model.usage_stats.has_usage
7071
}
7172

73+
def get_model_usage_snapshot(self) -> dict[str, ModelUsageStats]:
74+
return {
75+
model.model_name: model.usage_stats.model_copy(deep=True)
76+
for model in self._models.values()
77+
if model.usage_stats.has_usage
78+
}
79+
80+
def get_usage_deltas(self, snapshot: dict[str, ModelUsageStats]) -> dict[str, ModelUsageStats]:
81+
deltas = {}
82+
for model_name, current in self.get_model_usage_snapshot().items():
83+
prev = snapshot.get(model_name)
84+
delta_input = current.token_usage.input_tokens - (prev.token_usage.input_tokens if prev else 0)
85+
delta_output = current.token_usage.output_tokens - (prev.token_usage.output_tokens if prev else 0)
86+
delta_successful = current.request_usage.successful_requests - (
87+
prev.request_usage.successful_requests if prev else 0
88+
)
89+
delta_failed = current.request_usage.failed_requests - (prev.request_usage.failed_requests if prev else 0)
90+
91+
if delta_input > 0 or delta_output > 0 or delta_successful > 0 or delta_failed > 0:
92+
deltas[model_name] = ModelUsageStats(
93+
token_usage=TokenUsageStats(input_tokens=delta_input, output_tokens=delta_output),
94+
request_usage=RequestUsageStats(successful_requests=delta_successful, failed_requests=delta_failed),
95+
)
96+
return deltas
97+
7298
def get_model_provider(self, *, model_alias: str) -> ModelProvider:
7399
model_config = self.get_model_config(model_alias=model_alias)
74100
return self._model_provider_registry.get_provider(model_config.provider)

0 commit comments

Comments
 (0)