8 changes: 7 additions & 1 deletion notebooks/upstream.ipynb
@@ -750,8 +750,14 @@
   }
  ],
  "metadata": {
+  "kernelspec": {
+   "display_name": "data-tools",
+   "language": "python",
+   "name": "python3"
+  },
   "language_info": {
-   "name": "python"
+   "name": "python",
+   "version": "3.12.3"
   }
  },
  "nbformat": 4,
1 change: 1 addition & 0 deletions pyproject.toml
@@ -41,6 +41,7 @@ dependencies = [
"langchain-nvidia-ai-endpoints>=0.3.16",
"langchain-xai>=0.2.5",
"langchain-perplexity>=0.1.2",
"duckdb>=1.3.2",
]

[dependency-groups]
33 changes: 33 additions & 0 deletions src/data_tools/adapters/adapter.py
@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from typing import Any

from data_tools.adapters.models import (
    ColumnProfile,
    ProfilingOutput,
)


class Adapter(ABC):
    @abstractmethod
    def profile(self, data: Any) -> ProfilingOutput:
        pass

    @abstractmethod
    def column_profile(
        self,
        data: Any,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> ColumnProfile:
        pass

    @abstractmethod
    def load(self, data: Any, table_name: str) -> None:
        ...

    @abstractmethod
    def execute(self) -> Any:
        ...
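
As a review aid, here is a minimal sketch of what a concrete subclass must supply. The InMemoryAdapter name, its list-of-dicts input, and the assumption that the remaining ColumnProfile fields are optional are all hypothetical, not part of this PR:

from typing import Any

from data_tools.adapters.adapter import Adapter
from data_tools.adapters.models import ColumnProfile, ProfilingOutput


class InMemoryAdapter(Adapter):
    """Hypothetical adapter over a list of dicts, for illustration only."""

    def profile(self, data: Any) -> ProfilingOutput:
        rows = list(data)
        columns = list(rows[0].keys()) if rows else []
        # Naive typing: report every column as a string.
        return ProfilingOutput(
            count=len(rows),
            columns=columns,
            dtypes={c: "string" for c in columns},
        )

    def column_profile(
        self,
        data: Any,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> ColumnProfile:
        values = [row.get(column_name) for row in data]
        non_null = [v for v in values if v is not None]
        return ColumnProfile(
            column_name=column_name,
            table_name=table_name,
            count=total_count,
            null_count=total_count - len(non_null),
            distinct_count=len(set(non_null)),
            sample_data=[str(v) for v in non_null[:sample_limit]],
        )

    def load(self, data: Any, table_name: str) -> None:
        pass  # nothing to stage for in-memory data

    def execute(self) -> Any:
        pass  # no deferred work in this toy adapter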
src/data_tools/adapters/factory.py
@@ -2,7 +2,7 @@

 from typing import Any, Callable
 
-from .dataframe import DataFrame
+from .adapter import Adapter
 
 
 class ModuleInterface:
@@ -18,13 +18,14 @@ def import_module(name: str) -> ModuleInterface:
     return importlib.import_module(name)  # type: ignore
 
 
-DEFAULT_PLUGINS = ["data_tools.dataframes.types.pandas.pandas"]
+DEFAULT_PLUGINS = [
+    "data_tools.adapters.types.pandas.pandas",
+    "data_tools.adapters.types.duckdb.duckdb",
+]
 
 
-class DataFrameFactory:
-    dataframe_funcs: dict[
-        str, tuple[Callable[[Any], bool], Callable[..., DataFrame]]
-    ] = {}
+class AdapterFactory:
+    dataframe_funcs: dict[str, tuple[Callable[[Any], bool], Callable[..., Adapter]]] = {}
 
     # LOADER
     def __init__(self, plugins: list[dict] = None):
@@ -42,7 +43,7 @@ def register(
         cls,
         env_type: str,
         checker_fn: Callable[[Any], bool],
-        creator_fn: Callable[..., DataFrame],
+        creator_fn: Callable[..., Adapter],
     ) -> None:
         """Register a new execution engine type"""
         cls.dataframe_funcs[env_type] = (checker_fn, creator_fn)
@@ -53,7 +54,7 @@ def unregister(cls, env_type: str) -> None:
         cls.dataframe_funcs.pop(env_type, None)
 
     @classmethod
-    def create(cls, df: Any) -> DataFrame:
+    def create(cls, df: Any) -> Adapter:
         """Create an execution engine type"""
         for checker_fn, creator_fn in cls.dataframe_funcs.values():
             if checker_fn(df):
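
Taken together, the plugin flow works like this. A minimal sketch, assuming the hidden __init__ body imports each DEFAULT_PLUGINS module and invokes its register() hook, and using a pandas DataFrame purely as example input:

import pandas as pd

from data_tools.adapters.factory import AdapterFactory

# Instantiating the factory triggers plugin loading; each plugin module's
# register() hook stores a (checker_fn, creator_fn) pair under its env_type.
factory = AdapterFactory()

# create() walks the registered checkers and instantiates the first adapter
# whose checker accepts the input object.
df = pd.DataFrame({"a": [1, 2, 3]})
adapter = AdapterFactory.create(df)

overview = adapter.profile(df)
print(overview.count, overview.columns, overview.dtypes)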
src/data_tools/adapters/models.py
@@ -42,7 +42,7 @@ class ColumnProfile(BaseModel):
     )
     datatype_l2: Optional[str] = Field(
         default=None,
-        description="The inferred data type category (dimension/measure) for the column, based on the datatype l1 results",
+        description="The inferred data type category (dimension/measure) for the column, based on the datatype l1 results",  # noqa: E501
     )
     business_glossary: Optional[str] = Field(
         default=None, description="The business glossary entry for the column, if available."
@@ -114,9 +114,7 @@ class KeyIdentificationOutput(BaseModel):

"""

column_name: Optional[str] = Field(
default=None, description="The name of the column identified as a primary key."
)
column_name: Optional[str] = Field(default=None, description="The name of the column identified as a primary key.")


class ColumnGlossary(BaseModel):
211 changes: 211 additions & 0 deletions src/data_tools/adapters/types/duckdb/duckdb.py
@@ -0,0 +1,211 @@
import time

from typing import Any, Optional

import duckdb
import numpy as np
import pandas as pd

from data_tools.adapters.adapter import Adapter
from data_tools.adapters.factory import AdapterFactory
from data_tools.adapters.models import (
    ColumnProfile,
    ProfilingOutput,
)
from data_tools.adapters.types.duckdb.models import DuckdbConfig
from data_tools.adapters.types.pandas.utils import convert_to_native
from data_tools.common.exception import errors
from data_tools.core.utilities.processing import string_standardization


class DuckdbAdapter(Adapter):
    def profile(self, data: DuckdbConfig) -> ProfilingOutput:
        """
        Generates a profile of a file.

        Args:
            data: The DuckDB source configuration (file path and type).

        Returns:
            A pydantic model containing the profile information:
            - "count": Total number of rows.
            - "columns": List of column names.
            - "dtypes": A dictionary mapping column names to generalized data types.
        """
        if not isinstance(data, DuckdbConfig):
            raise TypeError("Input must be a DuckdbConfig.")

        def _format_dtype(dtype: Any) -> str:
            """Maps a DuckDB dtype to a generalized type string."""
            type_map = {
                "VARCHAR": "string",
                "DATE": "date & time",
                "BIGINT": "integer",
                "DOUBLE": "float",
                "FLOAT": "float",
            }
            return type_map.get(dtype, "string")

        table_name = "__profile_table__"
        self.load(data, table_name)

        # Fetch the total row count of the table.
        query = f"""
            SELECT count(*) AS count FROM {table_name}
        """
        rows = duckdb.execute(query).fetchall()

        total_count = rows[0][0]

        # Fetch the column names and data types of the table.
        query = """
            SELECT column_name, data_type
            FROM duckdb_columns()
            WHERE table_name = ?
        """
        rows = duckdb.execute(query, [table_name]).fetchall()

        dtypes = {col: _format_dtype(dtype) for col, dtype in rows}
        columns = [col for col, _ in rows]

        return ProfilingOutput(
            count=total_count,
            columns=columns,
            dtypes=dtypes,
        )

    def column_profile(
        self,
        data: DuckdbConfig,
        table_name: str,
        column_name: str,
        total_count: int,
        sample_limit: int = 10,
        dtype_sample_limit: int = 10000,
    ) -> Optional[ColumnProfile]:
        """
        Generates a detailed profile for a single column of a DuckDB-backed table.

        It calculates null and distinct counts, and generates two types of samples:
        1. `sample_data`: A sample of unique values.
        2. `dtype_sample`: A potentially larger sample combining unique values with
           additional random draws, intended for data type analysis.

        Args:
            data: The DuckDB source configuration.
            table_name: The name of the table the data is loaded into.
            column_name: The name of the column to profile.
            total_count: The total number of rows in the table.
            sample_limit: The desired number of items for `sample_data`.
            dtype_sample_limit: The desired number of items for `dtype_sample`.

        Returns:
            A ColumnProfile for the column, or None if the column does not exist.
        """
        if not isinstance(data, DuckdbConfig):
            raise TypeError("Input must be a DuckdbConfig.")

        self.load(data, table_name)

        start_ts = time.time()

        # --- Calculations --- #
        query = f"""
            SELECT
                COUNT(DISTINCT {column_name}) AS distinct_count,
                SUM(CASE WHEN {column_name} IS NULL THEN 1 ELSE 0 END) AS null_count
            FROM
                {table_name}
        """
        distinct_null_data = duckdb.execute(query).fetchall()

        distinct_count, null_count = distinct_null_data[0]
        not_null_count = total_count - null_count

        # --- Sampling Logic --- #
        # 1. Get a sample of distinct values.
        sample_query = f"""
            SELECT
                DISTINCT CAST({column_name} AS VARCHAR) AS sample_values
            FROM
                {table_name}
            WHERE
                {column_name} IS NOT NULL LIMIT {dtype_sample_limit}
        """
        rows = duckdb.execute(sample_query).fetchall()
        distinct_values = [r[0] for r in rows]

        distinct_series = pd.Series(distinct_values)

        if distinct_count > 0:
            distinct_sample_size = min(distinct_count, dtype_sample_limit)
            sample_data = list(np.random.choice(distinct_values, distinct_sample_size, replace=False))
        else:
            sample_data = []

        # 2. Create a combined sample for data type analysis.
        dtype_sample = None
        if distinct_count >= dtype_sample_limit:
            # If we have enough distinct values, that's the best sample.
            dtype_sample = sample_data
        elif distinct_count > 0 and not_null_count > 0:
            # If distinct values are few, pad the sample with random re-draws
            # from the distinct values.
            remaining_sample_size = dtype_sample_limit - distinct_count

            # Use replace=True in case the number of distinct values is less
            # than the remaining sample size needed.
            additional_samples = list(distinct_series.sample(n=remaining_sample_size, replace=True))

            # Combine the full set of unique values with the additional random samples.
            dtype_sample = list(distinct_values) + additional_samples
        else:
            dtype_sample = []

        # --- Convert numpy types to native Python types for JSON compatibility --- #
        native_sample_data = convert_to_native(sample_data)
        native_dtype_sample = convert_to_native(dtype_sample)

        business_name = string_standardization(column_name)

        # --- Final Profile --- #
        return ColumnProfile(
            column_name=column_name,
            business_name=business_name,
            table_name=table_name,
            null_count=null_count,
            count=total_count,
            distinct_count=distinct_count,
            uniqueness=distinct_count / total_count if total_count > 0 else 0.0,
            completeness=not_null_count / total_count if total_count > 0 else 0.0,
            sample_data=native_sample_data[:sample_limit],
            dtype_sample=native_dtype_sample,
            ts=time.time() - start_ts,
        )

    @staticmethod
    def _get_load_func(data: DuckdbConfig) -> str:
        func = {
            "csv": "read_csv",
            "parquet": "read_parquet",
            "xlsx": "read_xlsx",
        }
        ld_func = func.get(data.type)
        if ld_func is None:
            raise errors.NotFoundError(f"Type: {data.type} not supported")

        return f"{ld_func}('{data.path}')"

    def load(self, data: DuckdbConfig, table_name: str):
        ld_func = self._get_load_func(data)

        query = f"""CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM {ld_func};"""

        duckdb.execute(query)

    def execute(self): ...


def can_handle_duckdb(df: Any) -> bool:
    return isinstance(df, DuckdbConfig)


def register(factory: AdapterFactory):
    factory.register("duckdb", can_handle_duckdb, DuckdbAdapter)
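
An end-to-end sketch of the new DuckDB path, assuming SchemaBase supports keyword construction like a pydantic model; sample.csv is a placeholder path:

from data_tools.adapters.factory import AdapterFactory
from data_tools.adapters.types.duckdb.models import DuckdbConfig

config = DuckdbConfig(path="sample.csv", type="csv")

factory = AdapterFactory()  # loads DEFAULT_PLUGINS, including the duckdb plugin
adapter = AdapterFactory.create(config)  # dispatches via can_handle_duckdb

overview = adapter.profile(config)
first_column = adapter.column_profile(
    config,
    table_name="__profile_table__",
    column_name=overview.columns[0],
    total_count=overview.count,
)
print(first_column.distinct_count, first_column.completeness)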
6 changes: 6 additions & 0 deletions src/data_tools/adapters/types/duckdb/models.py
@@ -0,0 +1,6 @@
from data_tools.common.schema import SchemaBase


class DuckdbConfig(SchemaBase):
    path: str
    type: str