Skip to content

Commit 3720390

Browse files
committed
add new data loader design
1 parent 8e83241 commit 3720390

File tree

6 files changed

+177
-131
lines changed

6 files changed

+177
-131
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from data_formulator.data_loader.external_data_loader import ExternalDataLoader
2+
from data_formulator.data_loader.mysql_data_loader import MySQLDataLoader
3+
from data_formulator.data_loader.kusto_data_loader import KustoDataLoader
4+
5+
DATA_LOADERS = {
6+
"mysql": MySQLDataLoader,
7+
"kusto": KustoDataLoader
8+
}
9+
10+
__all__ = ["ExternalDataLoader", "MySQLDataLoader", "KustoDataLoader", "DATA_LOADERS"]

py-src/data_formulator/data_loader/kusto_data_loader.py

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ class KustoDataLoader(ExternalDataLoader):
1818
@staticmethod
1919
def list_params() -> bool:
2020
params_list = [
21-
{"name": "kusto_cluster", "type": "string", "required": True},
22-
{"name": "kusto_database", "type": "string", "required": True},
23-
{"name": "client_id", "type": "string", "required": False},
24-
{"name": "client_secret", "type": "string", "required": False},
25-
{"name": "tenant_id", "type": "string", "required": False}
21+
{"name": "kusto_cluster", "type": "string", "required": True, "description": ""},
22+
{"name": "kusto_database", "type": "string", "required": True, "description": ""},
23+
{"name": "client_id", "type": "string", "required": False, "description": "only necessary for AppKey auth"},
24+
{"name": "client_secret", "type": "string", "required": False, "description": "only necessary for AppKey auth"},
25+
{"name": "tenant_id", "type": "string", "required": False, "description": "only necessary for AppKey auth"}
2626
]
2727
return params_list
2828

@@ -53,10 +53,47 @@ def query(self, kql: str) -> pd.DataFrame:
5353
return dataframe_from_result_table(result.primary_results[0])
5454

5555
def list_tables(self) -> List[Dict[str, Any]]:
56+
57+
58+
# first list functions (views)
59+
query = ".show functions"
60+
function_result_df = self.query(query)
61+
62+
functions = []
63+
for func in function_result_df.to_dict(orient="records"):
64+
func_name = func['Name']
65+
result = self.query(f".show function ['{func_name}'] schema as json").to_dict(orient="records")
66+
schema = json.loads(result[0]['Schema'])
67+
parameters = schema['InputParameters']
68+
columns = [{
69+
'name': r["Name"],
70+
'type': r["Type"]
71+
} for r in schema['OutputColumns']]
72+
73+
# skip functions with parameters at the moment
74+
if len(parameters) > 0:
75+
continue
76+
77+
sample_query = f"['{func_name}'] | take {10}"
78+
sample_result = self.query(sample_query).to_dict(orient="records")
79+
80+
function_metadata = {
81+
"row_count": 0,
82+
"columns": columns,
83+
"parameters": parameters,
84+
"sample_rows": sample_result
85+
}
86+
functions.append({
87+
"type": "function",
88+
"name": func_name,
89+
"metadata": function_metadata
90+
})
91+
92+
# then list tables
5693
query = ".show tables"
5794
tables_df = self.query(query)
5895

59-
results = []
96+
tables = []
6097
for table in tables_df.to_dict(orient="records"):
6198
table_name = table['TableName']
6299
schema_result = self.query(f".show table ['{table_name}'] schema as json").to_dict(orient="records")
@@ -77,12 +114,13 @@ def list_tables(self) -> List[Dict[str, Any]]:
77114
"sample_rows": sample_result
78115
}
79116

80-
results.append({
117+
tables.append({
118+
"type": "table",
81119
"name": table_name,
82120
"metadata": table_metadata
83121
})
84122

85-
return results
123+
return functions + tables
86124

87125
def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000) -> pd.DataFrame:
88126
if name_as is None:

py-src/data_formulator/data_loader/mysql_data_loader.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ class MySQLDataLoader(ExternalDataLoader):
1111
@staticmethod
1212
def list_params() -> bool:
1313
params_list = [
14-
{"name": "user", "type": "string", "required": True, "default": "root"},
15-
{"name": "password", "type": "string", "required": False, "default": ""},
16-
{"name": "host", "type": "string", "required": True, "default": "localhost"},
17-
{"name": "database", "type": "string", "required": True, "default": "mysql"}
14+
{"name": "user", "type": "string", "required": True, "default": "root", "description": ""},
15+
{"name": "password", "type": "string", "required": False, "default": "", "description": "leave blank for no password"},
16+
{"name": "host", "type": "string", "required": True, "default": "localhost", "description": ""},
17+
{"name": "database", "type": "string", "required": True, "default": "mysql", "description": ""}
1818
]
1919
return params_list
2020

py-src/data_formulator/tables_routes.py

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
from pathlib import Path
1717

1818
from data_formulator.db_manager import db_manager
19-
from data_formulator.data_loader.external_data_loader import ExternalDataLoader
20-
from data_formulator.data_loader.mysql_data_loader import MySQLDataLoader
21-
from data_formulator.data_loader.kusto_data_loader import KustoDataLoader
19+
from data_formulator.data_loader import DATA_LOADERS
2220

2321
import re
2422
from typing import Tuple
@@ -701,6 +699,9 @@ def sanitize_db_error_message(error: Exception) -> Tuple[str, int]:
701699
# File errors
702700
r"No such file": ("File not found", 404),
703701
r"Permission denied": ("Access denied", 403),
702+
703+
# Data loader errors
704+
r"Entity ID": ("Entity ID not found", 500),
704705
}
705706

