105 changes: 96 additions & 9 deletions datacontract/engines/soda/connections/duckdb.py
@@ -1,9 +1,11 @@
+import io
import os
+import uuid

import duckdb

from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type
-from datacontract.model.run import Run
+from datacontract.model.run import Check, ResultEnum, Run


def get_duckdb_connection(data_contract, server, run: Run):
@@ -42,14 +44,80 @@ def get_duckdb_connection(data_contract, server, run: Run):
elif server.format == "csv":
columns = to_csv_types(model)
run.log_info("Using columns: " + str(columns))
if columns is None:
con.sql(
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1);"""
)
else:
con.sql(
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});"""
)

+            # Start with the required parameter.
+            params = ["hive_partitioning=1"]
+
+            # Define a mapping for CSV parameters: server attribute -> read_csv parameter name.
+            param_mapping = {
+                "delimiter": "delim",  # server.delimiter maps to read_csv's 'delim'
+                "header": "header",
+                "escape": "escape",
+                "allVarchar": "all_varchar",
+                "allowQuotedNulls": "allow_quoted_nulls",
+                "dateformat": "dateformat",
+                "decimalSeparator": "decimal_separator",
+                "newLine": "new_line",
+                "timestampformat": "timestampformat",
+                "quote": "quote",
+            }
+            for server_attr, read_csv_param in param_mapping.items():
+                value = getattr(server, server_attr, None)
+                if value is not None:
+                    # Wrap string values in quotes.
+                    if isinstance(value, str):
+                        params.append(f"{read_csv_param}='{value}'")
+                    else:
+                        params.append(f"{read_csv_param}={value}")
+
+            # If the file has a header row (or header handling is left to DuckDB's
+            # auto-detection), sniff the actual column names and compare them to the model.
+            has_header = getattr(server, "header", True)
+            if columns is not None and (has_header or has_header is None):
+                csv_columns = sniff_csv_header(model_path, server)
+                difference = set(csv_columns) - set(columns.keys())
+                same_order = list(columns.keys()) == csv_columns[: len(columns)]
+                if not same_order:
+                    run.checks.append(
+                        Check(
+                            id=str(uuid.uuid4()),
+                            model=model_name,
+                            category="schema",
+                            type="fields_are_same",
+                            name="Column order mismatch",
+                            result=ResultEnum.warning,
+                            reason=f"Order of columns in {model_path} does not match the model.",
+                            details=f"Expected: {'|'.join(columns.keys())}\nActual: {'|'.join(csv_columns)}",
+                            engine="datacontract",
+                        )
+                    )
+                if len(difference) > 0:
+                    run.checks.append(
+                        Check(
+                            id=str(uuid.uuid4()),
+                            model=model_name,
+                            category="schema",
+                            type="fields_are_same",
+                            name="Dataset contained unexpected fields",
+                            result=ResultEnum.warning,
+                            reason=f"{model_path} contained unexpected fields: {', '.join(difference)}",
+                            engine="datacontract",
+                        )
+                    )
+                # Re-key the model columns in file order, defaulting unexpected columns to VARCHAR.
+                columns = {k: columns.get(k, "VARCHAR") for k in csv_columns}
+
+            # Add columns if they exist.
+            if columns is not None:
+                params.append(f"columns={columns}")
+
+            # Build the parameter string.
+            params_str = ", ".join(params)
+
+            # Create the view with the assembled parameters.
+            con.sql(f"""
+                CREATE VIEW "{model_name}" AS
+                SELECT * FROM read_csv('{model_path}', {params_str});
+            """)

elif server.format == "delta":
con.sql("update extensions;") # Make sure we have the latest delta extension
con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""")
@@ -66,6 +134,25 @@ def to_csv_types(model) -> dict:
    return columns


+def sniff_csv_header(model_path, server):
+    # Define a mapping for CSV parameters: server attribute -> duckdb.from_csv_auto parameter name.
+    # Note! The parameter names in the Python calls (read_csv, read_csv_auto, and from_csv_auto)
+    # differ from those used in the SQL statements.
+    param_mapping = {
+        "delimiter": "delimiter",
+        "header": "header",
+        "escape": "escapechar",
+        "decimalSeparator": "decimal",
+        "quote": "quotechar",
+    }
+    # The remaining parameters are omitted: data types do not matter when parsing only the header.
+    with open(model_path, "rb") as model_file:
+        header_line = model_file.readline()
+    csv_params = {v: getattr(server, k) for (k, v) in param_mapping.items() if getattr(server, k, None) is not None}
+    return duckdb.from_csv_auto(io.BytesIO(header_line), **csv_params).columns


def setup_s3_connection(con, server):
    s3_region = os.getenv("DATACONTRACT_S3_REGION")
    s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID")
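As a usage sketch for the sniff_csv_header helper above (file contents hypothetical, with only a delimiter and header configured): DuckDB parses the first line as a header-only CSV and returns the column names, which the caller then compares against the model.

import io
import duckdb

header_line = b"order_id;amount;customer\n"  # first line of a hypothetical CSV file
csv_columns = duckdb.from_csv_auto(io.BytesIO(header_line), delimiter=";", header=True).columns
model_columns = {"order_id": "VARCHAR", "amount": "DOUBLE"}

print(csv_columns)                            # ['order_id', 'amount', 'customer']
print(set(csv_columns) - set(model_columns))  # {'customer'} -> unexpected-fields warning
print(list(model_columns) == csv_columns[: len(model_columns)])  # True -> order matches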
9 changes: 9 additions & 0 deletions datacontract/model/data_contract_specification.py
@@ -58,6 +58,15 @@ class Server(pyd.BaseModel):
    dataset: str | None = None
    path: str | None = None
    delimiter: str | None = None
+    header: bool | None = None
+    escape: str | None = None
+    allVarchar: bool | None = None
+    allowQuotedNulls: bool | None = None
+    dateformat: str | None = None
+    decimalSeparator: str | None = None
+    newLine: str | None = None
+    timestampformat: str | None = None
+    quote: str | None = None
    endpointUrl: str | None = None
    location: str | None = None
    account: str | None = None
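A quick sketch of how the new fields surface on the model (values hypothetical; every field defaults to None, so only the options a contract actually sets are forwarded to read_csv):

from datacontract.model.data_contract_specification import Server

server = Server(
    type="local",
    format="csv",
    path="./orders.csv",  # hypothetical path
    delimiter=";",
    header=True,
    decimalSeparator=",",
    allVarchar=True,
)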
132 changes: 120 additions & 12 deletions datacontract/schemas/datacontract-1.1.0.schema.json
@@ -1212,11 +1212,47 @@
      },
      "delimiter": {
        "type": "string",
-        "enum": [
-          "new_line",
-          "array"
+        "anyOf": [
+          { "enum": ["new_line", "array"] },
+          { "pattern": "^.$" }
        ],
-        "description": "Only for format = json. How multiple json documents are delimited within one file"
+        "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns."
[Review thread]

Author: @jochenchrist should I not modify this one and just change the one in the other repo then?

Contributor: Please don't change the file here, but propose a change in the other repo. We can still continue to use the provided keys in the data_contract_specification (as custom extensions).
      },
+      "header": {
+        "type": "boolean",
+        "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV."
+      },
+      "escape": {
+        "type": "string",
+        "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV."
+      },
+      "all_varchar": {
+        "type": "boolean",
+        "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV."
+      },
+      "allow_quoted_nulls": {
+        "type": "boolean",
+        "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV."
+      },
+      "dateformat": {
+        "type": "string",
+        "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV."
+      },
+      "decimal_separator": {
+        "type": "string",
+        "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV."
+      },
+      "new_line": {
+        "type": "string",
+        "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV."
+      },
+      "timestampformat": {
+        "type": "string",
+        "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV."
+      },
+      "quote": {
+        "type": "string",
+        "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV."
+      }
      },
      "required": [
@@ -1248,11 +1284,47 @@
      },
      "delimiter": {
        "type": "string",
-        "enum": [
-          "new_line",
-          "array"
+        "anyOf": [
+          { "enum": ["new_line", "array"] },
+          { "pattern": "^.$" }
        ],
-        "description": "Only for format = json. How multiple json documents are delimited within one file"
+        "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns."
      },
+      "header": {
+        "type": "boolean",
+        "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV."
+      },
+      "escape": {
+        "type": "string",
+        "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV."
+      },
+      "all_varchar": {
+        "type": "boolean",
+        "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV."
+      },
+      "allow_quoted_nulls": {
+        "type": "boolean",
+        "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV."
+      },
+      "dateformat": {
+        "type": "string",
+        "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV."
+      },
+      "decimal_separator": {
+        "type": "string",
+        "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV."
+      },
+      "new_line": {
+        "type": "string",
+        "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV."
+      },
+      "timestampformat": {
+        "type": "string",
+        "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV."
+      },
+      "quote": {
+        "type": "string",
+        "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV."
+      }
      },
      "required": [
@@ -1336,11 +1408,47 @@
      },
      "delimiter": {
        "type": "string",
-        "enum": [
-          "new_line",
-          "array"
+        "anyOf": [
+          { "enum": ["new_line", "array"] },
+          { "pattern": "^.$" }
        ],
-        "description": "Only for format = json. How multiple json documents are delimited within one file"
+        "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns."
      },
+      "header": {
+        "type": "boolean",
+        "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV."
+      },
+      "escape": {
+        "type": "string",
+        "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV."
+      },
+      "all_varchar": {
+        "type": "boolean",
+        "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV."
+      },
+      "allow_quoted_nulls": {
+        "type": "boolean",
+        "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV."
+      },
+      "dateformat": {
+        "type": "string",
+        "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV."
+      },
+      "decimal_separator": {
+        "type": "string",
+        "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV."
+      },
+      "new_line": {
+        "type": "string",
+        "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV."
+      },
+      "timestampformat": {
+        "type": "string",
+        "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV."
+      },
+      "quote": {
+        "type": "string",
+        "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV."
+      }
      },
      "required": [
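To see the relaxed delimiter constraint in action, a small validation sketch (hypothetical, not part of this PR, using the jsonschema package) against just that property:

from jsonschema import ValidationError, validate

delimiter_schema = {
    "type": "string",
    "anyOf": [
        {"enum": ["new_line", "array"]},  # JSON: document delimiting
        {"pattern": "^.$"},               # CSV: any single character
    ],
}

for value in ["new_line", ";", ",", "||"]:
    try:
        validate(value, delimiter_schema)
        print(f"{value!r}: valid")
    except ValidationError:
        print(f"{value!r}: invalid")  # only '||' fails: more than one character

Note that '^.$' anchors a single character, so multi-character CSV delimiters are rejected by the schema.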