Commit f9a9453

Yoonjin Hwang committed: add database utilities

1 parent e302776 commit f9a9453

14 files changed: +877 -5 lines changed

.env.example

Lines changed: 66 additions & 5 deletions

@@ -66,8 +66,69 @@ AWS_BEDROCK_EMBEDDING_MODEL=amazon.titan-embed-text-v2:0
 # HUGGING_FACE_EMBEDDING_API_TOKEN=

 DATAHUB_SERVER = 'http://-.-.-.-:-'
-CLICKHOUSE_HOST = '-.-.-.-'
-CLICKHOUSE_DATABASE = 'main'
-CLICKHOUSE_USER = '-'
-CLICKHOUSE_PASSWORD = '-'
-CLICKHOUSE_PORT = 9000
+
+
+###############################################
+######## Database Connector SELECTION #########
+###############################################
+
+# clickhouse
+# DB_TYPE=clickhouse
+# CLICKHOUSE_HOST=_._._._
+# CLICKHOUSE_PORT=9000
+# CLICKHOUSE_USER=_
+# CLICKHOUSE_PASSWORD=_
+# CLICKHOUSE_DATABASE=_
+
+# databricks
+# DB_TYPE=databricks
+# DATABRICKS_HOST=_
+# DATABRICKS_HTTP_PATH=_
+# DATABRICKS_ACCESS_TOKEN=_
+
+# duckdb
+# DB_TYPE=duckdb
+# DUCKDB_PATH=./data/duckdb.db
+
+# mariadb
+# DB_TYPE=mariadb
+# MARIADB_HOST=_
+# MARIADB_PORT=3306
+# MARIADB_USER=_
+# MARIADB_PASSWORD=_
+# MARIADB_DATABASE=_
+
+# mysql
+# DB_TYPE=mysql
+# MYSQL_HOST=_
+# MYSQL_PORT=3306
+# MYSQL_USER=_
+# MYSQL_PASSWORD=_
+# MYSQL_DATABASE=_
+
+# oracle
+# DB_TYPE=oracle
+# ORACLE_HOST=_
+# ORACLE_PORT=1521
+# ORACLE_USER=_
+# ORACLE_PASSWORD=_
+# ORACLE_DATABASE=_
+# ORACLE_SERVICE_NAME=_
+
+# postgres
+# DB_TYPE=postgres
+# POSTGRES_HOST=_
+# POSTGRES_PORT=5432
+# POSTGRES_USER=_
+# POSTGRES_PASSWORD=_
+# POSTGRES_DATABASE=_
+
+# snowflake
+# DB_TYPE=snowflake
+# SNOWFLAKE_USER=_
+# SNOWFLAKE_PASSWORD=_
+# SNOWFLAKE_ACCOUNT=_
+
+# sqlite
+# DB_TYPE=sqlite
+# SQLITE_PATH=./data/sqlite.db
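
The blocks above are alternatives: exactly one is meant to be uncommented, DB_TYPE names the connector, and the variables sharing that prefix supply its credentials. The snippet below is not part of the commit, just a minimal sketch of the intended flow (assuming the ClickHouse block is active in .env and the package is importable as db_utils):

# Illustrative usage sketch -- not a file in this commit.
# Assumes DB_TYPE=clickhouse and the CLICKHOUSE_* variables are set in .env.
from db_utils import get_db_connector   # defined in db_utils/__init__.py below

connector = get_db_connector()           # reads DB_TYPE, then the CLICKHOUSE_* variables
df = connector.run_sql("SELECT 1")       # every connector returns a pandas DataFrame
connector.close()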

db_utils/__init__.py

Lines changed: 127 additions & 0 deletions

from typing import Optional
import os
from .config import DBConfig
from .logger import logger

from dotenv import load_dotenv

from .base_connector import BaseConnector

from .clickhouse_connector import ClickHouseConnector
from .postgres_connector import PostgresConnector
from .mysql_connector import MySQLConnector
from .mariadb_connector import MariaDBConnector
from .oracle_connector import OracleConnector
from .duckdb_connector import DuckDBConnector
from .databricks_connector import DatabricksConnector
from .snowflake_connector import SnowflakeConnector

env_path = os.path.join(os.getcwd(), ".env")

if os.path.exists(env_path):
    load_dotenv(env_path, override=True)
    print(f"✅ Loaded environment file (.env) from {os.getcwd()}!")
else:
    print(f"⚠️ No environment file (.env) found in {os.getcwd()}!")


def get_db_connector(db_type: Optional[str] = None, config: Optional[DBConfig] = None):
    """
    Return the appropriate DB connector instance.
    - If db_type is not provided, it is read from the DB_TYPE environment variable.
    - If config is not provided, it is loaded from the environment using db_type as the prefix.

    Parameters:
        db_type (Optional[str]): Database type (e.g., 'postgres', 'mysql')
        config (Optional[DBConfig]): Connection config

    Returns:
        BaseConnector: Initialized DB connector instance

    Raises:
        ValueError: If the type/config is missing or invalid
    """
    if db_type is None:
        db_type = os.getenv("DB_TYPE")
    if not db_type:
        raise ValueError("DB type must be provided or set in environment as DB_TYPE.")

    db_type = db_type.lower()

    if config is None:
        config = load_config_from_env(db_type.upper())

    connector_map = {
        "clickhouse": ClickHouseConnector,
        "postgres": PostgresConnector,    # matches DB_TYPE=postgres / POSTGRES_* in .env.example
        "postgresql": PostgresConnector,
        "mysql": MySQLConnector,
        "mariadb": MariaDBConnector,
        "oracle": OracleConnector,
        "duckdb": DuckDBConnector,
        "databricks": DatabricksConnector,
        "snowflake": SnowflakeConnector,
    }

    if db_type not in connector_map:
        logger.error(f"Unsupported DB type: {db_type}")
        raise ValueError(f"Unsupported DB type: {db_type}")

    required_fields = {
        "oracle": ["extra.service_name"],
        "databricks": ["extra.http_path", "extra.access_token"],
        "snowflake": ["extra.account"],
    }

    missing = []
    for path in required_fields.get(db_type, []):
        cur = config
        for key in path.split("."):
            cur = cur.get(key) if isinstance(cur, dict) else None
            if cur is None:
                missing.append(path)
                break

    if missing:
        logger.error(f"Missing required fields for {db_type}: {', '.join(missing)}")
        raise ValueError(f"Missing required fields for {db_type}: {', '.join(missing)}")

    return connector_map[db_type](config)


def load_config_from_env(prefix: str) -> DBConfig:
    """
    Load a DBConfig from environment variables with the given prefix.
    Standard keys are extracted; all other prefixed keys go into 'extra'.

    Example:
        If prefix = 'SNOWFLAKE', this reads:
        - SNOWFLAKE_HOST
        - SNOWFLAKE_USER
        - SNOWFLAKE_PASSWORD
        - SNOWFLAKE_PORT
        - SNOWFLAKE_DATABASE
        Other keys such as SNOWFLAKE_ACCOUNT or SNOWFLAKE_WAREHOUSE go into 'extra'.
    """
    base_keys = {"HOST", "PORT", "USER", "PASSWORD", "DATABASE"}

    # Extract standard values
    config = {
        "host": os.getenv(f"{prefix}_HOST"),
        "port": int(os.getenv(f"{prefix}_PORT")) if os.getenv(f"{prefix}_PORT") else None,
        "user": os.getenv(f"{prefix}_USER"),
        "password": os.getenv(f"{prefix}_PASSWORD"),
        "database": os.getenv(f"{prefix}_DATABASE"),
    }

    # Auto-detect extra keys
    extra = {}
    for key, value in os.environ.items():
        if key.startswith(f"{prefix}_"):
            suffix = key[len(f"{prefix}_"):]
            if suffix.upper() not in base_keys:
                extra[suffix.lower()] = value

    if extra:
        config["extra"] = extra

    return DBConfig(**config)
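
A quick sketch of load_config_from_env's prefix handling (not part of the commit; the variable values below are assumptions): the five base keys populate the DBConfig fields, and any other variable with the same prefix lands in extra, which is what the required-field check for snowflake ('extra.account') relies on.

# Illustrative only -- assumes these variables are set.
import os
from db_utils import load_config_from_env

os.environ["SNOWFLAKE_USER"] = "reporting"
os.environ["SNOWFLAKE_PASSWORD"] = "secret"
os.environ["SNOWFLAKE_ACCOUNT"] = "xy12345"       # not a base key -> routed to extra
os.environ["SNOWFLAKE_WAREHOUSE"] = "ANALYTICS"   # likewise

config = load_config_from_env("SNOWFLAKE")
# config["user"] == "reporting"
# config["extra"] == {"account": "xy12345", "warehouse": "ANALYTICS"}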

db_utils/base_connector.py

Lines changed: 27 additions & 0 deletions

from abc import ABC, abstractmethod
import pandas as pd


class BaseConnector(ABC):
    """
    Abstract base class for database connectors.
    """

    @abstractmethod
    def connect(self):
        """
        Initialize the database connection.
        """
        pass

    @abstractmethod
    def run_sql(self, sql: str) -> pd.DataFrame:
        """
        Return the result of the SQL query as a pandas DataFrame.

        Parameters:
            sql (str): SQL query string to be executed.

        Returns:
            pd.DataFrame: Result of the SQL query as a pandas DataFrame.
        """
        pass
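
The concrete connectors below all follow the same pattern: connect in __init__, run_sql returning a DataFrame, and a close method. The commit's .env.example also lists a sqlite option, but no SQLite connector appears among the files shown here; purely as a hypothetical illustration of subclassing BaseConnector (the class name and behavior are assumptions, not part of the commit), a minimal implementation could look like this:

# Hypothetical example -- illustrates the BaseConnector contract only.
import sqlite3
import pandas as pd
from db_utils.base_connector import BaseConnector


class ExampleSQLiteConnector(BaseConnector):
    def __init__(self, path: str):
        self.path = path
        self.conn = None
        self.connect()

    def connect(self) -> None:
        # sqlite3 ships with the standard library, so no extra driver is needed.
        self.conn = sqlite3.connect(self.path)

    def run_sql(self, sql: str) -> pd.DataFrame:
        # pandas can read directly from a DB-API connection.
        return pd.read_sql_query(sql, self.conn)

    def close(self) -> None:
        if self.conn:
            self.conn.close()
        self.conn = None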

db_utils/clickhouse_connector.py

Lines changed: 70 additions & 0 deletions

from .base_connector import BaseConnector
from clickhouse_driver import Client
import pandas as pd
from .config import DBConfig
from .logger import logger


class ClickHouseConnector(BaseConnector):
    """
    Connect to ClickHouse and execute SQL queries.
    """
    client = None

    def __init__(self, config: DBConfig):
        """
        Initialize the ClickHouseConnector with connection parameters.

        Parameters:
            config (DBConfig): Configuration object containing connection parameters.
        """
        self.host = config["host"]
        self.port = config["port"]
        self.user = config["user"]
        self.password = config["password"]
        self.database = config["database"]
        self.connect()

    def connect(self) -> None:
        """
        Establish a connection to the ClickHouse server.
        """
        try:
            self.client = Client(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            logger.info("Successfully connected to ClickHouse.")
        except Exception as e:
            logger.error(f"Failed to connect to ClickHouse: {e}")
            raise

    def run_sql(self, sql: str) -> pd.DataFrame:
        """
        Execute a SQL query and return the result as a pandas DataFrame.

        Parameters:
            sql (str): SQL query string to be executed.

        Returns:
            pd.DataFrame: Result of the SQL query as a pandas DataFrame.
        """
        if self.client is None:
            self.connect()

        try:
            # clickhouse_driver's Client exposes execute(), not query();
            # with_column_types=True also returns (name, type) pairs for the columns.
            rows, columns = self.client.execute(sql, with_column_types=True)
            return pd.DataFrame(rows, columns=[name for name, _ in columns])
        except Exception as e:
            logger.error(f"Failed to execute SQL query: {e}")
            raise

    def close(self) -> None:
        """
        Close the connection to the ClickHouse server.
        """
        if self.client:
            self.client.disconnect()
            logger.info("Connection to ClickHouse closed.")
        self.client = None

db_utils/config.py

Lines changed: 10 additions & 0 deletions

from typing import Optional, Dict, TypedDict


class DBConfig(TypedDict):
    host: str
    port: Optional[int]
    user: Optional[str]
    password: Optional[str]
    database: Optional[str]
    extra: Optional[Dict[str, str]]
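
Because DBConfig is a plain TypedDict, a connection can also be configured directly in code and handed to get_db_connector, bypassing the .env lookup. A minimal sketch with placeholder values (the host and credentials below are assumptions, not from the commit):

# Illustrative only -- placeholder values.
from db_utils import DBConfig, get_db_connector

config: DBConfig = {
    "host": "clickhouse.internal.example",
    "port": 9000,
    "user": "readonly",
    "password": "changeme",
    "database": "main",
    "extra": None,
}
connector = get_db_connector("clickhouse", config)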

db_utils/databricks_connector.py

Lines changed: 78 additions & 0 deletions

from databricks import sql
import pandas as pd
from .base_connector import BaseConnector
from .config import DBConfig
from .logger import logger


class DatabricksConnector(BaseConnector):
    """
    Connect to Databricks SQL Warehouse and execute queries.
    """
    connection = None

    def __init__(self, config: DBConfig):
        """
        Initialize the DatabricksConnector with connection parameters.

        Parameters:
            config (DBConfig): Configuration object containing connection parameters.
                Required keys: host, extra.http_path, extra.access_token
                Optional keys: extra.catalog, extra.schema
        """
        self.server_hostname = config["host"]
        self.http_path = config["extra"]["http_path"]
        self.access_token = config["extra"]["access_token"]
        self.catalog = config.get("extra", {}).get("catalog")
        self.schema = config.get("extra", {}).get("schema")
        self.connect()

    def connect(self) -> None:
        """
        Establish a connection to the Databricks SQL endpoint.
        """
        try:
            self.connection = sql.connect(
                server_hostname=self.server_hostname,
                http_path=self.http_path,
                access_token=self.access_token,
                catalog=self.catalog,
                schema=self.schema,
            )
            logger.info("Successfully connected to Databricks.")
        except Exception as e:
            logger.error(f"Failed to connect to Databricks: {e}")
            raise

    def run_sql(self, sql: str) -> pd.DataFrame:
        """
        Execute a SQL query and return the result as a pandas DataFrame.

        Parameters:
            sql (str): SQL query string to be executed.

        Returns:
            pd.DataFrame: Result of the SQL query as a pandas DataFrame.
        """
        if self.connection is None:
            self.connect()

        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(sql)
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            return pd.DataFrame(rows, columns=columns)
        except Exception as e:
            logger.error(f"Failed to execute SQL query: {e}")
            raise
        finally:
            # Guard against a cursor that was never created.
            if cursor is not None:
                cursor.close()

    def close(self) -> None:
        """
        Close the Databricks connection.
        """
        if self.connection:
            self.connection.close()
            logger.info("Connection to Databricks closed.")
        self.connection = None
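
For Databricks the loader expects host plus http_path and access_token under extra, which is what the DATABRICKS_* block in .env.example produces: HOST is a base key, while HTTP_PATH and ACCESS_TOKEN are not, so load_config_from_env routes them into extra. The equivalent explicit config, sketched with placeholder workspace values (assumptions, not from the commit):

# Illustrative only -- placeholder workspace values.
from db_utils import DBConfig, get_db_connector

config: DBConfig = {
    "host": "adb-1234567890123456.7.azuredatabricks.net",
    "port": None,
    "user": None,
    "password": None,
    "database": None,
    "extra": {
        "http_path": "/sql/1.0/warehouses/abc123",
        "access_token": "dapi-xxxxxxxx",
        # "catalog" and "schema" may be added here as optional keys
    },
}
connector = get_db_connector("databricks", config)
df = connector.run_sql("SELECT current_date()")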
