31 changes: 31 additions & 0 deletions README.md
@@ -117,6 +117,37 @@ data-designer config list # View current settings

---

## Telemetry

Data Designer collects telemetry to help us improve the library for developers. We collect:

* The names of models used
* The count of input tokens
* The count of output tokens

**No user or device information is collected.** This data is not used to track any individual user's behavior; it is used only in aggregate to understand which models are the most popular for synthetic data generation (SDG). We will share this usage data with the community.

Specifically, only the model name defined in a `ModelConfig` object is collected. For example, given the config below:

```python
ModelConfig(
alias="nv-reasoning",
model="openai/gpt-oss-20b",
provider="nvidia",
inference_parameters=InferenceParameters(
temperature=0.3,
top_p=0.9,
max_tokens=4096,
),
)
```

The value `openai/gpt-oss-20b` would be collected.

To disable telemetry capture, set the environment variable `NEMO_TELEMETRY_ENABLED=false`.
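
For instance, here is a minimal sketch of opting out in-process. It assumes the environment variable is read when the library is first imported, and that `data_designer` is the installed package name:

```python
import os

# Opt out of telemetry for this process. Assumption: NEMO_TELEMETRY_ENABLED
# is read at import/initialization time, so it must be set before importing.
os.environ["NEMO_TELEMETRY_ENABLED"] = "false"

import data_designer  # noqa: E402
```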

---

## License

Apache License 2.0 – see [LICENSE](LICENSE) for details.
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -98,6 +98,11 @@ env = [
"DISABLE_DATA_DESIGNER_PLUGINS=true",
]

[tool.coverage.run]
omit = [
"src/data_designer/engine/models/telemetry.py",
]

[tool.uv]
package = true
required-version = ">=0.7.10"
52 changes: 47 additions & 5 deletions src/data_designer/engine/dataset_builders/column_wise_builder.py
@@ -1,12 +1,15 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import functools
import importlib.metadata
import json
import logging
import time
import uuid
from pathlib import Path
from typing import Callable
from typing import TYPE_CHECKING, Callable

import pandas as pd

@@ -35,14 +38,21 @@
from data_designer.engine.dataset_builders.utils.dataset_batch_manager import (
DatasetBatchManager,
)
from data_designer.engine.models.telemetry import InferenceEvent, NemoSourceEnum, TaskStatusEnum, TelemetryHandler
from data_designer.engine.processing.processors.base import Processor
from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry
from data_designer.engine.resources.resource_provider import ResourceProvider

if TYPE_CHECKING:
from data_designer.engine.models.usage import ModelUsageStats

logger = logging.getLogger(__name__)


_CLIENT_VERSION: str = importlib.metadata.version("data_designer")


class ColumnWiseDatasetBuilder:
def __init__(
self,
@@ -89,11 +99,12 @@ def build(

generators = self._initialize_generators()
start_time = time.perf_counter()
group_id = uuid.uuid4().hex

self.batch_manager.start(num_records=num_records, buffer_size=buffer_size)
for batch_idx in range(self.batch_manager.num_batches):
logger.info(f"⏳ Processing batch {batch_idx + 1} of {self.batch_manager.num_batches}")
self._run_batch(generators)
self._run_batch(generators, batch_mode="batch", group_id=group_id)
df_batch = self._run_processors(
stage=BuildStage.POST_BATCH,
dataframe=self.batch_manager.get_current_batch(as_dataframe=True),
@@ -114,10 +125,10 @@ def build_preview(self, *, num_records: int) -> pd.DataFrame:
self._run_model_health_check_if_needed()

generators = self._initialize_generators()

group_id = uuid.uuid4().hex
start_time = time.perf_counter()
self.batch_manager.start(num_records=num_records, buffer_size=num_records)
self._run_batch(generators, save_partial_results=False)
self._run_batch(generators, batch_mode="preview", save_partial_results=False, group_id=group_id)
dataset = self.batch_manager.get_current_batch(as_dataframe=True)
self.batch_manager.reset()

@@ -143,7 +154,10 @@ def _initialize_generators(self) -> list[ColumnGenerator]:
for config in self._column_configs
]

def _run_batch(self, generators: list[ColumnGenerator], *, save_partial_results: bool = True) -> None:
def _run_batch(
self, generators: list[ColumnGenerator], *, batch_mode: str, save_partial_results: bool = True, group_id: str
) -> None:
pre_batch_snapshot = self._resource_provider.model_registry.get_model_usage_snapshot()
for generator in generators:
generator.log_pre_generation()
try:
@@ -166,6 +180,12 @@ def _run_batch(self, generators: list[ColumnGenerator], *, save_partial_results:
)
raise DatasetGenerationError(f"🛑 Failed to process {column_error_str}:\n{e}")

try:
usage_deltas = self._resource_provider.model_registry.get_usage_deltas(pre_batch_snapshot)
self._emit_batch_inference_events(batch_mode, usage_deltas, group_id)
except Exception:
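            # Telemetry is best-effort; never let emission errors interrupt generation.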
pass

def _run_from_scratch_column_generator(self, generator: ColumnGenerator) -> None:
df = generator.generate_from_scratch(self.batch_manager.num_records_batch)
self.batch_manager.add_records(df.to_dict(orient="records"))
@@ -289,3 +309,25 @@ def _write_configs(self) -> None:
json_file_name="model_configs.json",
configs=self._resource_provider.model_registry.model_configs.values(),
)

def _emit_batch_inference_events(
self, batch_mode: str, usage_deltas: dict[str, ModelUsageStats], group_id: str
) -> None:
if not usage_deltas:
return

events = [
InferenceEvent(
nemo_source=NemoSourceEnum.DATADESIGNER,
task=batch_mode,
task_status=TaskStatusEnum.SUCCESS,
model=model_name,
input_tokens=delta.token_usage.input_tokens,
output_tokens=delta.token_usage.output_tokens,
)
for model_name, delta in usage_deltas.items()
]

with TelemetryHandler(source_client_version=_CLIENT_VERSION, session_id=group_id) as telemetry_handler:
for event in events:
telemetry_handler.enqueue(event)
28 changes: 27 additions & 1 deletion src/data_designer/engine/models/registry.py
@@ -9,6 +9,7 @@
from data_designer.engine.model_provider import ModelProvider, ModelProviderRegistry
from data_designer.engine.models.facade import ModelFacade
from data_designer.engine.models.litellm_overrides import apply_litellm_patches
from data_designer.engine.models.usage import ModelUsageStats, RequestUsageStats, TokenUsageStats
from data_designer.engine.secret_resolver import SecretResolver

logger = logging.getLogger(__name__)
@@ -25,7 +26,7 @@ def __init__(
self._secret_resolver = secret_resolver
self._model_provider_registry = model_provider_registry
self._model_configs = {}
self._models = {}
self._models: dict[str, ModelFacade] = {}
self._set_model_configs(model_configs)

@property
@@ -69,6 +70,31 @@ def get_model_usage_stats(self, total_time_elapsed: float) -> dict[str, dict]:
if model.usage_stats.has_usage
}

def get_model_usage_snapshot(self) -> dict[str, ModelUsageStats]:
return {
model.model_name: model.usage_stats.model_copy(deep=True)
for model in self._models.values()
if model.usage_stats.has_usage
}

def get_usage_deltas(self, snapshot: dict[str, ModelUsageStats]) -> dict[str, ModelUsageStats]:
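        # Diff current usage against the snapshot; keep only models with nonzero deltas.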
deltas = {}
for model_name, current in self.get_model_usage_snapshot().items():
prev = snapshot.get(model_name)
delta_input = current.token_usage.input_tokens - (prev.token_usage.input_tokens if prev else 0)
delta_output = current.token_usage.output_tokens - (prev.token_usage.output_tokens if prev else 0)
delta_successful = current.request_usage.successful_requests - (
prev.request_usage.successful_requests if prev else 0
)
delta_failed = current.request_usage.failed_requests - (prev.request_usage.failed_requests if prev else 0)

if delta_input > 0 or delta_output > 0 or delta_successful > 0 or delta_failed > 0:
deltas[model_name] = ModelUsageStats(
token_usage=TokenUsageStats(input_tokens=delta_input, output_tokens=delta_output),
request_usage=RequestUsageStats(successful_requests=delta_successful, failed_requests=delta_failed),
)
return deltas

def get_model_provider(self, *, model_alias: str) -> ModelProvider:
model_config = self.get_model_config(model_alias=model_alias)
return self._model_provider_registry.get_provider(model_config.provider)