Commit 9460b3e

Merge remote-tracking branch 'origin/dev' into dev
2 parents: 257e162 + 2ba8043

7 files changed: +265 −37 lines changed

py-src/data_formulator/data_loader/azure_blob_data_loader.py

Lines changed: 6 additions & 3 deletions
@@ -116,7 +116,7 @@ def _setup_azure_authentication(self):
         )
         """)

-    def list_tables(self) -> List[Dict[str, Any]]:
+    def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
         # Use Azure SDK to list blobs in the container
         from azure.storage.blob import BlobServiceClient

@@ -145,8 +145,7 @@ def list_tables(self) -> List[Dict[str, Any]]:
         container_client = blob_service_client.get_container_client(self.container_name)

         # List blobs in the container
-        blob_list = container_client.list_blobs()
-
+        blob_list = container_client.list_blobs()
         results = []

         for blob in blob_list:
@@ -156,6 +155,10 @@ def list_tables(self) -> List[Dict[str, Any]]:
             if blob_name.endswith('/') or not self._is_supported_file(blob_name):
                 continue

+            # Apply table filter if provided
+            if table_filter and table_filter.lower() not in blob_name.lower():
+                continue
+
             # Create Azure blob URL
             azure_url = f"az://{self.account_name}.{self.endpoint}/{self.container_name}/{blob_name}"
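Note that the new table_filter parameter is a case-insensitive substring match on blob names, not a glob or regex. A minimal sketch of just that filter in isolation, with hypothetical blob names:

    # Case-insensitive substring filter, mirroring the check added above.
    blob_names = ["Sales/2024.csv", "sales_archive.parquet", "inventory.json"]  # hypothetical
    table_filter = "sales"

    matching = [
        name for name in blob_names
        if not table_filter or table_filter.lower() in name.lower()
    ]
    print(matching)  # ['Sales/2024.csv', 'sales_archive.parquet']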

py-src/data_formulator/data_loader/external_data_loader.py

Lines changed: 25 additions & 3 deletions
@@ -44,6 +44,18 @@ def sanitize_table_name(name_as: str) -> str:
 class ExternalDataLoader(ABC):

     def ingest_df_to_duckdb(self, df: pd.DataFrame, table_name: str):
+        # Log DataFrame info before ingestion
+        import logging
+        logger = logging.getLogger(__name__)
+        logger.info(f"Ingesting DataFrame to DuckDB table '{table_name}'")
+        logger.info(f"DataFrame shape: {df.shape}")
+        logger.info(f"DataFrame dtypes: {dict(df.dtypes)}")
+
+        # Log sample of datetime columns
+        for col in df.columns:
+            if pd.api.types.is_datetime64_any_dtype(df[col]):
+                sample_values = df[col].dropna().head(3)
+                logger.info(f"Datetime column '{col}' sample values: {list(sample_values)}")

         base_name = table_name
         counter = 1
@@ -59,8 +71,19 @@ def ingest_df_to_duckdb(self, df: pd.DataFrame, table_name: str):
         # Create table
         random_suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=6))
         self.duck_db_conn.register(f'df_temp_{random_suffix}', df)
+
+        # Log table schema after registration
+        try:
+            schema_info = self.duck_db_conn.execute(f"DESCRIBE df_temp_{random_suffix}").fetchall()
+            logger.info(f"DuckDB table schema: {schema_info}")
+        except Exception as e:
+            logger.warning(f"Could not get schema info: {e}")
+
         self.duck_db_conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df_temp_{random_suffix}")
         self.duck_db_conn.execute(f"DROP VIEW df_temp_{random_suffix}")  # Drop the temporary view after creating the table
+
+        logger.info(f"Successfully created DuckDB table '{table_name}'")

     @staticmethod
     @abstractmethod
@@ -69,15 +92,14 @@ def list_params() -> List[Dict[str, Any]]:

     @staticmethod
     @abstractmethod
-    def auth_instructions() -> str:
-        pass
+    def auth_instructions() -> str: pass

     @abstractmethod
     def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
         pass

     @abstractmethod
-    def list_tables(self) -> List[Dict[str, Any]]:
+    def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
         # should include: table_name, column_names, column_types, sample_data
         pass
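For context, ingest_df_to_duckdb materializes the DataFrame by registering it as a temporary view and then running CREATE TABLE AS over it; the new logging wraps that flow. A self-contained sketch of the pattern against an in-memory DuckDB connection (table and column names are illustrative):

    import random
    import string

    import duckdb
    import pandas as pd

    conn = duckdb.connect(":memory:")
    df = pd.DataFrame({"ts": pd.to_datetime(["2024-01-01", "2024-01-02"]), "v": [1, 2]})

    suffix = "".join(random.choices(string.ascii_letters + string.digits, k=6))
    conn.register(f"df_temp_{suffix}", df)  # expose the DataFrame as a temporary view
    print(conn.execute(f"DESCRIBE df_temp_{suffix}").fetchall())  # the schema check the new logging records
    conn.execute(f"CREATE TABLE my_table AS SELECT * FROM df_temp_{suffix}")
    conn.execute(f"DROP VIEW df_temp_{suffix}")  # drop the temporary view, as the source does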

py-src/data_formulator/data_loader/kusto_data_loader.py

Lines changed: 93 additions & 7 deletions
@@ -1,15 +1,27 @@
+import logging
+import sys
 from typing import Dict, Any, List
 import pandas as pd
 import json
 import duckdb
 import random
 import string
+from datetime import datetime

 from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
 from azure.kusto.data.helpers import dataframe_from_result_table

 from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name

+# Configure root logger for general application logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[logging.StreamHandler(sys.stdout)]
+)
+
+# Get logger for this module
+logger = logging.getLogger(__name__)

 class KustoDataLoader(ExternalDataLoader):

@@ -67,23 +79,93 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
                 self.kusto_cluster, self.client_id, self.client_secret, self.tenant_id))
             else:
                 # This function provides an interface to Kusto. It uses Azure CLI auth, but you can also use other auth types.
-                self.client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(self.kusto_cluster))
+                cluster_url = KustoConnectionStringBuilder.with_az_cli_authentication(self.kusto_cluster)
+                logger.info(f"Connecting to Kusto cluster: {self.kusto_cluster}")
+                self.client = KustoClient(cluster_url)
+                logger.info("Using Azure CLI authentication for Kusto client. Ensure you have run `az login` in your terminal.")
         except Exception as e:
-            raise Exception(f"Error creating Kusto client: {e}, please authenticate with Azure CLI when starting the app.")
-
+            logger.error(f"Error creating Kusto client: {e}")
+            raise Exception(f"Error creating Kusto client: {e}, please authenticate with Azure CLI when starting the app.")
         self.duck_db_conn = duck_db_conn

+    def _convert_kusto_datetime_columns(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Convert Kusto datetime columns to proper pandas datetime format"""
+        logger.info(f"Processing DataFrame with columns: {list(df.columns)}")
+        logger.info(f"Column dtypes before conversion: {dict(df.dtypes)}")
+
+        for col in df.columns:
+            original_dtype = df[col].dtype
+
+            if df[col].dtype == 'object':
+                # Try to identify datetime columns by checking sample values
+                sample_values = df[col].dropna().head(3)
+                if len(sample_values) > 0:
+                    # Check if values look like datetime strings or timestamp numbers
+                    first_val = sample_values.iloc[0]
+
+                    # Handle Kusto datetime format (ISO 8601 strings)
+                    if isinstance(first_val, str) and ('T' in first_val or '-' in first_val):
+                        try:
+                            # Try to parse as datetime
+                            pd.to_datetime(sample_values.iloc[0])
+                            logger.info(f"Converting column '{col}' from string to datetime")
+                            df[col] = pd.to_datetime(df[col], errors='coerce', utc=True).dt.tz_localize(None)
+                        except Exception as e:
+                            logger.debug(f"Failed to convert column '{col}' as string datetime: {e}")
+
+                    # Handle numeric timestamps (Unix timestamps in various formats)
+                    elif isinstance(first_val, (int, float)) and first_val > 1000000000:
+                        try:
+                            # Try different timestamp formats
+                            if first_val > 1e15:  # Likely microseconds since epoch
+                                logger.info(f"Converting column '{col}' from microseconds timestamp to datetime")
+                                df[col] = pd.to_datetime(df[col], unit='us', errors='coerce', utc=True).dt.tz_localize(None)
+                            elif first_val > 1e12:  # Likely milliseconds since epoch
+                                logger.info(f"Converting column '{col}' from milliseconds timestamp to datetime")
+                                df[col] = pd.to_datetime(df[col], unit='ms', errors='coerce', utc=True).dt.tz_localize(None)
+                            else:  # Likely seconds since epoch
+                                logger.info(f"Converting column '{col}' from seconds timestamp to datetime")
+                                df[col] = pd.to_datetime(df[col], unit='s', errors='coerce', utc=True).dt.tz_localize(None)
+                        except Exception as e:
+                            logger.debug(f"Failed to convert column '{col}' as numeric timestamp: {e}")
+
+            # Handle datetime64 columns that might have timezone info
+            elif pd.api.types.is_datetime64_any_dtype(df[col]):
+                # Ensure timezone-aware datetimes are properly handled
+                if hasattr(df[col].dt, 'tz') and df[col].dt.tz is not None:
+                    logger.info(f"Converting timezone-aware datetime column '{col}' to UTC")
+                    df[col] = df[col].dt.tz_convert('UTC').dt.tz_localize(None)
+
+            # Log if conversion happened
+            if original_dtype != df[col].dtype:
+                logger.info(f"Column '{col}' converted from {original_dtype} to {df[col].dtype}")
+
+        logger.info(f"Column dtypes after conversion: {dict(df.dtypes)}")
+        return df
+
     def query(self, kql: str) -> pd.DataFrame:
+        logger.info(f"Executing KQL query: {kql} on database {self.kusto_database}")
         result = self.client.execute(self.kusto_database, kql)
-        return dataframe_from_result_table(result.primary_results[0])
+        logger.info(f"Query executed successfully, returning results.")
+        df = dataframe_from_result_table(result.primary_results[0])
+
+        # Convert datetime columns properly
+        df = self._convert_kusto_datetime_columns(df)
+
+        return df

-    def list_tables(self) -> List[Dict[str, Any]]:
+    def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
         query = ".show tables"
         tables_df = self.query(query)

         tables = []
         for table in tables_df.to_dict(orient="records"):
             table_name = table['TableName']
+
+            # Apply table filter if provided
+            if table_filter and table_filter.lower() not in table_name.lower():
+                continue
+
             schema_result = self.query(f".show table ['{table_name}'] schema as json").to_dict(orient="records")
             columns = [{
                 'name': r["Name"],
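The magnitude thresholds in _convert_kusto_datetime_columns (above 1e15 read as microseconds, above 1e12 as milliseconds, otherwise seconds) pick the epoch unit before conversion. A toy demonstration of just that heuristic; convert_epoch_column is a hypothetical helper, not part of the codebase:

    import pandas as pd

    def convert_epoch_column(s: pd.Series) -> pd.Series:
        # Pick the epoch unit by magnitude, as the loader's heuristic does.
        first = s.dropna().iloc[0]
        if first > 1e15:
            unit = "us"  # microseconds since epoch
        elif first > 1e12:
            unit = "ms"  # milliseconds since epoch
        else:
            unit = "s"   # seconds since epoch
        return pd.to_datetime(s, unit=unit, errors="coerce", utc=True).dt.tz_localize(None)

    print(convert_epoch_column(pd.Series([1_700_000_000])))      # read as seconds
    print(convert_epoch_column(pd.Series([1_700_000_000_000])))  # read as milliseconds; same instant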
@@ -94,7 +176,10 @@ def list_tables(self) -> List[Dict[str, Any]]:
             row_count = row_count_result[0]["TotalRowCount"]

             sample_query = f"['{table_name}'] | take {5}"
-            sample_result = json.loads(self.query(sample_query).to_json(orient="records"))
+            sample_df = self.query(sample_query)
+
+            # Convert sample data to JSON with proper datetime handling
+            sample_result = json.loads(sample_df.to_json(orient="records", date_format='iso'))

             table_metadata = {
                 "row_count": row_count,
@@ -159,7 +244,8 @@ def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000)
             total_rows_ingested += len(chunk_df)

     def view_query_sample(self, query: str) -> str:
-        return json.loads(self.query(query).head(10).to_json(orient="records"))
+        df = self.query(query).head(10)
+        return json.loads(df.to_json(orient="records", date_format='iso'))

     def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame:
         # Sanitize the table name for SQL compatibility
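The date_format='iso' switch matters because to_json defaults to epoch milliseconds for datetime columns, which JSON consumers would otherwise see as bare integers. A quick comparison on toy data:

    import pandas as pd

    df = pd.DataFrame({"ts": pd.to_datetime(["2024-01-02 03:04:05"])})
    print(df.to_json(orient="records"))                     # default epoch ms, e.g. [{"ts":1704164645000}]
    print(df.to_json(orient="records", date_format="iso"))  # ISO 8601, e.g. [{"ts":"2024-01-02T03:04:05.000"}]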

py-src/data_formulator/data_loader/mysql_data_loader.py

Lines changed: 6 additions & 4 deletions
@@ -61,12 +61,10 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
         try:
             self.duck_db_conn.execute("DETACH mysqldb;")
         except:
-            pass # Ignore if mysqldb doesn't exist
-
-        # Register MySQL connection
+            pass # Ignore if mysqldb doesn't exist # Register MySQL connection
         self.duck_db_conn.execute(f"ATTACH '{attatch_string}' AS mysqldb (TYPE mysql);")

-    def list_tables(self):
+    def list_tables(self, table_filter: str = None):
         tables_df = self.duck_db_conn.execute(f"""
             SELECT TABLE_SCHEMA, TABLE_NAME FROM mysqldb.information_schema.tables
             WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')
@@ -78,6 +76,10 @@ def list_tables(self):

             full_table_name = f"mysqldb.{schema}.{table_name}"

+            # Apply table filter if provided
+            if table_filter and table_filter.lower() not in table_name.lower():
+                continue
+
             # Get column information using DuckDB's information schema
             columns_df = self.duck_db_conn.execute(f"DESCRIBE {full_table_name}").df()
             columns = [{
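Setting aside the comment that got folded onto the pass line, the underlying pattern here is detach-then-attach, so re-initializing the loader does not fail when mysqldb is already attached. A hedged sketch of that pattern; it assumes DuckDB's mysql extension is available, and the connection string is a placeholder:

    import duckdb

    conn = duckdb.connect(":memory:")
    attach_string = "host=localhost user=root port=3306 database=mydb"  # placeholder credentials

    try:
        conn.execute("DETACH mysqldb;")
    except duckdb.Error:
        pass  # nothing attached yet; safe to ignore
    conn.execute(f"ATTACH '{attach_string}' AS mysqldb (TYPE mysql);")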

py-src/data_formulator/data_loader/s3_data_loader.py

Lines changed: 5 additions & 1 deletion
@@ -78,7 +78,7 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
         if self.aws_session_token:  # Add this block
             self.duck_db_conn.execute(f"SET s3_session_token='{self.aws_session_token}'")

-    def list_tables(self) -> List[Dict[str, Any]]:
+    def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
         # Use boto3 to list objects in the bucket
         import boto3

@@ -103,6 +103,10 @@ def list_tables(self) -> List[Dict[str, Any]]:
             if key.endswith('/') or not self._is_supported_file(key):
                 continue

+            # Apply table filter if provided
+            if table_filter and table_filter.lower() not in key.lower():
+                continue
+
             # Create S3 URL
             s3_url = f"s3://{self.bucket}/{key}"
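A sketch of the same substring filter applied while paging through bucket keys with boto3; the bucket name and filter value are hypothetical, and credentials are resolved through boto3's usual chain:

    import boto3

    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    table_filter = "sales"  # hypothetical

    for page in paginator.paginate(Bucket="my-data-bucket"):  # hypothetical bucket
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue  # skip folder placeholder keys
            if table_filter and table_filter.lower() not in key.lower():
                continue  # case-insensitive filter, as in the diff
            print(f"s3://my-data-bucket/{key}")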
