Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,18 +410,41 @@ def _get_columns_to_profile(self) -> List[str]:
if not self.config.any_field_level_metrics_enabled():
return []

# Get columns to ignore due to tags
(
ignore_table_profiling,
columns_list_to_ignore_profiling,
) = _get_columns_to_ignore_profiling(
self.dataset_name,
self.config.tags_to_ignore_profiling,
self.platform,
self.env,
)

# If the entire table is tagged to ignore profiling, return empty list
if ignore_table_profiling:
self.report.report_dropped(
f"The profile of table {self.dataset_name} (table is tagged with tags_to_ignore_profiling)"
)
return []

# Compute columns to profile
columns_to_profile: List[str] = []

# Compute ignored columns
ignored_columns_by_pattern: List[str] = []
ignored_columns_by_type: List[str] = []
ignored_columns_by_tags: List[str] = []

for col_dict in self.dataset.columns:
col = col_dict["name"]
self.column_types[col] = str(col_dict["type"])

# Check if column is tagged to ignore profiling
if col in columns_list_to_ignore_profiling:
ignored_columns_by_tags.append(col)
# We expect the allow/deny patterns to specify '<table_pattern>.<column_pattern>'
if (
elif (
not self.config._allow_deny_patterns.allowed(
f"{self.dataset_name}.{col}"
)
Expand All @@ -442,6 +465,10 @@ def _get_columns_to_profile(self) -> List[str]:
self.report.report_dropped(
f"The profile of columns by type {self.dataset_name}({', '.join(sorted(ignored_columns_by_type))})"
)
if ignored_columns_by_tags:
self.report.report_dropped(
f"The profile of columns by tags {self.dataset_name}({', '.join(sorted(ignored_columns_by_tags))})"
)

if self.config.max_number_of_fields_to_profile is not None:
if len(columns_to_profile) > self.config.max_number_of_fields_to_profile:
Expand Down Expand Up @@ -1696,3 +1723,55 @@ def _get_columns_to_ignore_sampling(
)

return ignore_table, columns_to_ignore


def _get_columns_to_ignore_profiling(
    dataset_name: str, tags_to_ignore: Optional[List[str]], platform: str, env: str
) -> Tuple[bool, List[str]]:
    """Work out which parts of a dataset are excluded from profiling by tag.

    Args:
        dataset_name: Name used to build the dataset URN.
        tags_to_ignore: Tag names (without the ``urn:li:tag:`` prefix) that
            exclude a table or column from profiling. ``None`` or an empty
            list short-circuits without building a URN or querying the graph.
        platform: Platform used to build the dataset URN.
        env: Environment used to build the dataset URN.

    Returns:
        A ``(ignore_table, columns_to_ignore)`` pair. ``ignore_table`` is True
        when any dataset-level tag matches, in which case the column list is
        empty. Otherwise the list holds the field path of every editable
        schema field carrying a matching tag (one entry per matching tag).
        If anything raises while building the URN or querying the graph, a
        warning is logged and ``(False, [])`` is returned so that nothing is
        ignored.
    """
    logger.debug("Collecting columns to ignore for profiling")

    if not tags_to_ignore:
        return False, []

    tag_prefix = "urn:li:tag:"

    def _is_ignored_tag(tag_urn: str) -> bool:
        # A URN containing the prefix always splits into at least two pieces,
        # so index 1 is the tag name that follows the first prefix occurrence.
        if tag_prefix not in tag_urn:
            return False
        return tag_urn.split(tag_prefix)[1] in tags_to_ignore

    try:
        urn = mce_builder.make_dataset_urn(
            name=dataset_name, platform=platform, env=env
        )
        graph = get_default_graph(ClientMode.INGESTION)

        # A matching table-level tag wins: no per-column lookup is needed.
        table_tags = graph.get_tags(urn)
        if table_tags and any(
            _is_ignored_tag(association.tag) for association in table_tags.tags
        ):
            return True, []

        editable_schema = graph.get_aspect(
            entity_urn=urn, aspect_type=EditableSchemaMetadata
        )
        if not editable_schema:
            return False, []

        tagged_columns: List[str] = []
        for field_info in editable_schema.editableSchemaFieldInfo:
            if not field_info.globalTags:
                continue
            for association in field_info.globalTags.tags:
                if _is_ignored_tag(association.tag):
                    tagged_columns.append(field_info.fieldPath)
        return False, tagged_columns

    except Exception as e:
        logger.warning(f"Error fetching tags for profiling ignore logic: {e}")
        # Return default values on error - don't ignore anything
        return False, []
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,15 @@ class GEProfilingConfig(GEProfilingBaseConfig):
),
)

tags_to_ignore_profiling: Optional[List[str]] = pydantic.Field(
default=None,
description=(
"Fixed list of tags to completely ignore profiling for columns with these tags."
" Columns with these tags will be excluded from all profiling activities."
" If not specified, all columns will be profiled based on other configuration."
),
)

