Skip to content

Commit 94595b3

Browse files
committed
experiment with new data loading protocol
1 parent d143354 commit 94595b3

File tree

9 files changed

+673
-219
lines changed

9 files changed

+673
-219
lines changed

.env.template

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,4 @@
55
DISABLE_DISPLAY_KEYS=false # if true, the display keys will not be shown in the frontend
66
EXEC_PYTHON_IN_SUBPROCESS=false # if true, the python code will be executed in a subprocess to avoid crashing the main app, but it will increase the time of response
77

8-
LOCAL_DB_DIR= # the directory to store the local database, if not provided, the app will use the temp directory
9-
10-
# External database connection settings
11-
# check https://duckdb.org/docs/stable/extensions/mysql.html
12-
# and https://duckdb.org/docs/stable/extensions/postgres.html
13-
USE_EXTERNAL_DB=false # if true, the app will use an external database instead of the one in the app
14-
DB_NAME=mysql_db # the name to refer to this database connection
15-
DB_TYPE=mysql # mysql or postgresql
16-
DB_HOST=localhost
17-
DB_PORT=0
18-
DB_DATABASE=mysql
19-
DB_USER=root
20-
DB_PASSWORD=
8+
LOCAL_DB_DIR= # the directory to store the local database, if not provided, the app will use the temp directory

py-src/data_formulator/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from data_formulator.tables_routes import tables_bp
3838
from data_formulator.agent_routes import agent_bp
3939

40+
4041
app = Flask(__name__, static_url_path='', static_folder=os.path.join(APP_ROOT, "dist"))
4142
app.secret_key = secrets.token_hex(16) # Generate a random secret key for sessions
4243

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Dict, Any, List
3+
import pandas as pd
4+
import json
5+
import duckdb
6+
import random
7+
import string
8+
9+
class ExternalDataLoader(ABC):
    """Abstract base for loaders that copy data from an external source
    (Kusto, MySQL, ...) into a session-local DuckDB database.

    Concrete subclasses must provide their parameter schema, construction,
    table discovery, and the two ingestion entry points.
    """

    # Set by concrete subclasses in __init__; used by ingest_df_to_duckdb.
    duck_db_conn: duckdb.DuckDBPyConnection

    def ingest_df_to_duckdb(self, df: pd.DataFrame, table_name: str):
        """Materialize *df* as a new DuckDB table named *table_name*.

        If the name is taken, a numeric suffix (``_1``, ``_2``, ...) is
        appended until a free name is found, so existing tables are never
        overwritten.
        """
        base_name = table_name
        counter = 1
        while True:
            # Parameterized query: table_name comes from user input, so never
            # splice it into SQL text directly.
            exists = self.duck_db_conn.execute(
                "SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = ?",
                [table_name],
            ).fetchone()[0] > 0
            if not exists:
                break
            # Name collision: try the next suffixed candidate.
            table_name = f"{base_name}_{counter}"
            counter += 1

        # Register the dataframe under a random temporary view name so
        # concurrent ingests in the same connection cannot clash.
        random_suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=6))
        self.duck_db_conn.register(f'df_temp_{random_suffix}', df)
        # Quote the identifier so names with unusual characters cannot break
        # (or inject into) the CREATE statement.
        self.duck_db_conn.execute(f'CREATE TABLE "{table_name}" AS SELECT * FROM df_temp_{random_suffix}')
        self.duck_db_conn.execute(f"DROP VIEW df_temp_{random_suffix}")  # Drop the temporary view after creating the table

    @staticmethod
    @abstractmethod
    def list_params() -> List[Dict[str, Any]]:
        """Describe the connection parameters this loader accepts."""
        pass

    @abstractmethod
    def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
        """Connect to the external source using *params* and remember the DuckDB connection."""
        pass

    @abstractmethod
    def list_tables(self) -> List[Dict[str, Any]]:
        # should include: table_name, column_names, column_types, sample_data
        pass

    @abstractmethod
    def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000):
        """Copy up to *size* rows of *table_name* into DuckDB (as *name_as* if given)."""
        pass

    @abstractmethod
    def ingest_data_from_query(self, query: str, name_as: str):
        """Run *query* against the external source and store the result as *name_as*."""
        pass
51+
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
from typing import Dict, Any, List
2+
import pandas as pd
3+
import json
4+
import duckdb
5+
import random
6+
import string
7+
8+
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
9+
from azure.kusto.data.helpers import dataframe_from_result_table
10+
11+
from data_formulator.data_loader.external_data_loader import ExternalDataLoader
12+
13+
def sanitize_table_name(table_name: str) -> str:
    """Return *table_name* with dots and dashes replaced by underscores,
    making it safe to use as a SQL identifier."""
    replacements = str.maketrans({".": "_", "-": "_"})
    return table_name.translate(replacements)
