Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/intugle/adapters/types/bigquery/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time

from typing import TYPE_CHECKING, Any, Optional

import numpy as np
Expand All @@ -18,11 +19,17 @@
try:
from google.cloud import bigquery
from google.oauth2 import service_account
from sqlglot import transpile
GOOGLE_BIGQUERY_AVAILABLE = True
except ImportError:
GOOGLE_BIGQUERY_AVAILABLE = False

BIGQUERY_AVAILABLE = True
try:
from sqlglot import exp, transpile
SQLGLOT_AVAILABLE = True
except ImportError:
BIGQUERY_AVAILABLE = False
SQLGLOT_AVAILABLE = False

BIGQUERY_AVAILABLE = GOOGLE_BIGQUERY_AVAILABLE and SQLGLOT_AVAILABLE


class BigQueryAdapter(Adapter):
Expand Down Expand Up @@ -98,15 +105,8 @@ def connect(self):
def _get_fqn(self, identifier: str) -> str:
"""Gets the fully qualified name for a table identifier."""
if "." in identifier:
# Already has project or dataset prefix
parts = identifier.split(".")
if len(parts) == 2:
# dataset.table format
return f"`{self._project_id}.{identifier}`"
elif len(parts) == 3:
# project.dataset.table format
return f"`{identifier}`"
return f"`{self._project_id}.{self._dataset_id}.{identifier}`"
return exp.to_table(identifier).sql(dialect="bigquery")
return exp.to_table(identifier, db=self._dataset_id, catalog=self._project_id).sql(dialect="bigquery")

@staticmethod
def check_data(data: Any) -> BigQueryConfig:
Expand Down
15 changes: 3 additions & 12 deletions src/intugle/adapters/types/databricks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
DATABRICKS_SQL_AVAILABLE = False

try:
from sqlglot import transpile
from sqlglot import exp, transpile
SQLGLOT_AVAILABLE = True
except ImportError:
SQLGLOT_AVAILABLE = False
Expand Down Expand Up @@ -136,19 +136,10 @@ def connect(self):

def _get_fqn(self, identifier: str) -> str:
"""Gets the fully qualified name for a table identifier."""
# An identifier is already fully qualified if it contains a dot.
if "." in identifier:
return identifier
return exp.to_table(identifier).sql(dialect="databricks")

# Backticks are used to handle reserved keywords and special characters.
safe_schema = f"`{self._schema}`"
safe_identifier = f"`{identifier}`"

if self.catalog:
safe_catalog = f"`{self.catalog}`"
return f"{safe_catalog}.{safe_schema}.{safe_identifier}"

return f"{safe_schema}.{safe_identifier}"
return exp.to_table(identifier, db=self._schema, catalog=self.catalog).sql(dialect="databricks")

@staticmethod
def check_data(data: Any) -> DatabricksConfig:
Expand Down
7 changes: 3 additions & 4 deletions src/intugle/adapters/types/mariadb/mariadb.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
MARIADB_CONNECTOR_AVAILABLE = False

try:
from sqlglot import transpile
from sqlglot import exp, transpile

SQLGLOT_AVAILABLE = True
except Exception:
Expand Down Expand Up @@ -112,9 +112,8 @@ def _quote_id(identifier: str) -> str:

def _get_fqn(self, identifier: str) -> str:
if "." in identifier:
parts = identifier.split(".")
return ".".join(self._quote_id(p) for p in parts)
return f"{self._quote_id(self.database)}.{self._quote_id(identifier)}"
return exp.to_table(identifier).sql(dialect="mysql") # MariaDB uses MySQL dialect in sqlglot
return exp.to_table(identifier, db=self._database).sql(dialect="mysql")

@staticmethod
def check_data(data: Any) -> MariaDBConfig:
Expand Down
7 changes: 4 additions & 3 deletions src/intugle/adapters/types/oracle/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
ORACLE_AVAILABLE = False

try:
from sqlglot import transpile
from sqlglot import exp, transpile

SQLGLOT_AVAILABLE = True
except ImportError:
Expand Down Expand Up @@ -103,9 +103,10 @@ def connect(self):
cursor.execute(f"ALTER SESSION SET CURRENT_SCHEMA = {params.schema_}")

def _get_fqn(self, identifier: str) -> str:
"""Gets the fully qualified name for a table identifier."""
if "." in identifier:
return identifier.upper() # Oracle identifiers are case-insensitive/upper by default unless quoted
return f'"{self._schema}"."{identifier}"'
return exp.to_table(identifier).sql(dialect="oracle")
return exp.to_table(identifier, db=self._schema).sql(dialect="oracle")

@staticmethod
def check_data(data: Any) -> OracleConfig:
Expand Down
6 changes: 3 additions & 3 deletions src/intugle/adapters/types/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
ASYNC_PG_AVAILABLE = False

try:
from sqlglot import transpile
from sqlglot import exp, transpile

