diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py
index ea645a090..1d00c50f5 100644
--- a/datacontract/engines/soda/connections/duckdb.py
+++ b/datacontract/engines/soda/connections/duckdb.py
@@ -1,9 +1,11 @@
+import io
 import os
+import uuid
 
 import duckdb
 
 from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type
-from datacontract.model.run import Run
+from datacontract.model.run import Check, ResultEnum, Run
 
 
 def get_duckdb_connection(data_contract, server, run: Run):
@@ -42,14 +44,82 @@ def get_duckdb_connection(data_contract, server, run: Run):
         elif server.format == "csv":
             columns = to_csv_types(model)
             run.log_info("Using columns: " + str(columns))
-            if columns is None:
-                con.sql(
-                    f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1);"""
-                )
-            else:
-                con.sql(
-                    f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});"""
-                )
+
+            # Hive partitioning is always enabled.
+            params = ["hive_partitioning=1"]
+
+            # Mapping for CSV parameters: server attribute -> read_csv parameter name.
+            param_mapping = {
+                "delimiter": "delim",  # server.delimiter maps to read_csv's 'delim'
+                "header": "header",
+                "escape": "escape",
+                "allVarchar": "all_varchar",
+                "allowQuotedNulls": "allow_quoted_nulls",
+                "dateformat": "dateformat",
+                "decimalSeparator": "decimal_separator",
+                "newLine": "new_line",
+                "timestampformat": "timestampformat",
+                "quote": "quote",
+            }
+            for server_attr, read_csv_param in param_mapping.items():
+                value = getattr(server, server_attr, None)
+                if value is not None:
+                    # Strings are quoted; booleans and numbers are passed through as-is.
+                    if isinstance(value, str):
+                        params.append(f"{read_csv_param}='{value}'")
+                    else:
+                        params.append(f"{read_csv_param}={value}")
+
+            # Sniff the CSV header and compare it against the model, unless the
+            # server explicitly declares that the file has no header row.
+            has_header = getattr(server, "header", None)
+            if columns is not None and has_header is not False:
+                csv_columns = sniff_csv_header(model_path, server)
+                difference = set(csv_columns) - set(columns.keys())
+                same_order = list(columns.keys()) == csv_columns[: len(columns)]
+                if not same_order:
+                    run.checks.append(
+                        Check(
+                            id=str(uuid.uuid4()),
+                            model=model_name,
+                            category="schema",
+                            type="fields_are_same",
+                            name="Column order mismatch",
+                            result=ResultEnum.warning,
+                            reason=f"Order of columns in {model_path} does not match the model.",
+                            details=f"Expected: {'|'.join(columns.keys())}\nActual: {'|'.join(csv_columns)}",
+                            engine="datacontract",
+                        )
+                    )
+                if len(difference) > 0:
+                    run.checks.append(
+                        Check(
+                            id=str(uuid.uuid4()),
+                            model=model_name,
+                            category="schema",
+                            type="fields_are_same",
+                            name="Dataset contained unexpected fields",
+                            result=ResultEnum.warning,
+                            reason=f"{model_path} contained unexpected fields: {', '.join(difference)}",
+                            engine="datacontract",
+                        )
+                    )
+                # Re-order the columns to match the file; unexpected columns fall back to VARCHAR.
+                columns = {k: columns.get(k, "VARCHAR") for k in csv_columns}
+
+            # Add the column definitions, if any.
+            if columns is not None:
+                params.append(f"columns={columns}")
+
+            # Build the parameter string.
+            params_str = ", ".join(params)
+
+            # Create the view with the assembled parameters.
+            con.sql(f"""
+                CREATE VIEW "{model_name}" AS
+                SELECT * FROM read_csv('{model_path}', {params_str});
+            """)
+
         elif server.format == "delta":
             con.sql("update extensions;")  # Make sure we have the latest delta extension
             con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""")
@@ -66,6 +134,25 @@ def to_csv_types(model) -> dict:
     return columns
 
 
+def sniff_csv_header(model_path, server):
+    # Mapping for CSV parameters: server attribute -> duckdb.from_csv_auto parameter name.
+    # Note: the parameter names in the Python calls (read_csv, read_csv_auto, and from_csv_auto)
+    # differ from those used in SQL statements.
+    param_mapping = {
+        "delimiter": "delimiter",
+        "header": "header",
+        "escape": "escapechar",
+        "decimalSeparator": "decimal",
+        "quote": "quotechar",
+    }
+    # The remaining parameters are omitted: data types do not matter when reading only the header.
+    with open(model_path, "rb") as model_file:
+        header_line = model_file.readline()
+    csv_params = {v: getattr(server, k) for (k, v) in param_mapping.items() if getattr(server, k, None) is not None}
+    # from_csv_auto infers the column names from this single header line.
+    return duckdb.from_csv_auto(io.BytesIO(header_line), **csv_params).columns
+
+
 def setup_s3_connection(con, server):
     s3_region = os.getenv("DATACONTRACT_S3_REGION")
     s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID")
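As a reading aid, a minimal standalone sketch of the parameter assembly above — a plain dict stands in for the pydantic Server model, and `data.csv` is a made-up path:

```python
# Sketch of the read_csv parameter assembly; a plain dict stands in for the Server model.
server = {"delimiter": ";", "header": True, "quote": '"'}

param_mapping = {"delimiter": "delim", "header": "header", "quote": "quote"}
params = ["hive_partitioning=1"]
for attr, csv_param in param_mapping.items():
    value = server.get(attr)
    if value is not None:
        # Strings are quoted; booleans and numbers are passed through.
        params.append(f"{csv_param}='{value}'" if isinstance(value, str) else f"{csv_param}={value}")

print(f"SELECT * FROM read_csv('data.csv', {', '.join(params)});")
# SELECT * FROM read_csv('data.csv', hive_partitioning=1, delim=';', header=True, quote='"');
```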
+ con.sql(f""" + CREATE VIEW "{model_name}" AS + SELECT * FROM read_csv('{model_path}', {params_str}); + """) + elif server.format == "delta": con.sql("update extensions;") # Make sure we have the latest delta extension con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""") @@ -66,6 +134,25 @@ def to_csv_types(model) -> dict: return columns +def sniff_csv_header(model_path, server): + # Define a mapping for CSV parameters: server attribute -> duckdb.read_csv parameter name. + # Note! The parameter names in the python calls (read_csv, read_csv_auto, and from_csv_auto) + # are different from those used in the SQL statements. + param_mapping = { + "delimiter": "delimiter", + "header": "header", + "escape": "escapechar", + "decimal_separator": "decimal", + "quote": "quotechar", + } + # Remainder params are left out, as we do not care about parsing datatype for just the header. + with open(model_path, "rb") as model_file: + header_line = model_file.readline() + csv_params = {v: getattr(server, k) for (k, v) in param_mapping.items() if getattr(server, k, None) is not None} + # from_csv_auto + return duckdb.from_csv_auto(io.BytesIO(header_line), **csv_params).columns + + def setup_s3_connection(con, server): s3_region = os.getenv("DATACONTRACT_S3_REGION") s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID") diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index dcfdd94ec..9983ef728 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -58,6 +58,15 @@ class Server(pyd.BaseModel): dataset: str | None = None path: str | None = None delimiter: str | None = None + header: bool | None = None + escape: str | None = None + allVarchar: bool | None = None + allowQuotedNulls: bool | None = None + dateformat: str | None = None + decimalSeparator: str | None = None + newLine: str | None = None + timestampformat: str | None = None + quote: str | None = None endpointUrl: str | None = None location: str | None = None account: str | None = None diff --git a/datacontract/schemas/datacontract-1.1.0.schema.json b/datacontract/schemas/datacontract-1.1.0.schema.json index 29b933e14..f2d9d9069 100644 --- a/datacontract/schemas/datacontract-1.1.0.schema.json +++ b/datacontract/schemas/datacontract-1.1.0.schema.json @@ -1212,11 +1212,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." 
+ }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1248,11 +1284,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1336,11 +1408,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." 
+ }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ diff --git a/datacontract/schemas/odcs-3.0.1.schema.json b/datacontract/schemas/odcs-3.0.1.schema.json index e501aad32..8f899bdbf 100644 --- a/datacontract/schemas/odcs-3.0.1.schema.json +++ b/datacontract/schemas/odcs-3.0.1.schema.json @@ -776,11 +776,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." 
+ }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1381,11 +1417,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1417,11 +1489,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. 
+        "allowQuotedNulls": {
+          "type": "boolean",
+          "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV."
+        },
+        "dateformat": {
+          "type": "string",
+          "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV."
+        },
+        "decimalSeparator": {
+          "type": "string",
+          "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV."
+        },
+        "newLine": {
+          "type": "string",
+          "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV."
+        },
+        "timestampformat": {
+          "type": "string",
+          "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV."
+        },
+        "quote": {
+          "type": "string",
+          "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV."
         }
       },
       "required": [
diff --git a/pyproject.toml b/pyproject.toml
index d68e76174..7792eac74 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
   "rich>=13.7,<13.10",
   "sqlglot>=26.6.0,<27.0.0",
   "duckdb==1.1.2",
+  "fsspec",
   "soda-core-duckdb>=3.3.20,<3.5.0",
   # remove setuptools when https://github.com/sodadata/soda-core/issues/2091 is resolved
   "setuptools>=60",
diff --git a/tests/fixtures/csv/data/datacontract.yaml b/tests/fixtures/csv/data/datacontract.yaml
index b0aacc22a..7e25751b0 100644
--- a/tests/fixtures/csv/data/datacontract.yaml
+++ b/tests/fixtures/csv/data/datacontract.yaml
@@ -7,7 +7,7 @@ servers:
   production:
     type: local
     format: csv
-    path: ./tests/fixtures/csv/data/sample_data.csv
+    path: ./fixtures/csv/data/sample_data.csv
     delimiter: ','
 models:
   sample_data:
diff --git a/tests/test_duckdb_csv.py b/tests/test_duckdb_csv.py
new file mode 100644
index 000000000..697e64ddc
--- /dev/null
+++ b/tests/test_duckdb_csv.py
@@ -0,0 +1,118 @@
+from datacontract.data_contract import DataContract
+from datacontract.engines.soda.connections.duckdb import get_duckdb_connection
+from datacontract.lint import resolve
+from datacontract.model.run import Run
+
+
+def test_csv_all_fields_present():
+    data_contract_str = """dataContractSpecification: 1.1.0
+id: my-data-contract-id
+info:
+  title: My Data Contract
+  version: 0.0.1
+servers:
+  production:
+    type: local
+    format: csv
+    path: ./fixtures/csv/data/sample_data.csv
+    delimiter: ','
+models:
+  sample_data:
+    description: CSV file with ascii encoding
+    type: table
+    fields:
+      field_one:
+        type: string
+      field_two:
+        type: integer
+      field_three:
+        type: string
+    """
+    data_contract = resolve.resolve_data_contract(data_contract_str=data_contract_str)
+    con = get_duckdb_connection(data_contract, data_contract.servers["production"], Run.create_run())
+    assert con.table("sample_data").columns == ["field_one", "field_two", "field_three"]
+    assert con.table("sample_data").shape == (10, 3)
+
+
+def test_csv_missing_field():
+    data_contract_str = """dataContractSpecification: 1.1.0
+id: my-data-contract-id
+info:
+  title: My Data Contract
+  version: 0.0.1
+servers:
+  production:
+    type: local
+    format: csv
+    path: ./fixtures/csv/data/sample_data.csv
+    delimiter: ','
+models:
+  sample_data:
+    description: CSV file with ascii encoding
+    type: table
+    fields:
+      field_one:
+        type: string
+      field_two:
+        type: integer
+      field_three:
+        type: string
+      missing_field:
+        type: string
+    """
+    data_contract = resolve.resolve_data_contract(data_contract_str=data_contract_str)
+    # Reading the file still works; the mismatch surfaces as a warning check.
+    run = Run.create_run()
+    con = get_duckdb_connection(data_contract, data_contract.servers["production"], run)
+    assert con.table("sample_data").columns == ["field_one", "field_two", "field_three"]
+    assert con.table("sample_data").shape == (10, 3)
+    assert any(c.name == "Column order mismatch" for c in run.checks if c.result == "warning")
+    # Now run the full contract test: the missing field must fail its presence check.
+    data_contract = DataContract(data_contract_str=data_contract_str)
+    run = data_contract.test()
+    checks = {c.field: c for c in run.checks if c.type == "field_is_present"}
+    assert checks["field_one"].result == "passed"
+    assert checks["field_two"].result == "passed"
+    assert checks["field_three"].result == "passed"
+    assert checks["missing_field"].result == "failed"
+
+
+def test_local_csv_extra_field():
+    data_contract_str = """dataContractSpecification: 1.1.0
+id: my-data-contract-id
+info:
+  title: My Data Contract
+  version: 0.0.1
+servers:
+  production:
+    type: local
+    format: csv
+    path: ./fixtures/csv/data/sample_data.csv
+    delimiter: ','
+models:
+  sample_data:
+    description: CSV file with ascii encoding
+    type: table
+    fields:
+      field_three:
+        type: string
+      field_one:
+        type: string
+    """
+    data_contract = resolve.resolve_data_contract(data_contract_str=data_contract_str)
+    # Reading the file still works; both order and unexpected-field warnings are recorded.
+    run = Run.create_run()
+    con = get_duckdb_connection(data_contract, data_contract.servers["production"], run)
+    assert con.table("sample_data").columns == ["field_one", "field_two", "field_three"]
+    assert con.table("sample_data").shape == (10, 3)
+    assert any(c.name == "Column order mismatch" for c in run.checks if c.result == "warning")
+    assert any(c.name == "Dataset contained unexpected fields" for c in run.checks if c.result == "warning")
+
+    # Now run the full contract test: only the modeled fields get presence checks.
+    data_contract = DataContract(data_contract_str=data_contract_str)
+    run = data_contract.test()
+    checks = {c.field: c for c in run.checks if c.type == "field_is_present"}
+    assert len(checks) == 2
+    assert checks["field_one"].result == "passed"
+    assert checks["field_three"].result == "passed"
diff --git a/tests/test_test_local_csv.py b/tests/test_test_local_csv.py
new file mode 100644
index 000000000..9eee5ee70
--- /dev/null
+++ b/tests/test_test_local_csv.py
@@ -0,0 +1,18 @@
+from typer.testing import CliRunner
+
+from datacontract.cli import app
+from datacontract.data_contract import DataContract
+
+runner = CliRunner()
+
+
+def test_cli():
+    result = runner.invoke(app, ["test", "./fixtures/csv/data/datacontract.yaml"])
+    assert result.exit_code == 0
+
+
+def test_local_csv():
+    data_contract = DataContract(data_contract_file="./fixtures/csv/data/datacontract.yaml")
+    run = data_contract.test()
+    print(run)
+    assert run.result == "passed"
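For completeness, the header sniffing used in `sniff_csv_header` in isolation — the same `from_csv_auto` call on an in-memory buffer, with made-up data:

```python
import io

import duckdb

# Only the first line of the file is handed to DuckDB, so data types never matter.
header_line = b"field_one;field_two;field_three\n"
relation = duckdb.from_csv_auto(io.BytesIO(header_line), delimiter=";", header=True)
print(relation.columns)  # ['field_one', 'field_two', 'field_three']
```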