15+
16+
class KustoDataLoader(ExternalDataLoader):
    """Load tables from an Azure Data Explorer (Kusto) cluster into DuckDB."""

    @staticmethod
    def list_params() -> List[Dict[str, Any]]:
        """Connection parameters accepted by this loader.

        client_id/client_secret/tenant_id are optional: when absent, Azure CLI
        authentication is used instead of AAD application-key auth.
        """
        params_list = [
            {"name": "kusto_cluster", "type": "string", "required": True},
            {"name": "kusto_database", "type": "string", "required": True},
            {"name": "client_id", "type": "string", "required": False},
            {"name": "client_secret", "type": "string", "required": False},
            {"name": "tenant_id", "type": "string", "required": False}
        ]
        return params_list

    def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
        """Create a Kusto client from *params* and keep the DuckDB connection.

        Raises:
            Exception: if the Kusto client cannot be created (e.g. the user
                has not authenticated with Azure CLI).
        """
        self.kusto_cluster = params.get("kusto_cluster", None)
        self.kusto_database = params.get("kusto_database", None)

        self.client_id = params.get("client_id", None)
        self.client_secret = params.get("client_secret", None)
        self.tenant_id = params.get("tenant_id", None)

        try:
            if self.client_id and self.client_secret and self.tenant_id:
                # AAD application key authentication when full credentials are supplied.
                self.client = KustoClient(KustoConnectionStringBuilder.with_aad_application_key_authentication(
                    self.kusto_cluster, self.client_id, self.client_secret, self.tenant_id))
            else:
                # Fall back to Azure CLI auth, but you can also use other auth types.
                self.client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(self.kusto_cluster))
        except Exception as e:
            raise Exception(f"Error creating Kusto client: {e}, please authenticate with Azure CLI when starting the app.")

        self.duck_db_conn = duck_db_conn

    def query(self, kql: str) -> pd.DataFrame:
        """Run *kql* against the configured database and return the primary result as a DataFrame."""
        result = self.client.execute(self.kusto_database, kql)
        return dataframe_from_result_table(result.primary_results[0])

    def list_tables(self) -> List[Dict[str, Any]]:
        """Enumerate tables with their columns, row counts, and 10 sample rows."""
        query = ".show tables"
        tables_df = self.query(query)

        results = []
        for table in tables_df.to_dict(orient="records"):
            table_name = table['TableName']
            # Schema comes back as a JSON string in the 'Schema' column.
            schema_result = self.query(f".show table ['{table_name}'] schema as json").to_dict(orient="records")
            columns = [{
                'name': r["Name"],
                'type': r["Type"]
            } for r in json.loads(schema_result[0]['Schema'])['OrderedColumns']]

            row_count_result = self.query(f".show table ['{table_name}'] details").to_dict(orient="records")
            row_count = row_count_result[0]["TotalRowCount"]

            sample_query = f"['{table_name}'] | take {10}"
            sample_result = self.query(sample_query).to_dict(orient="records")

            table_metadata = {
                "row_count": row_count,
                "columns": columns,
                "sample_rows": sample_result
            }

            results.append({
                "name": table_name,
                "metadata": table_metadata
            })

        return results

    def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000):
        """Copy up to *size* rows of *table_name* into DuckDB in chunks.

        The chunk size is estimated from a 10k-row data-size sample so each
        Kusto response stays under the ~64MB response limit; on a failed
        chunk query the chunk size is reduced and the chunk retried.
        """
        if name_as is None:
            name_as = table_name
        # Sanitize once up front instead of on every chunk.
        name_as = sanitize_table_name(name_as)

        total_rows_ingested = 0
        first_chunk = True

        size_estimate_query = f"['{table_name}'] | take {10000} | summarize Total=sum(estimate_data_size(*))"
        size_estimate_result = self.query(size_estimate_query)
        size_estimate = size_estimate_result['Total'].values[0]
        print(f"size_estimate: {size_estimate}")

        # Guard against an empty table / missing estimate (avoids ZeroDivisionError).
        if size_estimate and size_estimate > 0:
            # int(): row_number() bounds in KQL must be integers, not floats.
            chunk_size = int(min(64 * 1024 * 1024 / size_estimate * 0.9 * 10000, 5000000))
        else:
            chunk_size = 100000
        print(f"estimated_chunk_size: {chunk_size}")

        while total_rows_ingested < size:
            try:
                query = f"['{table_name}'] | serialize | extend rn=row_number() | where rn >= {total_rows_ingested} and rn < {total_rows_ingested + chunk_size} | project-away rn"
                chunk_df = self.query(query)
            except Exception:
                # Likely a response-size limit; retry this chunk smaller.
                chunk_size = int(chunk_size * 0.8)
                if chunk_size < 1:
                    # The query fails even for a single row: surface the error
                    # instead of looping forever.
                    raise
                continue

            print(f"total_rows_ingested: {total_rows_ingested}")
            print(chunk_df.head())

            # Stop if no more data
            if chunk_df.empty:
                break

            # For first chunk, create new table; for subsequent chunks, append
            if first_chunk:
                self.ingest_df_to_duckdb(chunk_df, name_as)
                first_chunk = False
            else:
                # Append to existing table via a throwaway registered view.
                random_suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=6))
                self.duck_db_conn.register(f'df_temp_{random_suffix}', chunk_df)
                self.duck_db_conn.execute(f"INSERT INTO {name_as} SELECT * FROM df_temp_{random_suffix}")
                self.duck_db_conn.execute(f"DROP VIEW df_temp_{random_suffix}")

            total_rows_ingested += len(chunk_df)

    def ingest_data_from_query(self, query: str, name_as: str):
        """Run a KQL *query* and store its full result in DuckDB as *name_as*."""
        # Sanitize the table name for SQL compatibility
        name_as = sanitize_table_name(name_as)
        df = self.query(query)
        self.ingest_df_to_duckdb(df, name_as)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import json