profile_nested_fields: bool = Field(
default=False,
description="Whether to profile complex types like structs, arrays and maps. ",
Expand Down
Empty file.
144 changes: 144 additions & 0 deletions metadata-ingestion/tests/unit/ge_profiling/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import pytest

from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig


def test_profile_table_level_only():
    """Check that table-level-only configs report no field-level metrics.

    Covers the bare flag and the flag combined with a field metric that is
    explicitly switched off.
    """
    table_only = {"enabled": True, "profile_table_level_only": True}
    table_only_metric_off = {**table_only, "include_field_max_value": False}

    for settings in (table_only, table_only_metric_off):
        parsed = GEProfilingConfig.model_validate(settings)
        assert parsed.any_field_level_metrics_enabled() is False


def test_profile_table_level_only_fails_with_field_metric_enabled():
    """Check that enabling a field metric alongside table-level-only is rejected."""
    conflicting_settings = {
        "enabled": True,
        "profile_table_level_only": True,
        "include_field_max_value": True,
    }
    expected_message = (
        "Cannot enable field-level metrics if profile_table_level_only is set"
    )

    with pytest.raises(ValueError, match=expected_message):
        GEProfilingConfig.model_validate(conflicting_settings)


def test_tags_to_ignore_profiling_config():
    """Check parsing of tags_to_ignore_profiling when given and when omitted."""
    expected_tags = ["PII", "Sensitive"]
    with_tags = GEProfilingConfig.model_validate(
        {"enabled": True, "tags_to_ignore_profiling": list(expected_tags)}
    )
    assert with_tags.tags_to_ignore_profiling == expected_tags

    # Omitting the key leaves the field at None.
    without_tags = GEProfilingConfig.model_validate({"enabled": True})
    assert without_tags.tags_to_ignore_profiling is None


def test_tags_to_ignore_profiling_vs_sampling():
    """Check that the profiling and sampling tag lists can be set together."""
    settings = {
        "enabled": True,
        "tags_to_ignore_profiling": ["PII", "Sensitive"],
        "tags_to_ignore_sampling": ["HighCardinality"],
    }

    parsed = GEProfilingConfig.model_validate(settings)

    # Each list keeps its own values; neither overrides the other.
    assert parsed.tags_to_ignore_profiling == ["PII", "Sensitive"]
    assert parsed.tags_to_ignore_sampling == ["HighCardinality"]


def test_tags_to_ignore_profiling_empty_list():
    """Check that an empty tags_to_ignore_profiling list is kept as an empty list."""
    settings = {"enabled": True, "tags_to_ignore_profiling": []}

    parsed = GEProfilingConfig.model_validate(settings)

    assert parsed.tags_to_ignore_profiling == []


def test_tags_to_ignore_profiling_single_tag():
    """Check that a one-element tags_to_ignore_profiling list is parsed unchanged."""
    settings = {"enabled": True, "tags_to_ignore_profiling": ["PII"]}

    parsed = GEProfilingConfig.model_validate(settings)

    assert parsed.tags_to_ignore_profiling == ["PII"]


def test_tags_to_ignore_profiling_multiple_tags():
    """Check that several tags are parsed with their values and order intact."""
    expected_tags = ["PII", "Sensitive", "Confidential", "Internal"]

    # Hand the model its own copy so the comparison is against an untouched list.
    parsed = GEProfilingConfig.model_validate(
        {"enabled": True, "tags_to_ignore_profiling": list(expected_tags)}
    )

    assert parsed.tags_to_ignore_profiling == expected_tags


def test_tags_to_ignore_profiling_with_other_config():
    """Check that tags_to_ignore_profiling coexists with other profiling options."""
    settings = {
        "enabled": True,
        "tags_to_ignore_profiling": ["PII"],
        "include_field_null_count": True,
        "include_field_distinct_count": False,
        "max_number_of_fields_to_profile": 100,
        "profile_table_size_limit": 5000000,
    }

    parsed = GEProfilingConfig.model_validate(settings)

    assert parsed.tags_to_ignore_profiling == ["PII"]
    # Boolean options are compared by identity, numeric limits by value.
    assert parsed.include_field_null_count is True
    assert parsed.include_field_distinct_count is False
    assert parsed.max_number_of_fields_to_profile == 100
    assert parsed.profile_table_size_limit == 5000000


def test_tags_to_ignore_profiling_validation():
    """Check that malformed tags_to_ignore_profiling values raise ValueError."""
    invalid_values = (
        "PII",  # Should be a list, not string
        ["PII", 123],  # 123 is not a string
    )

    for bad_value in invalid_values:
        with pytest.raises(ValueError):
            GEProfilingConfig.model_validate(
                {"enabled": True, "tags_to_ignore_profiling": bad_value}
            )
Loading
Loading