@@ -65,24 +65,28 @@ def connect(self, db_params: Dict[str, Any]):
6565 )
6666
6767 self .base_path = path
68-
68+
6969 # 1. Determine connection type
70- db_file = ":memory:" # Default to in-memory
70+ db_file = ":memory:" # Default to in-memory
7171 read_only = False
72-
72+
7373 if path .endswith ((".db" , ".duckdb" )):
7474 db_file = path
7575 read_only = True
7676 self .is_directory_scan = False
7777 logger .info (f"Connecting to persistent DuckDB file: '{ path } '" )
7878 else :
7979 # Path is a file pattern or directory, so use an in-memory DB for querying.
80- logger .info (f"Connecting to in-memory DuckDB for path: '{ path } '" )
80+ logger .info (
81+ f"Connecting to in-memory DuckDB for path: '{ path } '"
82+ )
8183 if path .endswith (("/" ) or "*" in path ):
82- self .is_directory_scan = True
83-
84+ self .is_directory_scan = True
85+
8486 # 2. Establish connection
85- self .connection = duckdb .connect (database = db_file , read_only = read_only )
87+ self .connection = duckdb .connect (
88+ database = db_file , read_only = read_only
89+ )
8690 self .cursor = self .connection .cursor ()
8791
8892 # 3. Handle S3 paths by installing the httpfs extension.
@@ -92,7 +96,9 @@ def connect(self, db_params: Dict[str, Any]):
9296 self .cursor .execute ("INSTALL httpfs; LOAD httpfs;" )
9397 logger .info ("Installed and loaded httpfs for S3 access." )
9498 except Exception as e :
95- logger .warning (f"Could not install/load httpfs, S3 access may fail: { e } " )
99+ logger .warning (
100+ f"Could not install/load httpfs, S3 access may fail: { e } "
101+ )
96102
97103 logger .info ("Successfully connected to DuckDB." )
98104 except Exception as e :
@@ -114,11 +120,11 @@ def _get_full_path(self, table_name: str) -> str:
114120 """
115121 if self .is_directory_scan :
116122 # Ensure a single slash between the base path and the file name.
117- if self .base_path .endswith ('/' ):
123+ if self .base_path .endswith ("/" ):
118124 return f"{ self .base_path } { table_name } "
119125 else :
120126 return f"{ self .base_path } /{ table_name } "
121-
127+
122128 # If not a directory scan, the base_path is the full file/pattern path.
123129 return self .base_path
124130
@@ -138,7 +144,9 @@ def get_tables(self) -> List[str]:
138144
139145 # Case 1: Persistent .db file. Query the schema for tables and views.
140146 if self .base_path .endswith ((".db" , ".duckdb" )):
141- logger .info (f"Fetching tables and views from DB: '{ self .base_path } '" )
147+ logger .info (
148+ f"Fetching tables and views from DB: '{ self .base_path } '"
149+ )
142150 self .cursor .execute ("SHOW ALL TABLES;" )
143151 tables = [row [0 ] for row in self .cursor .fetchall ()]
144152 logger .info (f"Found { len (tables )} tables/views." )
@@ -147,25 +155,30 @@ def get_tables(self) -> List[str]:
147155 # Case 2: Directory scan. Use glob to find files in the directory.
148156 if self .is_directory_scan :
149157 glob_func = "s3_glob" if self .is_s3 else "glob"
150-
158+
151159 # Ensure path ends with a wildcard for globbing
152160 glob_path = self .base_path
153161 if not glob_path .endswith (("*" , "*/" )):
154- if not glob_path .endswith ('/' ):
155- glob_path += "/"
156- glob_path += "*.*" # Glob for common file types
157-
158- query = f"SELECT basename(file_name) FROM { glob_func } ('{ glob_path } ')"
162+ if not glob_path .endswith ("/" ):
163+ glob_path += "/"
164+ glob_path += "*.*" # Glob for common file types
165+
166+ query = (
167+ f"SELECT basename(file_name) FROM { glob_func } ('{ glob_path } ')"
168+ )
159169 logger .info (f"Globbing for files using query: { query } " )
160-
170+
161171 try :
162172 self .cursor .execute (query )
163173 # Return just the file names, not the full path, as "tables".
164174 tables = [row [0 ] for row in self .cursor .fetchall ()]
165175 logger .info (f"Found { len (tables )} files in directory." )
166176 return tables
167177 except Exception as e :
168- logger .error (f"Failed to glob files at '{ self .base_path } ': { e } " , exc_info = True )
178+ logger .error (
179+ f"Failed to glob files at '{ self .base_path } ': { e } " ,
180+ exc_info = True ,
181+ )
169182 raise ConnectorError (f"Failed to list files: { e } " )
170183
171184 # Case 3: Single file or pattern. Return the path itself as the "table".
@@ -207,10 +220,17 @@ def get_columns(self, table_name: str) -> List[Dict[str, str]]:
207220 return columns
208221
209222 except Exception as e :
210- logger .error (f"Failed to fetch columns for '{ table_name } ': { e } " , exc_info = True )
211- raise ConnectorError (f"Failed to fetch columns for '{ table_name } ': { e } " ) from e
223+ logger .error (
224+ f"Failed to fetch columns for '{ table_name } ': { e } " ,
225+ exc_info = True ,
226+ )
227+ raise ConnectorError (
228+ f"Failed to fetch columns for '{ table_name } ': { e } "
229+ ) from e
212230
213- def get_column_profile (self , table_name : str , column_name : str ) -> Dict [str , Any ]:
231+ def get_column_profile (
232+ self , table_name : str , column_name : str
233+ ) -> Dict [str , Any ]:
214234 """
215235 Generates profile stats for a column in a DuckDB table or file.
216236
@@ -230,7 +250,7 @@ def get_column_profile(self, table_name: str, column_name: str) -> Dict[str, Any
230250 # 1. Determine the source (a table name or a file-based subquery).
231251 source_query = ""
232252 if self .base_path .endswith ((".db" , ".duckdb" )):
233- source_query = f'"{ table_name } "' # It's a table
253+ source_query = f'"{ table_name } "' # It's a table
234254 else :
235255 # It's a file, so create a subquery using read_auto.
236256 full_path = self ._get_full_path (table_name )
@@ -249,14 +269,21 @@ def get_column_profile(self, table_name: str, column_name: str) -> Dict[str, Any
249269 try :
250270 self .cursor .execute (query )
251271 row = self .cursor .fetchone ()
252-
272+
253273 total_count = row [0 ]
254274 null_count = row [1 ] if row [1 ] is not None else 0
255275 distinct_count = row [2 ] if row [2 ] is not None else 0
256276
257277 if total_count == 0 :
258- logger .info (f" - Profile for '{ table_name } .{ column_name } ': Table is empty." )
259- return {"null_ratio" : 0 , "distinct_count" : 0 , "is_unique" : True , "total_count" : 0 }
278+ logger .info (
279+ f" - Profile for '{ table_name } .{ column_name } ': Table is empty."
280+ )
281+ return {
282+ "null_ratio" : 0 ,
283+ "distinct_count" : 0 ,
284+ "is_unique" : True ,
285+ "total_count" : 0 ,
286+ }
260287
261288 null_ratio = null_count / total_count
262289 is_unique = (distinct_count == total_count ) and (null_count == 0 )
@@ -267,15 +294,22 @@ def get_column_profile(self, table_name: str, column_name: str) -> Dict[str, Any
267294 "distinct_count" : distinct_count ,
268295 "is_unique" : is_unique ,
269296 }
270- logger .info (f" - Profile for '{ table_name } .{ column_name } ': { stats } " )
297+ logger .info (
298+ f" - Profile for '{ table_name } .{ column_name } ': { stats } "
299+ )
271300 return stats
272301
273302 except Exception as e :
274303 logger .warning (
275304 f"Failed to profile column '{ table_name } .{ column_name } ': { e } " ,
276- exc_info = True
305+ exc_info = True ,
277306 )
278- return {"null_ratio" : "N/A" , "distinct_count" : "N/A" , "is_unique" : False , "total_count" : "N/A" }
307+ return {
308+ "null_ratio" : "N/A" ,
309+ "distinct_count" : "N/A" ,
310+ "is_unique" : False ,
311+ "total_count" : "N/A" ,
312+ }
279313
280314 def get_views (self ) -> List [Dict [str , str ]]:
281315 """
@@ -306,15 +340,18 @@ def get_foreign_keys(self) -> List[Dict[str, str]]:
306340 """
307341 if not self .cursor :
308342 raise ConnectorError ("Not connected to a DuckDB database." )
309-
343+
310344 if not self .base_path .endswith ((".db" , ".duckdb" )):
311- logger .info ("Foreign keys are not supported for file/directory scans." )
345+ logger .info (
346+ "Foreign keys are not supported for file/directory scans."
347+ )
312348 return []
313349
314350 logger .info ("Fetching foreign key relationships..." )
315351 try :
316352 # DuckDB >= 0.9.0 has a more robust constraints function.
317- self .cursor .execute ("""
353+ self .cursor .execute (
354+ """
318355 SELECT
319356 fk.table_name AS from_table,
320357 fk.column_names[1] AS from_column,
@@ -323,12 +360,15 @@ def get_foreign_keys(self) -> List[Dict[str, str]]:
323360 FROM duckdb_constraints() fk
324361 JOIN duckdb_constraints() pk ON fk.primary_key_index = pk.constraint_index
325362 WHERE fk.constraint_type = 'FOREIGN KEY'
326- """ )
363+ """
364+ )
327365 except duckdb .CatalogException :
328366 # Fallback for older DuckDB versions.
329- logger .warning ("Using legacy foreign key query for older DuckDB version." )
367+ logger .warning (
368+ "Using legacy foreign key query for older DuckDB version."
369+ )
330370 self .cursor .execute ("SELECT * FROM duckdb_foreign_keys();" )
331-
371+
332372 foreign_keys = [
333373 {
334374 "from_table" : fk [0 ],
@@ -338,7 +378,7 @@ def get_foreign_keys(self) -> List[Dict[str, str]]:
338378 }
339379 for fk in self .cursor .fetchall ()
340380 ]
341-
381+
342382 logger .info (f"Found { len (foreign_keys )} foreign key relationships." )
343383 return foreign_keys
344384
0 commit comments