+import logging
+import sys
from typing import Dict, Any, List
import pandas as pd
import json
import duckdb
import random
import string
+from datetime import datetime

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name

+# Configure root logger for general application logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[logging.StreamHandler(sys.stdout)]
+)
+
+# Get logger for this module
+logger = logging.getLogger(__name__)


class KustoDataLoader(ExternalDataLoader):
@@ -67,23 +79,93 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
                    self.kusto_cluster, self.client_id, self.client_secret, self.tenant_id))
            else:
                # This function provides an interface to Kusto. It uses Azure CLI auth, but you can also use other auth types.
-                self.client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(self.kusto_cluster))
+                cluster_url = KustoConnectionStringBuilder.with_az_cli_authentication(self.kusto_cluster)
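+                # (CLI authentication reuses the token cached by a prior `az login`)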
+                logger.info(f"Connecting to Kusto cluster: {self.kusto_cluster}")
+                self.client = KustoClient(cluster_url)
+                logger.info("Using Azure CLI authentication for Kusto client. Ensure you have run `az login` in your terminal.")
        except Exception as e:
-            raise Exception(f"Error creating Kusto client: {e}, please authenticate with Azure CLI when starting the app. ")
-
+            logger.error(f"Error creating Kusto client: {e}")
+            raise Exception(f"Error creating Kusto client: {e}, please authenticate with Azure CLI when starting the app.")

        self.duck_db_conn = duck_db_conn

+    def _convert_kusto_datetime_columns(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Convert Kusto datetime columns to proper pandas datetime format."""
+        logger.info(f"Processing DataFrame with columns: {list(df.columns)}")
+        logger.info(f"Column dtypes before conversion: {dict(df.dtypes)}")
+
+        for col in df.columns:
+            original_dtype = df[col].dtype
+
+            if df[col].dtype == 'object':
+                # Try to identify datetime columns by checking sample values
+                sample_values = df[col].dropna().head(3)
+                if len(sample_values) > 0:
+                    # Check if values look like datetime strings or timestamp numbers
+                    first_val = sample_values.iloc[0]
+
+                    # Handle Kusto datetime format (ISO 8601 strings)
+                    if isinstance(first_val, str) and ('T' in first_val or '-' in first_val):
+                        try:
+                            # Try to parse as datetime
+                            pd.to_datetime(sample_values.iloc[0])
+                            logger.info(f"Converting column '{col}' from string to datetime")
+                            df[col] = pd.to_datetime(df[col], errors='coerce', utc=True).dt.tz_localize(None)
+                        except Exception as e:
+                            logger.debug(f"Failed to convert column '{col}' as string datetime: {e}")
+
+                    # Handle numeric timestamps (Unix timestamps in various formats)
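+                    # (magnitude heuristic: 1e9 seconds is roughly Sep 2001, so
+                    # anything above that threshold is treated as an epoch value)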
+                    elif isinstance(first_val, (int, float)) and first_val > 1000000000:
+                        try:
+                            # Try different timestamp formats
+                            if first_val > 1e15:  # Likely microseconds since epoch
+                                logger.info(f"Converting column '{col}' from microseconds timestamp to datetime")
+                                df[col] = pd.to_datetime(df[col], unit='us', errors='coerce', utc=True).dt.tz_localize(None)
+                            elif first_val > 1e12:  # Likely milliseconds since epoch
+                                logger.info(f"Converting column '{col}' from milliseconds timestamp to datetime")
+                                df[col] = pd.to_datetime(df[col], unit='ms', errors='coerce', utc=True).dt.tz_localize(None)
+                            else:  # Likely seconds since epoch
+                                logger.info(f"Converting column '{col}' from seconds timestamp to datetime")
+                                df[col] = pd.to_datetime(df[col], unit='s', errors='coerce', utc=True).dt.tz_localize(None)
+                        except Exception as e:
+                            logger.debug(f"Failed to convert column '{col}' as numeric timestamp: {e}")
+
+            # Handle datetime64 columns that might have timezone info
+            elif pd.api.types.is_datetime64_any_dtype(df[col]):
+                # Ensure timezone-aware datetimes are properly handled
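+                # (convert to UTC first so wall-clock values stay comparable,
+                # then drop tzinfo to match the naive datetimes produced above)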
+                if hasattr(df[col].dt, 'tz') and df[col].dt.tz is not None:
+                    logger.info(f"Converting timezone-aware datetime column '{col}' to UTC")
+                    df[col] = df[col].dt.tz_convert('UTC').dt.tz_localize(None)
+
+            # Log if conversion happened
+            if original_dtype != df[col].dtype:
+                logger.info(f"Column '{col}' converted from {original_dtype} to {df[col].dtype}")
+
+        logger.info(f"Column dtypes after conversion: {dict(df.dtypes)}")
+        return df
+
    def query(self, kql: str) -> pd.DataFrame:
+        logger.info(f"Executing KQL query: {kql} on database {self.kusto_database}")
        result = self.client.execute(self.kusto_database, kql)
-        return dataframe_from_result_table(result.primary_results[0])
+        logger.info("Query executed successfully, returning results.")
+        df = dataframe_from_result_table(result.primary_results[0])
+
+        # Convert datetime columns properly
+        df = self._convert_kusto_datetime_columns(df)
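+        # (without this step, Kusto datetime columns can surface as object dtype)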
+
+        return df

-    def list_tables(self) -> List[Dict[str, Any]]:
+    def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
        query = ".show tables"
        tables_df = self.query(query)

        tables = []
        for table in tables_df.to_dict(orient="records"):
            table_name = table['TableName']
+
+            # Apply table filter if provided
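+            # (case-insensitive substring match on the table name)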
+            if table_filter and table_filter.lower() not in table_name.lower():
+                continue
+
            schema_result = self.query(f".show table ['{table_name}'] schema as json").to_dict(orient="records")
            columns = [{
                'name': r["Name"],
@@ -94,7 +176,10 @@ def list_tables(self) -> List[Dict[str, Any]]:
            row_count = row_count_result[0]["TotalRowCount"]

            sample_query = f"['{table_name}'] | take {5}"
-            sample_result = json.loads(self.query(sample_query).to_json(orient="records"))
+            sample_df = self.query(sample_query)
+
+            # Convert sample data to JSON with proper datetime handling
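+            # (date_format='iso' serializes datetime64 values as ISO 8601 strings)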
182
+ sample_result = json .loads (sample_df .to_json (orient = "records" , date_format = 'iso' ))
98
183
99
184
table_metadata = {
100
185
"row_count" : row_count ,
@@ -159,7 +244,8 @@ def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000)
            total_rows_ingested += len(chunk_df)

    def view_query_sample(self, query: str) -> str:
-        return json.loads(self.query(query).head(10).to_json(orient="records"))
+        df = self.query(query).head(10)
+        return json.loads(df.to_json(orient="records", date_format='iso'))

    def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame:
        # Sanitize the table name for SQL compatibility