2+
3+
import pandas as pd
4+
import duckdb
5+
6+
from data_formulator.data_loader.external_data_loader import ExternalDataLoader
7+
from typing import Dict, Any
8+
9+
class MySQLDataLoader(ExternalDataLoader):
    """Expose a MySQL server's tables to DuckDB via the mysql extension."""

    @staticmethod
    def list_params() -> List[Dict[str, Any]]:
        """Connection parameters accepted by this loader."""
        params_list = [
            {"name": "user", "type": "string", "required": True, "default": "root"},
            {"name": "password", "type": "string", "required": False, "default": ""},
            {"name": "host", "type": "string", "required": True, "default": "localhost"},
            {"name": "database", "type": "string", "required": True, "default": "mysql"}
        ]
        return params_list

    def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
        """Attach the MySQL database described by *params* to DuckDB as 'mysqldb'."""
        self.params = params
        self.duck_db_conn = duck_db_conn

        # Install and load the MySQL extension
        self.duck_db_conn.install_extension("mysql")
        self.duck_db_conn.load_extension("mysql")

        # Build the space-separated DuckDB attach string, e.g.
        # "user=root host=localhost database=mysql"; empty values are omitted.
        # NOTE(review): values containing spaces or quotes would break this
        # format — confirm whether quoting is needed for target deployments.
        attach_string = ""
        for key, value in self.params.items():
            if value:
                attach_string += f"{key}={value} "

        # Register MySQL connection
        self.duck_db_conn.execute(f"ATTACH '{attach_string}' AS mysqldb (TYPE mysql);")

    def list_tables(self):
        """Enumerate user tables with columns, row counts, and 10 sample rows.

        MySQL system schemas (information_schema, mysql, performance_schema,
        sys) are excluded.
        """
        tables_df = self.duck_db_conn.execute(f"""
            SELECT TABLE_SCHEMA, TABLE_NAME FROM mysqldb.information_schema.tables 
            WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')
        """).fetch_df()

        results = []

        for schema, table_name in tables_df.values:

            full_table_name = f"{schema}.{table_name}"

            # Get column information using DuckDB's information schema
            columns_df = self.duck_db_conn.execute(f"DESCRIBE mysqldb.{full_table_name}").df()
            columns = [{
                'name': row['column_name'],
                'type': row['column_type']
            } for _, row in columns_df.iterrows()]

            # Get sample data
            sample_df = self.duck_db_conn.execute(f"SELECT * FROM mysqldb.{full_table_name} LIMIT 10").df()
            sample_rows = json.loads(sample_df.to_json(orient="records"))

            # get row count
            row_count = self.duck_db_conn.execute(f"SELECT COUNT(*) FROM mysqldb.{full_table_name}").fetchone()[0]

            table_metadata = {
                "row_count": row_count,
                "columns": columns,
                "sample_rows": sample_rows
            }

            results.append({
                "name": full_table_name,
                "metadata": table_metadata
            })

        return results

    def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000):
        """Copy up to *size* rows of the MySQL table into the local DuckDB database.

        *table_name* is expected as ``schema.table``; when *name_as* is not
        given, the bare table name (without schema) is used locally.
        """
        if name_as is None:
            name_as = table_name.split('.')[-1]

        self.duck_db_conn.execute(f"""
            CREATE OR REPLACE TABLE {name_as} AS 
            SELECT * FROM mysqldb.{table_name}
            LIMIT {size}
        """)

    def ingest_data_from_query(self, query: str, name_as: str):
        """Run a SQL *query* against the attached MySQL database and store the
        result in the main DuckDB database as *name_as*."""
        self.duck_db_conn.execute(f"""
            CREATE OR REPLACE TABLE main.{name_as} AS 
            SELECT * FROM ({query})
        """)

