Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions datacontract/engines/soda/connections/duckdb_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,21 @@ def get_duckdb_connection(
elif server.format == "csv":
columns = to_csv_types(model)
run.log_info("Using columns: " + str(columns))
csv_params = build_csv_parameters(server)
if csv_params !="":
run.log_info("Using specified CSV parameters")
else:
run.log_info("No CSV parameters specified duckdb default inference will be used")

if columns is None:
con.sql(
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1);"""
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1{csv_params});"""
)
else:
con.sql(
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});"""
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns}{csv_params});"""
)

elif server.format == "delta":
con.sql("update extensions;") # Make sure we have the latest delta extension
con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""")
Expand Down Expand Up @@ -96,6 +103,57 @@ def to_json_types(model: Model) -> dict[Any, str | None] | None:
return columns


def build_csv_parameters(server: Server) -> str:
"""Build CSV parameters string for DuckDB read_csv function.

Args:
server: Server configuration containing CSV parameters. Supports:
- infer_schema (bool): If True (default) no param string is created.
If False, creates duckdb param string based on provided parameters.
- Standard CSV params: all_varchar, comment, compression, delim/delimiter,
decimal_separator, escape, encoding, header, new_line, quote, skip

Returns:
String containing formatted CSV parameters for DuckDB read_csv function.
Returns empty string when infer_schema=True and no additional CSV parameters.
Returns comma-prefixed parameter string when parameters are present.
"""
params = []

infer_schema = getattr(server, 'infer_schema', True)

if not infer_schema:
# When infer_schema=False, process CSV parameters if they exist
csv_params = {
'all_varchar': getattr(server, 'all_varchar', None),
'comment': getattr(server, 'comment', None),
'compression': getattr(server, 'compression', None),
'delim': getattr(server, 'delim', None) or getattr(server, 'delimiter', None),
'decimal_separator': getattr(server, 'decimal_separator', None),
'escape': getattr(server, 'escape', None),
'header': getattr(server, 'header', None),
'new_line': getattr(server, 'new_line', None),
'quote': getattr(server, 'quote', None),
'skip': getattr(server, 'skip', None),
'encoding': getattr(server, 'encoding', None),
}

for param_name, param_value in csv_params.items():
if param_value is not None:
if isinstance(param_value, bool):
params.append(f"{param_name}={str(param_value).lower()}")
elif isinstance(param_value, str):
params.append(f"{param_name}='{param_value}'")
else:
params.append(f"{param_name}={param_value}")

# Return comma-prefixed string if params exist, empty string otherwise
if len(params) > 0:
return ", " + ", ".join(params)
else:
return ""


def add_nested_views(con: duckdb.DuckDBPyConnection, model_name: str, fields: Dict[str, Field] | None):
model_name = model_name.strip('"')
if fields is None:
Expand Down
Loading