706707
# Check if error matches any safe pattern
@@ -715,34 +716,20 @@ def sanitize_db_error_message(error: Exception) -> Tuple[str, int]:
715716
return "An unexpected error occurred", 500
716717

717718

718-
719-
available_data_loaders = {
720-
'mysql': MySQLDataLoader,
721-
'kusto': KustoDataLoader
722-
}
723-
724-
@tables_bp.route('/data-loader/list-params', methods=['POST'])
725-
def data_loader_list_params():
726-
"""List params for a data loader"""
719+
@tables_bp.route('/data-loader/list-data-loaders', methods=['GET'])
720+
def data_loader_list_data_loaders():
721+
"""List all available data loaders"""
727722

728723
try:
729-
data = request.get_json()
730-
data_loader_type = data.get('data_loader_type')
731-
732-
if data_loader_type not in available_data_loaders:
733-
return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(available_data_loaders.keys())}"}), 400
734-
735-
data_loader = available_data_loaders[data_loader_type]
736-
737-
params = data_loader.list_params()
738-
739724
return jsonify({
740725
"status": "success",
741-
"params": params
726+
"data_loaders": {
727+
name: data_loader.list_params()
728+
for name, data_loader in DATA_LOADERS.items()
729+
}
742730
})
743-
744731
except Exception as e:
745-
logger.error(f"Error listing params for data loader: {str(e)}")
732+
logger.error(f"Error listing data loaders: {str(e)}")
746733
safe_msg, status_code = sanitize_db_error_message(e)
747734
return jsonify({
748735
"status": "error",
@@ -758,11 +745,11 @@ def data_loader_list_tables():
758745
data_loader_type = data.get('data_loader_type')
759746
data_loader_params = data.get('data_loader_params')
760747

761-
if data_loader_type not in available_data_loaders:
762-
return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(available_data_loaders.keys())}"}), 400
748+
if data_loader_type not in DATA_LOADERS:
749+
return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(DATA_LOADERS.keys())}"}), 400
763750

764751
with db_manager.connection(session['session_id']) as duck_db_conn:
765-
data_loader = available_data_loaders[data_loader_type](data_loader_params, duck_db_conn)
752+
data_loader = DATA_LOADERS[data_loader_type](data_loader_params, duck_db_conn)
766753
tables = data_loader.list_tables()
767754

768755
return jsonify({
@@ -772,7 +759,7 @@ def data_loader_list_tables():
772759

773760
except Exception as e:
774761
logger.error(f"Error listing tables from data loader: {str(e)}")
775-
print(traceback.format_exc())
762+
#print(traceback.format_exc())
776763
safe_msg, status_code = sanitize_db_error_message(e)
777764
return jsonify({
778765
"status": "error",
@@ -790,11 +777,11 @@ def data_loader_ingest_data():
790777
data_loader_params = data.get('data_loader_params')
791778
table_name = data.get('table_name')
792779

793-
if data_loader_type not in available_data_loaders:
794-
return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(available_data_loaders.keys())}"}), 400
780+
if data_loader_type not in DATA_LOADERS:
781+
return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(DATA_LOADERS.keys())}"}), 400
795782

796783
with db_manager.connection(session['session_id']) as duck_db_conn:
797-
data_loader = available_data_loaders[data_loader_type](data_loader_params, duck_db_conn)
784+
data_loader = DATA_LOADERS[data_loader_type](data_loader_params, duck_db_conn)
798785
data_loader.ingest_data(table_name)
799786

800787
return jsonify({

src/app/utils.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export function getUrls() {
6666
QUERY_TABLE: `/api/tables/query`,
6767
SAMPLE_TABLE: `/api/tables/sample-table`,
6868

69-
DATA_LOADER_LIST_PARAMS: `/api/tables/data-loader/list-params`,
69+
DATA_LOADER_LIST_DATA_LOADERS: `/api/tables/data-loader/list-data-loaders`,
7070
DATA_LOADER_LIST_TABLES: `/api/tables/data-loader/list-tables`,
7171
DATA_LOADER_INGEST_DATA: `/api/tables/data-loader/ingest-data`,
7272
};

0 commit comments

Comments
 (0)