py-src/data_formulator/db_manager.py

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,9 @@
99
from dotenv import load_dotenv
1010

1111
class DuckDBManager:
12-
def __init__(self, external_db_connections: Dict[str, Dict[str, Any]], local_db_dir: str):
12+
def __init__(self, local_db_dir: str):
1313
# Store session db file paths
1414
self._db_files: Dict[str, str] = {}
15-
16-
# External db connections and tracking of installed extensions
17-
self._external_db_connections: Dict[str, Dict[str, Any]] = external_db_connections
18-
self._installed_extensions: Dict[str, List[str]] = {}
1915
self._local_db_dir: str = local_db_dir
2016

2117
@contextmanager
@@ -26,7 +22,6 @@ def connection(self, session_id: str) -> ContextManager[duckdb.DuckDBPyConnectio
2622
conn = self.get_connection(session_id)
2723
yield conn
2824
finally:
29-
# Close the connection after use
3025
if conn:
3126
conn.close()
3227

@@ -40,52 +35,18 @@ def get_connection(self, session_id: str) -> duckdb.DuckDBPyConnection:
4035
db_file = os.path.join(db_dir, f"df_{session_id}.duckdb")
4136
print(f"=== Creating new db file: {db_file}")
4237
self._db_files[session_id] = db_file
43-
# Initialize extension tracking for this file
44-
self._installed_extensions[db_file] = []
4538
else:
4639
print(f"=== Using existing db file: {self._db_files[session_id]}")
4740
db_file = self._db_files[session_id]
4841

4942
# Create a fresh connection to the database file
5043
conn = duckdb.connect(database=db_file)
5144

52-
if self._external_db_connections and self._external_db_connections['db_type'] in ['mysql', 'postgresql']:
53-
db_name = self._external_db_connections['db_name']
54-
db_type = self._external_db_connections['db_type']
55-
56-
print(f"=== connecting to {db_type} extension")
57-
# Only install if not already installed for this db file
58-
if db_type not in self._installed_extensions.get(db_file, []):
59-
conn.execute(f"INSTALL {db_type};")
60-
self._installed_extensions[db_file].append(db_type)
61-
62-
conn.execute(f"LOAD {db_type};")
63-
conn.execute(f"""CREATE SECRET (
64-
TYPE {db_type},
65-
HOST '{self._external_db_connections['host']}',
66-
PORT '{self._external_db_connections['port']}',
67-
DATABASE '{self._external_db_connections['database']}',
68-
USER '{self._external_db_connections['user']}',
69-
PASSWORD '{self._external_db_connections['password']}');
70-
""")
71-
conn.execute(f"ATTACH '' AS {db_name} (TYPE {db_type});")
72-
# result = conn.execute(f"SELECT * FROM {db_name}.information_schema.tables WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys');").fetch_df()
73-
# print(f"=== result: {result}")
74-
7545
return conn
7646

7747
env = load_dotenv()
7848

7949
# Initialize the DB manager
8050
db_manager = DuckDBManager(
81-
external_db_connections={
82-
"db_name": os.getenv('DB_NAME'),
83-
"db_type": os.getenv('DB_TYPE'),
84-
"host": os.getenv('DB_HOST'),
85-
"port": os.getenv('DB_PORT'),
86-
"database": os.getenv('DB_DATABASE'),
87-
"user": os.getenv('DB_USER'),
88-
"password": os.getenv('DB_PASSWORD')
89-
} if os.getenv('USE_EXTERNAL_DB') == 'true' else None,
9051
local_db_dir=os.getenv('LOCAL_DB_DIR')
9152
)

0 commit comments

Comments
 (0)