SQLGLOT_AVAILABLE = True
except ImportError:
Expand Down Expand Up @@ -150,8 +150,8 @@ async def _connect_async(self):
def _get_fqn(self, identifier: str) -> str:
"""Gets the fully qualified name for a table identifier."""
if "." in identifier:
return identifier
return f'"{self._schema}"."{identifier}"'
return exp.to_table(identifier).sql(dialect="postgres")
return exp.to_table(identifier, db=self._schema).sql(dialect="postgres")

@staticmethod
def check_data(data: Any) -> PostgresConfig:
Expand Down
2 changes: 1 addition & 1 deletion src/intugle/adapters/types/snowflake/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def _sync_metadata(self, manifest: "Manifest"):
# Apply comments and tags to tables and columns
for source in manifest.sources.values():
# Construct the fully qualified table name using details from profiles.yml
full_table_name = f"{database}.{schema}.{source.table.name}"
full_table_name = f"{quote_identifier(database)}.{quote_identifier(schema)}.{quote_identifier(source.table.name)}"

# Set table comment
if source.table.description:
Expand Down
6 changes: 3 additions & 3 deletions src/intugle/adapters/types/sqlserver/sqlserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
MSSQL_PYTHON_AVAILABLE = False

try:
from sqlglot import transpile
from sqlglot import exp, transpile

SQLGLOT_AVAILABLE = True
except ImportError:
Expand Down Expand Up @@ -112,8 +112,8 @@ def connect(self):
def _get_fqn(self, identifier: str) -> str:
"""Gets the fully qualified name for a table identifier."""
if "." in identifier:
return identifier
return f'[{self._schema}].[{identifier}]'
return exp.to_table(identifier).sql(dialect="tsql")
return exp.to_table(identifier, db=self._schema).sql(dialect="tsql")

@staticmethod
def check_data(data: Any) -> SQLServerConfig:
Expand Down
4 changes: 2 additions & 2 deletions src/intugle/data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def get_all_field_details(self) -> dict[str, FieldDetailsModel]:
datatype_l1=column.type,
datatype_l2=column.category,
sql_code=f"\"{source.table.name}\".\"{column.name}\"",
is_pii=False,
is_pii="pii" in [t.lower() for t in (column.tags or [])],
asset_id=source.table.name,
asset_name=source.table.name,
asset_details={},
Expand Down Expand Up @@ -235,7 +235,7 @@ def field_details_fetcher(ids: list[str]):
datatype_l1=column_detail.type,
datatype_l2=column_detail.category,
sql_code=f"\"{table}\".\"{column}\"",
is_pii=False,
is_pii="pii" in [t.lower() for t in (column_detail.tags or [])],
asset_id=table,
asset_name=table,
asset_details={},
Expand Down
2 changes: 1 addition & 1 deletion src/intugle/mcp/docs_search/service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import os

from typing import Any, Dict, List, Optional
from typing import List, Optional

import aiohttp

Expand Down
4 changes: 2 additions & 2 deletions src/intugle/semantic_model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List

import pandas as pd
Expand All @@ -11,7 +12,6 @@
from intugle.link_predictor.predictor import LinkPredictor
from intugle.semantic_search import SemanticSearch
from intugle.utils.files import update_relationship_file_mtime
from pathlib import Path

if TYPE_CHECKING:
from intugle.adapters.adapter import Adapter
Expand Down Expand Up @@ -83,6 +83,7 @@ def _initialize_from_list(self, data_list: List[DataSet]):
"DataSet objects provided in a list must have a 'name' attribute."
)
self.datasets[dataset.name] = dataset

def _initialize_from_folder(self, folder_path: str):
"""
Initialize datasets by scanning a folder (recursively) for supported data files.
Expand Down Expand Up @@ -133,7 +134,6 @@ def _initialize_from_folder(self, folder_path: str):
f"No supported data files (.csv, .parquet, .xlsx) found in directory: {folder_path}"
)


def profile(self, force_recreate: bool = False):
"""
Runs the data profiling pipeline for all contained datasets.
Expand Down
6 changes: 4 additions & 2 deletions tests/adapters/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from unittest.mock import MagicMock, patch

import pytest
from unittest.mock import MagicMock, patch, PropertyMock

from intugle.adapters.types.bigquery.bigquery import (
BIGQUERY_AVAILABLE,
BigQueryAdapter,
can_handle_bigquery,
BIGQUERY_AVAILABLE,
)
from intugle.adapters.types.bigquery.models import BigQueryConfig, BigQueryConnectionConfig

Expand Down
4 changes: 3 additions & 1 deletion tests/adapters/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import numpy as np
from intugle.adapters.utils import convert_to_native
import pytest

from intugle.adapters.utils import convert_to_native


def test_numpy_scalar_int():
value = np.int64(10)
result = convert_to_native(value)
Expand Down
Loading