8 changes: 7 additions & 1 deletion notebooks/upstream.ipynb
@@ -750,8 +750,14 @@
}
],
"metadata": {
"kernelspec": {
"display_name": "data-tools",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python"
"name": "python",
"version": "3.12.3"
}
},
"nbformat": 4,
1 change: 1 addition & 0 deletions pyproject.toml
@@ -41,6 +41,7 @@ dependencies = [
"langchain-nvidia-ai-endpoints>=0.3.16",
"langchain-xai>=0.2.5",
"langchain-perplexity>=0.1.2",
"duckdb>=1.3.2",
]

[dependency-groups]
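The new DuckDB adapter below depends on this pin. A minimal smoke test of the dependency (assuming it is installed in the project environment; `duckdb.execute()` runs against the module-level default in-memory connection):

import duckdb

# The pyproject pin above requires duckdb>=1.3.2.
print(duckdb.__version__)

# duckdb.execute() uses the default in-memory connection.
assert duckdb.execute("SELECT 42").fetchall() == [(42,)]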
33 changes: 33 additions & 0 deletions src/data_tools/adapters/adapter.py
@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from typing import Any

from data_tools.adapters.models import (
    ColumnProfile,
    ProfilingOutput,
)


class Adapter(ABC):
    @abstractmethod
    def profile(self, data: Any) -> ProfilingOutput:
        pass

    @abstractmethod
    def column_profile(
        self,
        data: Any,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> ColumnProfile:
        pass

    @abstractmethod
    def load(self, data: Any, table_name: str) -> None: ...

    @abstractmethod
    def execute(self) -> Any: ...
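For orientation, a minimal sketch of a concrete subclass satisfying this contract. The `EchoAdapter` class is hypothetical (not part of the PR), and the `ProfilingOutput`/`ColumnProfile` constructor arguments are assumptions based on how the duckdb adapter below uses those models:

from typing import Any

from data_tools.adapters.adapter import Adapter
from data_tools.adapters.models import ColumnProfile, ProfilingOutput


class EchoAdapter(Adapter):
    """Hypothetical do-nothing adapter; all four abstract methods are overridden."""

    def profile(self, data: Any) -> ProfilingOutput:
        return ProfilingOutput(count=0, columns=[], dtypes={})

    def column_profile(
        self,
        data: Any,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> ColumnProfile:
        return ColumnProfile(column_name=column_name, table_name=table_name)

    def load(self, data: Any, table_name: str) -> None: ...

    def execute(self) -> Any: ...


# Omitting any @abstractmethod leaves the subclass abstract:
# instantiating it raises TypeError at runtime.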
src/data_tools/adapters/factory.py
@@ -2,7 +2,7 @@

from typing import Any, Callable

from .dataframe import DataFrame
from .adapter import Adapter


class ModuleInterface:
@@ -18,13 +18,14 @@ def import_module(name: str) -> ModuleInterface:
return importlib.import_module(name) # type: ignore


DEFAULT_PLUGINS = ["data_tools.dataframes.types.pandas.pandas"]
DEFAULT_PLUGINS = [
"data_tools.adapters.types.pandas.pandas",
"data_tools.adapters.types.duckdb.duckdb",
]


class DataFrameFactory:
dataframe_funcs: dict[
str, tuple[Callable[[Any], bool], Callable[..., DataFrame]]
] = {}
class AdapterFactory:
dataframe_funcs: dict[str, tuple[Callable[[Any], bool], Callable[..., Adapter]]] = {}

# LOADER
def __init__(self, plugins: list[str] | None = None):
@@ -42,7 +43,7 @@ def register(
cls,
env_type: str,
checker_fn: Callable[[Any], bool],
creator_fn: Callable[..., DataFrame],
creator_fn: Callable[..., Adapter],
) -> None:
"""Register a new execution engine type"""
cls.dataframe_funcs[env_type] = (checker_fn, creator_fn)
@@ -53,7 +54,7 @@ def unregister(cls, env_type: str) -> None:
cls.dataframe_funcs.pop(env_type, None)

@classmethod
def create(cls, df: Any) -> DataFrame:
def create(cls, df: Any) -> Adapter:
"""Create a execution engine type"""
for checker_fn, creator_fn in cls.dataframe_funcs.values():
if checker_fn(df):
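Putting the plugin mechanism together, usage looks roughly like this. A sketch under two assumptions: the constructor falls back to DEFAULT_PLUGINS when `plugins` is None, and each plugin module exposes a module-level `register(factory)` hook (the duckdb module below does):

from data_tools.adapters.factory import AdapterFactory
from data_tools.adapters.types.duckdb.models import DuckdbConfig

# Constructing the factory imports the plugin modules, and each one
# calls factory.register(env_type, checker_fn, creator_fn).
factory = AdapterFactory()

# create() walks the registered (checker_fn, creator_fn) pairs and
# instantiates the first adapter whose checker accepts the input.
adapter = AdapterFactory.create(DuckdbConfig(path="data.csv", type="csv"))
# -> a DuckdbAdapter, since its checker matches DuckdbConfig instances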
src/data_tools/adapters/models.py
@@ -42,7 +42,7 @@ class ColumnProfile(BaseModel):
)
datatype_l2: Optional[str] = Field(
default=None,
description="The inferred data type category (dimension/measure) for the column, based on the datatype l1 results",
description="The inferred data type category (dimension/measure) for the column, based on the datatype l1 results", # noqa: E501
)
business_glossary: Optional[str] = Field(
default=None, description="The business glossary entry for the column, if available."
@@ -114,9 +114,7 @@ class KeyIdentificationOutput(BaseModel):

"""

column_name: Optional[str] = Field(
default=None, description="The name of the column identified as a primary key."
)
column_name: Optional[str] = Field(default=None, description="The name of the column identified as a primary key.")


class ColumnGlossary(BaseModel):
211 changes: 211 additions & 0 deletions src/data_tools/adapters/types/duckdb/duckdb.py
@@ -0,0 +1,211 @@
import time

from typing import Any, Optional

import duckdb
import numpy as np
import pandas as pd

from data_tools.adapters.adapter import Adapter
from data_tools.adapters.factory import AdapterFactory
from data_tools.adapters.models import (
    ColumnProfile,
    ProfilingOutput,
)
from data_tools.adapters.types.duckdb.models import DuckdbConfig
from data_tools.adapters.types.pandas.utils import convert_to_native
from data_tools.common.exception import errors
from data_tools.core.utilities.processing import string_standardization


class DuckdbAdapter(Adapter):
    def profile(self, data: DuckdbConfig) -> ProfilingOutput:
        """
        Generates a profile of a file loaded through DuckDB.

        Args:
            data: The DuckDB source configuration (file path and type).

        Returns:
            A pydantic model containing the profile information:
            - "count": Total number of rows.
            - "columns": List of column names.
            - "dtypes": A dictionary mapping column names to generalized data types.
        """
        if not isinstance(data, DuckdbConfig):
            raise TypeError("Input must be a duckdb config.")

        def __format_dtype__(dtype: Any) -> str:
            """Maps a DuckDB type to a generalized type string."""
            type_map = {
                "VARCHAR": "string",
                "DATE": "date & time",
                "BIGINT": "integer",
                "DOUBLE": "float",
                "FLOAT": "float",
            }
            return type_map.get(dtype, "string")

        table_name = "__profile_table__"
        self.load(data, table_name)

        # Fetch the total row count of the table.
        query = f"""
            SELECT count(*) AS count FROM {table_name}
        """
        rows = duckdb.execute(query).fetchall()

        total_count = rows[0][0]

        # Fetch the column names and data types of the table.
        query = """
            SELECT column_name, data_type
            FROM duckdb_columns()
            WHERE table_name = ?
        """
        rows = duckdb.execute(query, [table_name]).fetchall()

        dtypes = {col: __format_dtype__(dtype) for col, dtype in rows}
        columns = [col for col, _ in rows]

        return ProfilingOutput(
            count=total_count,
            columns=columns,
            dtypes=dtypes,
        )

    def column_profile(
        self,
        data: DuckdbConfig,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> Optional[ColumnProfile]:
        """
        Generates a detailed profile for a single column of a DuckDB-backed table.

        It calculates null and distinct counts, and generates two types of samples:
        1. `sample_data`: A sample of unique values.
        2. `dtype_sample`: A potentially larger sample combining unique values with
           additional resampled values, intended for data type analysis.

        Args:
            data: The DuckDB source configuration.
            table_name: The name of the table to profile.
            column_name: The name of the column to profile.
            total_count: The total number of rows in the table.
            sample_limit: The desired number of items for `sample_data`.
            dtype_sample_limit: The desired number of items for `dtype_sample`.

        Returns:
            A ColumnProfile for the column, or None if the column does not exist.
        """
        if not isinstance(data, DuckdbConfig):
            raise TypeError("Input must be a duckdb config.")

        self.load(data, table_name)

        start_ts = time.time()

        # --- Calculations --- #
        query = f"""
            SELECT
                COUNT(DISTINCT {column_name}) AS distinct_count,
                SUM(CASE WHEN {column_name} IS NULL THEN 1 ELSE 0 END) AS null_count
            FROM
                {table_name}
        """
        distinct_null_data = duckdb.execute(query).fetchall()

        distinct_count, null_count = distinct_null_data[0]
        not_null_count = total_count - null_count

        # --- Sampling Logic --- #
        # 1. Get a sample of distinct values.
        sample_query = f"""
            SELECT
                DISTINCT CAST({column_name} AS VARCHAR) AS sample_values
            FROM
                {table_name}
            WHERE
                {column_name} IS NOT NULL LIMIT {dtype_sample_limit}
        """
        rows = duckdb.execute(sample_query).fetchall()
        distinct_values = [d[0] for d in rows]

        not_null_series = pd.Series(distinct_values)

        if distinct_count > 0:
            distinct_sample_size = min(distinct_count, dtype_sample_limit)
            sample_data = list(np.random.choice(distinct_values, distinct_sample_size, replace=False))
        else:
            sample_data = []

        # 2. Create a combined sample for data type analysis.
        dtype_sample = None
        if distinct_count >= dtype_sample_limit:
            # If we have enough distinct values, that's the best sample.
            dtype_sample = sample_data
        elif distinct_count > 0 and not_null_count > 0:
            # If distinct values are few, pad the sample by resampling from them.
            remaining_sample_size = dtype_sample_limit - distinct_count

            # Use replace=True because the number of distinct values may be
            # smaller than the remaining sample size needed.
            additional_samples = list(not_null_series.sample(n=remaining_sample_size, replace=True))

            # Combine the full set of unique values with the additional random samples.
            dtype_sample = list(distinct_values) + additional_samples
        else:
            dtype_sample = []

        # --- Convert numpy types to native Python types for JSON compatibility --- #
        native_sample_data = convert_to_native(sample_data)
        native_dtype_sample = convert_to_native(dtype_sample)

        business_name = string_standardization(column_name)

        # --- Final Profile --- #
        return ColumnProfile(
            column_name=column_name,
            business_name=business_name,
            table_name=table_name,
            null_count=null_count,
            count=total_count,
            distinct_count=distinct_count,
            uniqueness=distinct_count / total_count if total_count > 0 else 0.0,
            completeness=not_null_count / total_count if total_count > 0 else 0.0,
            sample_data=native_sample_data[:sample_limit],
            dtype_sample=native_dtype_sample,
            ts=time.time() - start_ts,
        )

    @staticmethod
    def _get_load_func(data: DuckdbConfig) -> str:
        func = {
            "csv": "read_csv",
            "parquet": "read_parquet",
            "xlsx": "read_xlsx",
        }
        ld_func = func.get(data.type)
        if ld_func is None:
            raise errors.NotFoundError(f"Type: {data.type} not supported")

        return f"{ld_func}('{data.path}')"

    def load(self, data: DuckdbConfig, table_name: str) -> None:
        ld_func = self._get_load_func(data)

        query = f"""CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM {ld_func};"""

        duckdb.execute(query)

    def execute(self) -> Any: ...


def can_handle_duckdb(df: Any) -> bool:
    return isinstance(df, DuckdbConfig)


def register(factory: AdapterFactory):
    factory.register("duckdb", can_handle_duckdb, DuckdbAdapter)
6 changes: 6 additions & 0 deletions src/data_tools/adapters/types/duckdb/models.py
@@ -0,0 +1,6 @@
from data_tools.common.schema import SchemaBase


class DuckdbConfig(SchemaBase):
    path: str
    type: str
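Only `csv`, `parquet`, and `xlsx` are mapped by `_get_load_func`, so any other `type` value fails at load time. A short sketch (assuming `SchemaBase` validates fields like a pydantic model):

DuckdbConfig(path="events.parquet", type="parquet")  # accepted by the adapter
DuckdbConfig(path="events.json", type="json")  # validates, but load() raises NotFoundError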