From d038ee2d0771942583897031c4ab801ca3b3a7ee Mon Sep 17 00:00:00 2001
From: zak1504
Date: Thu, 1 Jan 2026 13:58:11 +0530
Subject: [PATCH 1/2] feat: Add Amazon Redshift adapter

---
 examples/redshift_adapter_example.py          | 163 ++++++++
 pyproject.toml                                |   4 +
 src/intugle/adapters/factory.py               |   1 +
 src/intugle/adapters/types/redshift/README.md | 182 +++++++++
 .../adapters/types/redshift/__init__.py       |   1 +
 src/intugle/adapters/types/redshift/models.py |  19 +
 .../adapters/types/redshift/redshift.py       | 371 ++++++++++++++++++
 tests/adapters/test_redshift_adapter.py       | 204 ++++++++++
 8 files changed, 945 insertions(+)
 create mode 100644 examples/redshift_adapter_example.py
 create mode 100644 src/intugle/adapters/types/redshift/README.md
 create mode 100644 src/intugle/adapters/types/redshift/__init__.py
 create mode 100644 src/intugle/adapters/types/redshift/models.py
 create mode 100644 src/intugle/adapters/types/redshift/redshift.py
 create mode 100644 tests/adapters/test_redshift_adapter.py

diff --git a/examples/redshift_adapter_example.py b/examples/redshift_adapter_example.py
new file mode 100644
index 0000000..b01478e
--- /dev/null
+++ b/examples/redshift_adapter_example.py
@@ -0,0 +1,163 @@
+"""
+Example: Using Intugle with Amazon Redshift
+
+This example demonstrates how to use Intugle to profile and analyze data in Amazon Redshift.
+
+Prerequisites:
+1. Install Intugle with Redshift support: pip install intugle[redshift]
+2. Configure your profiles.yml with Redshift connection details
+3. Have access to a Redshift cluster with sample data
+"""
+
+from intugle.adapters.types.redshift.models import RedshiftConfig
+from intugle.adapters.types.redshift.redshift import RedshiftAdapter
+from intugle.analysis.models import DataSet
+
+
+def main():
+    # Initialize the Redshift adapter
+    # This will read configuration from profiles.yml
+    adapter = RedshiftAdapter()
+
+    print(f"Connected to Redshift source: {adapter.source_name}")
+    print(f"Database: {adapter.database}")
+    print(f"Schema: {adapter.schema}")
+    print("-" * 50)
+
+    # Example 1: Profile a table
+    print("\n=== Example 1: Profile a Table ===")
+    customers_config = RedshiftConfig(
+        identifier="customers",
+        type="redshift"
+    )
+
+    profile = adapter.profile(customers_config, "customers")
+    print("Table: customers")
+    print(f"Total rows: {profile.count}")
+    print(f"Columns: {', '.join(profile.columns)}")
+    print(f"Data types: {profile.dtypes}")
+
+    # Example 2: Profile a specific column
+    print("\n=== Example 2: Profile a Column ===")
+    column_profile = adapter.column_profile(
+        data=customers_config,
+        table_name="customers",
+        column_name="customer_id",
+        total_count=profile.count,
+        sample_limit=5
+    )
+
+    print(f"Column: {column_profile.column_name}")
+    print(f"Total count: {column_profile.count}")
+    print(f"Null count: {column_profile.null_count}")
+    print(f"Distinct count: {column_profile.distinct_count}")
+    print(f"Uniqueness: {column_profile.uniqueness:.2%}")
+    print(f"Completeness: {column_profile.completeness:.2%}")
+    print(f"Sample data: {column_profile.sample_data}")
+
+    # Example 3: Execute a custom query
+    print("\n=== Example 3: Execute a Custom Query ===")
+    query = """
+    SELECT
+        customer_segment,
+        COUNT(*) as customer_count,
+        AVG(lifetime_value) as avg_lifetime_value
+    FROM customers
+    GROUP BY customer_segment
+    ORDER BY customer_count DESC
+    """
+
+    df = adapter.to_df_from_query(query)
+    print("Customer segments:")
+    print(df.to_string(index=False))
+
+    # Example 4: Create a view from a query
+    print("\n=== Example 4: Create a View ===")
+    view_query = """
+    SELECT
+        customer_id,
+        customer_name,
+        email,
+        customer_segment,
+        lifetime_value
+    FROM customers
+    WHERE customer_segment = 'Premium'
+      AND lifetime_value > 10000
+    """
+
+    adapter.create_table_from_query(
+        table_name="premium_customers",
+        query=view_query,
+        materialize="view"
+    )
+    print("Created view: premium_customers")
+
+    # Verify the view was created
+    premium_df = adapter.to_df_from_query(
+        "SELECT COUNT(*) as count FROM premium_customers"
+    )
+    print(f"Premium customers count: {premium_df['count'][0]}")
+
+    # Example 5: Analyze relationship between tables
+    print("\n=== Example 5: Analyze Table Relationships ===")
+    customers_dataset = DataSet(
+        RedshiftConfig(identifier="customers"),
+        name="customers"
+    )
+
+    orders_dataset = DataSet(
+        RedshiftConfig(identifier="orders"),
+        name="orders"
+    )
+
+    # Find how many customer IDs appear in both tables
+    intersection = adapter.intersect_count(
+        table1=customers_dataset,
+        column1_name="customer_id",
+        table2=orders_dataset,
+        column2_name="customer_id"
+    )
+
+    print(f"Customers with orders: {intersection}")
+
+    # Example 6: Analyze composite key uniqueness
+    print("\n=== Example 6: Composite Key Analysis ===")
+    orders_config = RedshiftConfig(identifier="orders")
+
+    # Check uniqueness of composite key (customer_id, order_date)
+    composite_uniqueness = adapter.get_composite_key_uniqueness(
+        table_name="orders",
+        columns=["customer_id", "order_date"],
+        dataset_data=orders_config
+    )
+
+    print(f"Unique (customer_id, order_date) combinations: {composite_uniqueness}")
+
+    # Example 7: Create a materialized view
+    print("\n=== Example 7: Create a Materialized View ===")
+    materialized_query = """
+    SELECT
+        DATE_TRUNC('month', order_date) as month,
+        customer_segment,
+        COUNT(DISTINCT customer_id) as unique_customers,
+        COUNT(*) as order_count,
+        SUM(order_total) as total_revenue
+    FROM orders o
+    JOIN customers c ON o.customer_id = c.customer_id
+    GROUP BY 1, 2
+    """
+
+    adapter.create_table_from_query(
+        table_name="monthly_segment_metrics",
+        query=materialized_query,
+        materialize="materialized_view"
+    )
+    print("Created materialized view: monthly_segment_metrics")
+    print("Note: Refresh with: REFRESH MATERIALIZED VIEW monthly_segment_metrics")
+
+    print("\n" + "=" * 50)
+    print("All examples completed successfully!")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pyproject.toml b/pyproject.toml
index 4df6392..8ea2566 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -69,6 +69,10 @@ postgres = [
     "asyncpg>=0.30.0",
     "sqlglot>=27.20.0",
 ]
+redshift = [
+    "redshift-connector>=2.1.0",
+    "sqlglot>=27.20.0",
+]
 sqlserver = [
     "mssql-python>=0.13.1",
     "sqlglot>=27.20.0",
diff --git a/src/intugle/adapters/factory.py b/src/intugle/adapters/factory.py
index 81bbc2e..3eb0b7b 100644
--- a/src/intugle/adapters/factory.py
+++ b/src/intugle/adapters/factory.py
@@ -32,6 +32,7 @@ def is_safe_plugin_name(plugin_name: str) -> bool:
     "intugle.adapters.types.snowflake.snowflake",
     "intugle.adapters.types.databricks.databricks",
     "intugle.adapters.types.postgres.postgres",
+    "intugle.adapters.types.redshift.redshift",
     "intugle.adapters.types.mysql.mysql",
     "intugle.adapters.types.sqlserver.sqlserver",
     "intugle.adapters.types.sqlite.sqlite",
diff --git a/src/intugle/adapters/types/redshift/README.md b/src/intugle/adapters/types/redshift/README.md
new file mode 100644
index 0000000..152d8b9
--- /dev/null
+++ b/src/intugle/adapters/types/redshift/README.md
@@ -0,0 +1,182 @@
+# Amazon Redshift Adapter
+
+The Redshift adapter allows Intugle to connect to and interact with Amazon Redshift data warehouses.
+
+## Installation
+
+To use the Redshift adapter, you need to install Intugle with the Redshift optional dependencies:
+
+```bash
+pip install intugle[redshift]
+```
+
+This will install:
+- `redshift-connector>=2.1.0` - The official Amazon Redshift Python connector
+- `sqlglot>=27.20.0` - SQL transpilation library
+
+## Configuration
+
+Add a Redshift configuration to your `profiles.yml` file:
+
+```yaml
+redshift:
+  name: my_redshift_source
+  user: your_username
+  password: your_password
+  host: your-cluster.region.redshift.amazonaws.com
+  port: 5439  # Default Redshift port
+  database: your_database
+  schema: public
+```
+
+### Configuration Parameters
+
+- **name** (optional): A friendly name for your Redshift source. Defaults to "my_redshift_source"
+- **user**: Your Redshift username
+- **password**: Your Redshift password
+- **host**: The Redshift cluster endpoint (without the port)
+- **port**: The port number (default: 5439)
+- **database**: The database name to connect to
+- **schema**: The schema to use for queries
+
+## Usage
+
+### Basic Example
+
+```python
+from intugle.adapters.types.redshift.models import RedshiftConfig
+from intugle.adapters.types.redshift.redshift import RedshiftAdapter
+
+# Create a config for your table
+config = RedshiftConfig(
+    identifier="my_table",
+    type="redshift"
+)
+
+# Get the adapter instance
+adapter = RedshiftAdapter()
+
+# Profile the table
+profile = adapter.profile(config, "my_table")
+print(f"Total rows: {profile.count}")
+print(f"Columns: {profile.columns}")
+```
+
+### Profile a Column
+
+```python
+# Get detailed profile for a specific column
+column_profile = adapter.column_profile(
+    data=config,
+    table_name="my_table",
+    column_name="customer_id",
+    total_count=profile.count,
+    sample_limit=10,
+    dtype_sample_limit=10000
+)
+
+print(f"Distinct count: {column_profile.distinct_count}")
+print(f"Null count: {column_profile.null_count}")
+print(f"Uniqueness: {column_profile.uniqueness}")
+print(f"Completeness: {column_profile.completeness}")
+```
+
+### Query and Create Views/Tables
+
+```python
+# Execute a query and get results as DataFrame
+df = adapter.to_df_from_query("SELECT * FROM my_table LIMIT 100")
+
+# Create a view from a query
+query = """
+SELECT customer_id, SUM(order_total) as total_spent
+FROM orders
+GROUP BY customer_id
+HAVING SUM(order_total) > 1000
+"""
+
+adapter.create_table_from_query(
+    table_name="high_value_customers",
+    query=query,
+    materialize="view"  # Options: "view", "table", "materialized_view"
+)
+```
+
+### Analyzing Relationships
+
+```python
+from intugle.analysis.models import DataSet
+
+# Create DataSet objects
+customers = DataSet(
+    RedshiftConfig(identifier="customers"),
+    name="customers"
+)
+
+orders = DataSet(
+    RedshiftConfig(identifier="orders"),
+    name="orders"
+)
+
+# Find intersection count between tables
+intersection = adapter.intersect_count(
+    table1=customers,
+    column1_name="customer_id",
+    table2=orders,
+    column2_name="customer_id"
+)
+
+print(f"Common customer IDs: {intersection}")
+```
+
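+Composite keys can be checked the same way. A minimal sketch, reusing the
+`orders` table from above and assuming `(customer_id, order_date)` as the
+candidate key:
+
+```python
+orders_config = RedshiftConfig(identifier="orders")
+
+# Count distinct, fully non-null (customer_id, order_date) pairs
+unique_pairs = adapter.get_composite_key_uniqueness(
+    table_name="orders",
+    columns=["customer_id", "order_date"],
+    dataset_data=orders_config
+)
+
+print(f"Unique (customer_id, order_date) pairs: {unique_pairs}")
+```
+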
+## Features
+
+The Redshift adapter supports all standard Intugle adapter operations:
+
+- **Profiling**: Get row counts, column lists, and data types
+- **Column Profiling**: Detailed statistics including null counts, distinct values, uniqueness, and completeness
+- **Query Execution**: Run arbitrary SQL queries
+- **DataFrame Conversion**: Convert query results to Pandas DataFrames
+- **Table/View Creation**: Create tables, views, or materialized views from queries
+- **Relationship Analysis**: Calculate intersections between tables
+- **Composite Key Analysis**: Analyze uniqueness of composite keys
+
+## SQL Dialect
+
+The adapter uses SQLGlot to transpile queries to Redshift's SQL dialect. This means you can write queries in a more standard SQL format, and they will be automatically converted to Redshift-compatible SQL.
+
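+For illustration, this is roughly what happens inside `create_table_from_query`
+(the exact output depends on the installed sqlglot version):
+
+```python
+from sqlglot import transpile
+
+# Portable SQL in, Redshift SQL out; e.g. TEXT is widened to VARCHAR(MAX)
+sql = "SELECT CAST(order_date AS TEXT) AS d FROM orders LIMIT 10"
+print(transpile(sql, write="redshift")[0])
+```
+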
+## Notes
+
+- The Redshift adapter is based on the PostgreSQL adapter since Redshift is built on PostgreSQL
+- Some SQL features may differ from standard PostgreSQL due to Redshift's columnar storage and distributed architecture
+- For best performance, consider Redshift's distribution keys and sort keys when creating tables
+- Materialized views in Redshift need to be manually refreshed using `REFRESH MATERIALIZED VIEW` (see the sketch below)
+
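+For example, reusing the `adapter` from the usage snippets above
+(`monthly_segment_metrics` is an illustrative view name):
+
+```python
+# Materialized views are not refreshed automatically in Redshift
+adapter.execute("REFRESH MATERIALIZED VIEW monthly_segment_metrics")
+```
+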
+## Troubleshooting
+
+### Connection Issues
+
+If you're having trouble connecting:
+
+1. Verify your cluster endpoint and credentials
+2. Check that your IP is whitelisted in the Redshift security group
+3. Ensure the cluster is publicly accessible (if connecting from outside AWS)
+4. Verify the database and schema exist
+
+### Missing Dependencies
+
+If you see an error about missing dependencies:
+
+```
+ImportError: Redshift dependencies are not installed. Please run 'pip install intugle[redshift]'.
+```
+
+Install the required dependencies:
+
+```bash
+pip install intugle[redshift]
+```
+
+## License
+
+The `redshift-connector` package is licensed under the Apache License 2.0, which is compatible with this project's Apache 2.0 license.
diff --git a/src/intugle/adapters/types/redshift/__init__.py b/src/intugle/adapters/types/redshift/__init__.py
new file mode 100644
index 0000000..5a1624b
--- /dev/null
+++ b/src/intugle/adapters/types/redshift/__init__.py
@@ -0,0 +1 @@
+# Redshift adapter module
diff --git a/src/intugle/adapters/types/redshift/models.py b/src/intugle/adapters/types/redshift/models.py
new file mode 100644
index 0000000..532017f
--- /dev/null
+++ b/src/intugle/adapters/types/redshift/models.py
@@ -0,0 +1,19 @@
+from typing import Literal
+
+from pydantic import Field
+
+from intugle.common.schema import SchemaBase
+
+
+class RedshiftConnectionConfig(SchemaBase):
+    user: str
+    password: str
+    host: str
+    port: int = 5439  # Default Redshift port
+    database: str
+    schema_: str = Field(..., alias="schema")
+
+
+class RedshiftConfig(SchemaBase):
+    identifier: str
+    type: Literal["redshift"] = "redshift"
diff --git a/src/intugle/adapters/types/redshift/redshift.py b/src/intugle/adapters/types/redshift/redshift.py
new file mode 100644
index 0000000..5caee9f
--- /dev/null
+++ b/src/intugle/adapters/types/redshift/redshift.py
@@ -0,0 +1,371 @@
+import time
+from typing import TYPE_CHECKING, Any, Optional
+
+import numpy as np
+import pandas as pd
+
+from intugle.adapters.adapter import Adapter
+from intugle.adapters.factory import AdapterFactory
+from intugle.adapters.models import ColumnProfile, DataSetData, ProfilingOutput
+from intugle.adapters.types.redshift.models import RedshiftConfig, RedshiftConnectionConfig
+from intugle.adapters.utils import convert_to_native
+from intugle.core import settings
+from intugle.core.utilities.processing import string_standardization
+
+if TYPE_CHECKING:
+    from intugle.analysis.models import DataSet
+
+try:
+    import redshift_connector
+
+    REDSHIFT_CONNECTOR_AVAILABLE = True
+except ImportError:
+    REDSHIFT_CONNECTOR_AVAILABLE = False
+
+try:
+    from sqlglot import transpile
+
+    SQLGLOT_AVAILABLE = True
+except ImportError:
+    SQLGLOT_AVAILABLE = False
+
+
+REDSHIFT_AVAILABLE = REDSHIFT_CONNECTOR_AVAILABLE and SQLGLOT_AVAILABLE
+
+
+class RedshiftAdapter(Adapter):
+    # Module-wide singleton: one shared Redshift connection per process
+    _instance = None
+    _initialized = False
+
+    @property
+    def database(self) -> Optional[str]:
+        return self._database
+
+    @database.setter
+    def database(self, value: str):
+        self._database = value
+
+    @property
+    def schema(self) -> Optional[str]:
+        return self._schema
+
+    @schema.setter
+    def schema(self, value: str):
+        self._schema = value
+
+    @property
+    def source_name(self) -> str:
+        return self._source_name
+
+    @source_name.setter
+    def source_name(self, value: str):
+        self._source_name = value
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        if self._initialized:
+            return
+
+        if not REDSHIFT_AVAILABLE:
+            raise ImportError(
+                "Redshift dependencies are not installed. Please run 'pip install intugle[redshift]'."
+            )
+
+        self.connection: Optional["redshift_connector.Connection"] = None
+        self._database: Optional[str] = None
+        self._schema: Optional[str] = None
+        self._source_name: str = settings.PROFILES.get("redshift", {}).get("name", "my_redshift_source")
+
+        self.connect()
+        self._initialized = True
+
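+    # connect() expects a "redshift" section in profiles.yml shaped like the
+    # one documented in this adapter's README (values are illustrative):
+    #
+    #   redshift:
+    #     name: my_redshift_source
+    #     user: your_username
+    #     password: your_password
+    #     host: your-cluster.region.redshift.amazonaws.com
+    #     port: 5439
+    #     database: your_database
+    #     schema: public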
+    def connect(self):
+        connection_parameters_dict = settings.PROFILES.get("redshift", {})
+        if not connection_parameters_dict:
+            raise ValueError("Could not create Redshift connection. No 'redshift' section found in profiles.yml.")
+
+        params = RedshiftConnectionConfig.model_validate(connection_parameters_dict)
+        self._database = params.database
+        self._schema = params.schema_
+
+        self.connection = redshift_connector.connect(
+            user=params.user,
+            password=params.password,
+            host=params.host,
+            port=params.port,
+            database=params.database,
+        )
+        # redshift-connector opens connections with autocommit disabled; enable
+        # it so the DDL this adapter issues (CREATE/DROP TABLE and VIEW)
+        # persists without explicit commits. autocommit can only be toggled
+        # outside a transaction, hence the rollback() first.
+        self.connection.rollback()
+        self.connection.autocommit = True
+        # Set the search path to the schema
+        with self.connection.cursor() as cursor:
+            cursor.execute(f"SET search_path TO {self._schema}")
+
+    def _get_fqn(self, identifier: str) -> str:
+        """Gets the fully qualified name for a table identifier."""
+        if "." in identifier:
+            return identifier
+        return f'"{self._schema}"."{identifier}"'
+
+    @staticmethod
+    def check_data(data: Any) -> RedshiftConfig:
+        try:
+            data = RedshiftConfig.model_validate(data)
+        except Exception:
+            raise TypeError("Input must be a Redshift config.")
+        return data
+
+    def _execute_sql(self, query: str, params: tuple = None) -> list[Any]:
+        with self.connection.cursor() as cursor:
+            if params:
+                cursor.execute(query, params)
+            else:
+                cursor.execute(query)
+            # Statements like CREATE TABLE produce no result set, which the
+            # DB-API signals with a None cursor description
+            if cursor.description is None:
+                return []
+            return cursor.fetchall()
+
+    def _get_pandas_df(self, query: str, params: tuple = None) -> pd.DataFrame:
+        with self.connection.cursor() as cursor:
+            if params:
+                cursor.execute(query, params)
+            else:
+                cursor.execute(query)
+
+            if cursor.description is None:
+                return pd.DataFrame()
+
+            result = cursor.fetchall()
+            if not result:
+                return pd.DataFrame()
+
+            # Get column names from cursor description
+            columns = [desc[0] for desc in cursor.description]
+            return pd.DataFrame(result, columns=columns)
+
+    def profile(self, data: RedshiftConfig, table_name: str) -> ProfilingOutput:
+        data = self.check_data(data)
+        fqn = self._get_fqn(data.identifier)
+
+        total_count = self._execute_sql(f"SELECT COUNT(*) FROM {fqn}")[0][0]
+
+        # Parameterized to avoid interpolating identifiers into the query
+        # (redshift-connector uses the "format" paramstyle)
+        query = """
+            SELECT column_name, data_type
+            FROM information_schema.columns
+            WHERE table_schema = %s AND table_name = %s
+        """
+        rows = self._execute_sql(query, (self._schema, data.identifier))
+        columns = [row[0] for row in rows]
+        dtypes = {row[0]: row[1] for row in rows}
+
+        return ProfilingOutput(
+            count=total_count,
+            columns=columns,
+            dtypes=dtypes,
+        )
+
+    def column_profile(
+        self,
+        data: RedshiftConfig,
+        table_name: str,
+        column_name: str,
+        total_count: int,
+        sample_limit: int = 10,
+        dtype_sample_limit: int = 10000,
+    ) -> Optional[ColumnProfile]:
+        data = self.check_data(data)
+        fqn = self._get_fqn(data.identifier)
+        start_ts = time.time()
+
+        # Null and distinct counts
+        query = f"""
+            SELECT
+                SUM(CASE WHEN "{column_name}" IS NULL THEN 1 ELSE 0 END) as null_count,
+                COUNT(DISTINCT "{column_name}") as distinct_count
+            FROM {fqn}
+        """
+        result = self._execute_sql(query)[0]
+        null_count = result[0]
+        distinct_count = result[1]
+        not_null_count = total_count - null_count
+
+        # Sampling
+        sample_query = f"""
+            SELECT DISTINCT CAST("{column_name}" AS VARCHAR(max))
+            FROM {fqn}
+            WHERE "{column_name}" IS NOT NULL
+            LIMIT {dtype_sample_limit}
+        """
+        distinct_values_result = self._execute_sql(sample_query)
+        distinct_values = [row[0] for row in distinct_values_result]
+
+        if distinct_count > 0:
+            distinct_sample_size = min(distinct_count, dtype_sample_limit)
+            sample_data = list(np.random.choice(distinct_values, distinct_sample_size, replace=False))
+        else:
+            sample_data = []
+
+        dtype_sample = None
+        if distinct_count >= dtype_sample_limit:
+            dtype_sample = sample_data
+        elif distinct_count > 0 and not_null_count > 0:
+            remaining_sample_size = dtype_sample_limit - distinct_count
+            additional_samples_query = f"""
+                SELECT CAST("{column_name}" AS VARCHAR(max))
+                FROM {fqn}
+                WHERE "{column_name}" IS NOT NULL
+                ORDER BY RANDOM()
+                LIMIT {remaining_sample_size}
+            """
+            additional_samples_result = self._execute_sql(additional_samples_query)
+            additional_samples = [row[0] for row in additional_samples_result]
+            dtype_sample = list(distinct_values) + additional_samples
+        else:
+            dtype_sample = []
+
+        native_sample_data = convert_to_native(sample_data)
+        native_dtype_sample = convert_to_native(dtype_sample)
+        business_name = string_standardization(column_name)
+
+        return ColumnProfile(
+            column_name=column_name,
+            table_name=table_name,
+            business_name=business_name,
+            null_count=null_count,
+            count=total_count,
+            distinct_count=distinct_count,
+            uniqueness=distinct_count / total_count if total_count > 0 else 0.0,
+            completeness=not_null_count / total_count if total_count > 0 else 0.0,
+            sample_data=native_sample_data[:sample_limit],
+            dtype_sample=native_dtype_sample,
+            ts=time.time() - start_ts,
+        )
+
+    def load(self, data: RedshiftConfig, table_name: str):
+        self.check_data(data)
+        # No-op, we assume the table already exists in Redshift.
+
+    def execute(self, query: str):
+        return self._execute_sql(query)
+
+    def to_df(self, data: RedshiftConfig, table_name: str) -> pd.DataFrame:
+        data = self.check_data(data)
+        fqn = self._get_fqn(data.identifier)
+        return self._get_pandas_df(f"SELECT * FROM {fqn}")
+
+    def to_df_from_query(self, query: str) -> pd.DataFrame:
+        return self._get_pandas_df(query)
+
+    def create_table_from_query(
+        self, table_name: str, query: str, materialize: str = "view", **kwargs
+    ) -> str:
+        fqn = self._get_fqn(table_name)
+        transpiled_sql = transpile(query, write="redshift")[0]
+
+        if materialize == "table":
+            self._execute_sql(f"DROP TABLE IF EXISTS {fqn}")
+            self._execute_sql(f"CREATE TABLE {fqn} AS {transpiled_sql}")
+        elif materialize == "materialized_view":
+            # Redshift supports materialized views
+            self._execute_sql(f"DROP MATERIALIZED VIEW IF EXISTS {fqn}")
+            self._execute_sql(f"CREATE MATERIALIZED VIEW {fqn} AS {transpiled_sql}")
+        else:
+            # Redshift views don't support CREATE OR REPLACE, so we drop first
+            self._execute_sql(f"DROP VIEW IF EXISTS {fqn}")
+            self._execute_sql(f"CREATE VIEW {fqn} AS {transpiled_sql}")
+
+        return transpiled_sql
+
+    def create_new_config_from_etl(self, etl_name: str) -> "DataSetData":
+        return RedshiftConfig(identifier=etl_name)
+
+    def intersect_count(self, table1: "DataSet", column1_name: str, table2: "DataSet", column2_name: str) -> int:
+        table1_adapter = self.check_data(table1.data)
+        table2_adapter = self.check_data(table2.data)
+
+        fqn1 = self._get_fqn(table1_adapter.identifier)
+        fqn2 = self._get_fqn(table2_adapter.identifier)
+
+        query = f"""
+            SELECT COUNT(*) FROM (
+                SELECT DISTINCT "{column1_name}" FROM {fqn1} WHERE "{column1_name}" IS NOT NULL
+                INTERSECT
+                SELECT DISTINCT "{column2_name}" FROM {fqn2} WHERE "{column2_name}" IS NOT NULL
+            ) as t
+        """
+        return self._execute_sql(query)[0][0]
+
+    def get_composite_key_uniqueness(self, table_name: str, columns: list[str], dataset_data: DataSetData) -> int:
+        data = self.check_data(dataset_data)
+        fqn = self._get_fqn(data.identifier)
+        safe_columns = [f'"{col}"' for col in columns]
+        column_list = ", ".join(safe_columns)
+        null_cols_filter = " AND ".join(f"{c} IS NOT NULL" for c in safe_columns)
+
+        query = f"""
+            SELECT COUNT(*) FROM (
+                SELECT DISTINCT {column_list} FROM {fqn}
+                WHERE {null_cols_filter}
+            ) as t
+        """
+        return self._execute_sql(query)[0][0]
+
+    def intersect_composite_keys_count(
+        self,
+        table1: "DataSet",
+        columns1: list[str],
+        table2: "DataSet",
+        columns2: list[str],
+    ) -> int:
+        table1_adapter = self.check_data(table1.data)
+        table2_adapter = self.check_data(table2.data)
+
+        fqn1 = self._get_fqn(table1_adapter.identifier)
+        fqn2 = self._get_fqn(table2_adapter.identifier)
+
+        safe_columns1 = [f'"{col}"' for col in columns1]
+        safe_columns2 = [f'"{col}"' for col in columns2]
+
+        # Subquery for distinct keys from table 1
+        distinct_cols1 = ", ".join(safe_columns1)
+        null_filter1 = " AND ".join(f"{c} IS NOT NULL" for c in safe_columns1)
+        subquery1 = f'(SELECT DISTINCT {distinct_cols1} FROM {fqn1} WHERE {null_filter1}) AS t1'
+
+        # Subquery for distinct keys from table 2
+        distinct_cols2 = ", ".join(safe_columns2)
+        null_filter2 = " AND ".join(f"{c} IS NOT NULL" for c in safe_columns2)
+        subquery2 = f'(SELECT DISTINCT {distinct_cols2} FROM {fqn2} WHERE {null_filter2}) AS t2'
+
+        # Join conditions
+        join_conditions = " AND ".join(
+            [f"t1.{c1} = t2.{c2}" for c1, c2 in zip(safe_columns1, safe_columns2)]
+        )
+
+        query = f"""
+            SELECT COUNT(*)
+            FROM {subquery1}
+            INNER JOIN {subquery2} ON {join_conditions}
+        """
+        return self._execute_sql(query)[0][0]
+
+    def get_details(self, data: RedshiftConfig):
+        data = self.check_data(data)
+        return data.model_dump()
+
+
+def can_handle_redshift(df: Any) -> bool:
+    try:
+        RedshiftConfig.model_validate(df)
+        return True
+    except Exception:
+        return False
+
+
+def register(factory: AdapterFactory):
+    if REDSHIFT_AVAILABLE:
+        factory.register("redshift", can_handle_redshift, RedshiftAdapter, RedshiftConfig)
diff --git a/tests/adapters/test_redshift_adapter.py b/tests/adapters/test_redshift_adapter.py
new file mode 100644
index 0000000..bab5acf
--- /dev/null
+++ b/tests/adapters/test_redshift_adapter.py
@@ -0,0 +1,204 @@
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from intugle.adapters.types.redshift.models import RedshiftConfig
+from intugle.adapters.types.redshift.redshift import REDSHIFT_AVAILABLE, RedshiftAdapter
+
+
+@pytest.fixture
+def mock_redshift_connection():
+    """Mock a Redshift connection."""
+    mock_conn = MagicMock()
+    mock_cursor = MagicMock()
+    mock_conn.cursor.return_value.__enter__.return_value = mock_cursor
+    return mock_conn, mock_cursor
+
+
+@pytest.fixture
+def mock_settings():
+    """Mock settings with Redshift configuration."""
+    with patch("intugle.adapters.types.redshift.redshift.settings") as mock_settings:
+        mock_settings.PROFILES.get.return_value = {
+            "user": "test_user",
+            "password": "test_password",
+            "host": "test-cluster.redshift.amazonaws.com",
+            "port": 5439,
+            "database": "test_db",
+            "schema": "public",
+            "name": "test_redshift_source"
+        }
+        yield mock_settings
+
+
+@pytest.mark.skipif(not REDSHIFT_AVAILABLE, reason="Redshift dependencies not installed")
+class TestRedshiftAdapter:
+    """Unit tests for RedshiftAdapter."""
+
+    def test_redshift_config_validation(self):
+        """Test that RedshiftConfig validates correctly."""
+        config = RedshiftConfig(identifier="test_table", type="redshift")
+        assert config.identifier == "test_table"
+        assert config.type == "redshift"
+
+    def test_check_data(self):
+        """Test the check_data static method."""
+        valid_config = {"identifier": "test_table", "type": "redshift"}
+        result = RedshiftAdapter.check_data(valid_config)
+        assert isinstance(result, RedshiftConfig)
+        assert result.identifier == "test_table"
+
+        with pytest.raises(TypeError):
+            RedshiftAdapter.check_data({"invalid": "config"})
+
+    @patch("intugle.adapters.types.redshift.redshift.redshift_connector")
+    def test_connect(self, mock_connector, mock_settings):
+        """Test the connection initialization."""
+        mock_conn = MagicMock()
+        mock_connector.connect.return_value = mock_conn
+
+        # Reset the singleton
+        RedshiftAdapter._instance = None
+        RedshiftAdapter._initialized = False
+
+        adapter = RedshiftAdapter()
+
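+        # Note: the schema is not passed to connect(); the adapter applies it
+        # afterwards via SET search_path (see RedshiftAdapter.connect)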
+        mock_connector.connect.assert_called_once_with(
+            user="test_user",
+            password="test_password",
+            host="test-cluster.redshift.amazonaws.com",
+            port=5439,
+            database="test_db"
+        )
+        assert adapter.database == "test_db"
+        assert adapter.schema == "public"
+        assert adapter.source_name == "test_redshift_source"
+
+    @patch("intugle.adapters.types.redshift.redshift.redshift_connector")
+    def test_get_fqn(self, mock_connector, mock_settings):
+        """Test fully qualified name generation."""
+        # Reset singleton
+        RedshiftAdapter._instance = None
+        RedshiftAdapter._initialized = False
+
+        adapter = RedshiftAdapter()
+
+        # Test with simple identifier
+        assert adapter._get_fqn("test_table") == '"public"."test_table"'
+
+        # Test with already qualified identifier
+        assert adapter._get_fqn("schema.test_table") == "schema.test_table"
+
+    @patch("intugle.adapters.types.redshift.redshift.redshift_connector")
+    def test_profile(self, mock_connector, mock_settings, mock_redshift_connection):
+        """Test the profile method."""
+        mock_conn, mock_cursor = mock_redshift_connection
+        mock_connector.connect.return_value = mock_conn
+
+        # Reset singleton
+        RedshiftAdapter._instance = None
+        RedshiftAdapter._initialized = False
+
+        adapter = RedshiftAdapter()
+        adapter.connection = mock_conn
+
+        # Mock the count query
+        mock_cursor.fetchall.side_effect = [
+            [(100,)],  # Total count
+            [("col1", "varchar"), ("col2", "integer")]  # Column metadata
+        ]
+
+        config = RedshiftConfig(identifier="test_table")
+        result = adapter.profile(config, "test_table")
+
+        assert result.count == 100
+        assert "col1" in result.columns
+        assert "col2" in result.columns
+        assert result.dtypes["col1"] == "varchar"
+        assert result.dtypes["col2"] == "integer"
+
+    @patch("intugle.adapters.types.redshift.redshift.redshift_connector")
+    def test_create_table_from_query(self, mock_connector, mock_settings, mock_redshift_connection):
+        """Test creating a table from a query."""
+        mock_conn, mock_cursor = mock_redshift_connection
+        mock_connector.connect.return_value = mock_conn
+
+        # Reset singleton
+        RedshiftAdapter._instance = None
+        RedshiftAdapter._initialized = False
+
+        adapter = RedshiftAdapter()
+        adapter.connection = mock_conn
+
+        mock_cursor.fetchall.return_value = []
+
+        query = "SELECT * FROM source_table WHERE id > 10"
+        adapter.create_table_from_query("new_table", query, materialize="table")
+
+        # Verify that DROP and CREATE were called
+        assert mock_cursor.execute.call_count >= 2
+
+    @patch("intugle.adapters.types.redshift.redshift.redshift_connector")
+    def test_to_df_from_query(self, mock_connector, mock_settings, mock_redshift_connection):
+        """Test converting query results to DataFrame."""
+        mock_conn, mock_cursor = mock_redshift_connection
+        mock_connector.connect.return_value = mock_conn
+
+        # Reset singleton
+        RedshiftAdapter._instance = None
+        RedshiftAdapter._initialized = False
+
+        adapter = RedshiftAdapter()
+        adapter.connection = mock_conn
+
+        # Mock query result
+        mock_cursor.description = [("id",), ("name",)]
+        mock_cursor.fetchall.return_value = [(1, "Alice"), (2, "Bob")]
+
+        df = adapter.to_df_from_query("SELECT * FROM test_table")
+
+        assert len(df) == 2
+        assert list(df.columns) == ["id", "name"]
+        assert df.iloc[0]["name"] == "Alice"
+
+    @patch("intugle.adapters.types.redshift.redshift.redshift_connector")
+    def test_create_new_config_from_etl(self, mock_connector, mock_settings):
+        """Test creating a new config from ETL name."""
+        # Reset singleton
+        RedshiftAdapter._instance = None
+        RedshiftAdapter._initialized = False
+
+        adapter = RedshiftAdapter()
+
+        new_config = adapter.create_new_config_from_etl("my_etl_table")
+
+        assert isinstance(new_config, RedshiftConfig)
+        assert new_config.identifier == "my_etl_table"
+        assert new_config.type == "redshift"
+
+
+def test_can_handle_redshift():
+    """Test the can_handle_redshift function."""
+    from intugle.adapters.types.redshift.redshift import can_handle_redshift
+
+    valid_config = {"identifier": "test_table", "type": "redshift"}
+    assert can_handle_redshift(valid_config) is True
+
+    invalid_config = {"identifier": "test_table", "type": "postgres"}
+    assert can_handle_redshift(invalid_config) is False
+
+
+@pytest.mark.skipif(not REDSHIFT_AVAILABLE, reason="Redshift dependencies not installed")
+def test_adapter_registration():
+    """Test that the Redshift adapter can be registered with the factory."""
+    from intugle.adapters.factory import AdapterFactory
+    from intugle.adapters.types.redshift.redshift import register
+
+    factory = AdapterFactory.__new__(AdapterFactory)
+    factory.dataframe_funcs = {}
+    factory.config_types = []
+
+    register(factory)
+
+    assert "redshift" in factory.dataframe_funcs
+    assert RedshiftConfig in factory.config_types

From 2315fca56a81c4c3a32dee7883dba5b906e7e17b Mon Sep 17 00:00:00 2001
From: zak1504
Date: Thu, 1 Jan 2026 14:55:43 +0530
Subject: [PATCH 2/2] fix: Resolve merge conflict - add both redshift and bigquery adapters

---
 pyproject.toml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pyproject.toml b/pyproject.toml
index 8ea2566..a2e3ba4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -73,6 +73,10 @@ redshift = [
     "redshift-connector>=2.1.0",
     "sqlglot>=27.20.0",
 ]
+bigquery = [
+    "google-cloud-bigquery>=3.11.0",
+    "sqlglot>=27.20.0",
+]
 sqlserver = [
     "mssql-python>=0.13.1",
     "sqlglot>=27.20.0",