diff --git a/README.md b/README.md
index c4827d76a..e36de0602 100644
--- a/README.md
+++ b/README.md
@@ -250,6 +250,7 @@ A list of available extras:
 | RDF                     | `pip install datacontract-cli[rdf]`      |
 | API (run as web server) | `pip install datacontract-cli[api]`      |
 | protobuf                | `pip install datacontract-cli[protobuf]` |
+| SFTP                    | `pip install datacontract-cli[sftp]`     |
 
 ## Documentation
 
@@ -398,6 +399,7 @@ Supported server types:
 - [impala](#impala)
 - [api](#api)
 - [local](#local)
+- [sftp](#sftp)
 
 Supported formats:
 
@@ -1027,6 +1029,53 @@ models:
       type: string
 ```
 
+
+#### sftp
+
+Data Contract CLI can test files on an SFTP server in parquet, json, csv, or delta format.
+
+##### Example
+
+datacontract.yaml
+```yaml
+servers:
+  my-server:
+    type: sftp
+    location: sftp://localhost:22/data/*.parquet
+    format: parquet
+models:
+  my_table_1: # corresponds to a table
+    type: table
+    fields:
+      my_column_1: # corresponds to a column
+        type: varchar
+      my_column_2: # corresponds to a column
+        type: string
+```
+
+##### Environment Variables
+
+| Environment Variable         | Example            | Description |
+|------------------------------|--------------------|-------------|
+| `DATACONTRACT_SFTP_USER`     | `admin`            | Username    |
+| `DATACONTRACT_SFTP_PASSWORD` | `mysecretpassword` | Password    |
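+
+##### Usage
+
+With the `sftp` extra installed and the credentials exported, `datacontract test` reads the remote files through DuckDB and fsspec. A minimal invocation might look like this (host, credentials, and file names are placeholders):
+
+```bash
+export DATACONTRACT_SFTP_USER=admin
+export DATACONTRACT_SFTP_PASSWORD=mysecretpassword
+datacontract test datacontract.yaml
+```
+
+Importing from an SFTP source works the same way, for example:
+
+```bash
+datacontract import --format csv --source sftp://localhost:22/data/sample_data.csv
+```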
+
 
 ### export
 
 ```
diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py
index c3187c96c..8cdbbfb36 100644
--- a/datacontract/engines/soda/check_soda_execute.py
+++ b/datacontract/engines/soda/check_soda_execute.py
@@ -43,7 +43,7 @@ def check_soda_execute(
     run.log_info("Running engine soda-core")
     scan = Scan()
 
-    if server.type in ["s3", "gcs", "azure", "local"]:
+    if server.type in ["s3", "gcs", "azure", "local", "sftp"]:
         if server.format in ["json", "parquet", "csv", "delta"]:
             run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
             con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
@@ -130,7 +130,6 @@ def check_soda_execute(
             soda_configuration_str = to_athena_soda_configuration(server)
             scan.add_configuration_yaml_str(soda_configuration_str)
             scan.set_data_source_name(server.type)
-
     else:
         run.checks.append(
             Check(
diff --git a/datacontract/engines/soda/connections/duckdb_connection.py b/datacontract/engines/soda/connections/duckdb_connection.py
index 6ea58ae9d..2e54b564a 100644
--- a/datacontract/engines/soda/connections/duckdb_connection.py
+++ b/datacontract/engines/soda/connections/duckdb_connection.py
@@ -7,6 +7,7 @@
 from datacontract.export.duckdb_type_converter import convert_to_duckdb_csv_type, convert_to_duckdb_json_type
 from datacontract.export.sql_type_converter import convert_to_duckdb
+from datacontract.imports.importer import setup_sftp_filesystem
 from datacontract.model.run import Run
 
@@ -22,18 +23,25 @@ def get_duckdb_connection(
         con = duckdb_connection
 
     path: str = ""
-    if server.type == "local":
-        path = server.path
-    if server.type == "s3":
-        path = server.location
-        setup_s3_connection(con, server)
-    if server.type == "gcs":
-        path = server.location
-        setup_gcs_connection(con, server)
-    if server.type == "azure":
-        path = server.location
-        setup_azure_connection(con, server)
-
+    match server.type:
+        case "local":
+            path = server.path
+        case "s3":
+            path = server.location
+            setup_s3_connection(con, server)
+        case "gcs":
+            path = server.location
+            setup_gcs_connection(con, server)
+        case "azure":
+            path = server.location
+            setup_azure_connection(con, server)
+        case "sftp":
+            # register the paramiko-backed fsspec filesystem so duckdb
+            # can read sftp:// URLs directly
+            fs = setup_sftp_filesystem(server.location)
+            con.register_filesystem(fs)
+            path = server.location
+
     if data_contract.schema_:
         for schema_obj in data_contract.schema_:
             model_name = schema_obj.name
diff --git a/datacontract/imports/csv_importer.py b/datacontract/imports/csv_importer.py
index ce117612d..4c80ed98e 100644
--- a/datacontract/imports/csv_importer.py
+++ b/datacontract/imports/csv_importer.py
@@ -4,7 +4,7 @@
 import duckdb
 from open_data_contract_standard.model import OpenDataContractStandard
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
@@ -29,6 +29,10 @@ def import_csv(
     # use duckdb to auto detect format, columns, etc.
     con = duckdb.connect(database=":memory:")
+    if source.startswith("sftp://"):
+        # route sftp:// reads through the paramiko-backed fsspec filesystem
+        fs = setup_sftp_filesystem(source)
+        con.register_filesystem(fs)
     con.sql(
         f"""CREATE VIEW "{table_name}" AS SELECT * FROM read_csv_auto('{source}', hive_partitioning=1, auto_type_candidates = ['BOOLEAN', 'INTEGER', 'BIGINT', 'DOUBLE', 'VARCHAR']);"""
     )
diff --git a/datacontract/imports/excel_importer.py b/datacontract/imports/excel_importer.py
index df18bae00..4c40da4f9 100644
--- a/datacontract/imports/excel_importer.py
+++ b/datacontract/imports/excel_importer.py
@@ -1,6 +1,7 @@
 import logging
 import os
 from decimal import Decimal
+from io import BytesIO
 from typing import Any, Dict, List, Optional
 
 import openpyxl
@@ -22,7 +23,7 @@
 from openpyxl.workbook.workbook import Workbook
 from openpyxl.worksheet.worksheet import Worksheet
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.model.exceptions import DataContractException
 
 logger = logging.getLogger(__name__)
@@ -47,9 +48,18 @@ def import_excel_as_odcs(excel_file_path: str) -> OpenDataContractStandard:
     Returns:
         OpenDataContractStandard object
     """
-    if not os.path.exists(excel_file_path):
+    if not (excel_file_path.startswith("sftp://") or os.path.exists(excel_file_path)):
         raise FileNotFoundError(f"Excel file not found: {excel_file_path}")
 
+    if excel_file_path.startswith("sftp://"):
+        # download the workbook into memory; openpyxl accepts a file-like object
+        fs = setup_sftp_filesystem(excel_file_path)
+        try:
+            with fs.open(excel_file_path) as excel_file:
+                excel_file_path = BytesIO(excel_file.read())
+        except FileNotFoundError:
+            raise FileNotFoundError(f"Excel file not found on SFTP server: {excel_file_path}")
+
     try:
         workbook = openpyxl.load_workbook(excel_file_path, data_only=True)
     except Exception as e:
diff --git a/datacontract/imports/importer.py b/datacontract/imports/importer.py
index 24961fb8c..2657a1ac6 100644
--- a/datacontract/imports/importer.py
+++ b/datacontract/imports/importer.py
@@ -1,6 +1,9 @@
+import os
 from abc import ABC, abstractmethod
 from enum import Enum
+from urllib.parse import urlparse
 
+import fsspec
 from open_data_contract_standard.model import OpenDataContractStandard
 
@@ -42,3 +45,21 @@ class ImportFormat(str, Enum):
     @classmethod
     def get_supported_formats(cls):
         return list(map(lambda c: c.value, cls))
+
+
+def setup_sftp_filesystem(url: str):
+    """Build an fsspec SFTP filesystem for an sftp:// URL.
+
+    Credentials are read from the DATACONTRACT_SFTP_USER and
+    DATACONTRACT_SFTP_PASSWORD environment variables.
+    """
+    parsed_url = urlparse(url)
+    hostname = parsed_url.hostname if parsed_url.hostname is not None else "127.0.0.1"
+    port = parsed_url.port if parsed_url.port is not None else 22
+    sftp_user = os.getenv("DATACONTRACT_SFTP_USER")
+    sftp_password = os.getenv("DATACONTRACT_SFTP_PASSWORD")
+    if sftp_user is None:
+        raise ValueError("Error: Environment variable DATACONTRACT_SFTP_USER is not set")
+    if sftp_password is None:
+        raise ValueError("Error: Environment variable DATACONTRACT_SFTP_PASSWORD is not set")
+    return fsspec.filesystem("sftp", host=hostname, port=port, username=sftp_user, password=sftp_password)
diff --git a/datacontract/imports/json_importer.py b/datacontract/imports/json_importer.py
index 219a910d3..ebd21ca78 100644
--- a/datacontract/imports/json_importer.py
+++ b/datacontract/imports/json_importer.py
@@ -5,7 +5,7 @@
 
 from open_data_contract_standard.model import OpenDataContractStandard, SchemaProperty
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
@@ -23,7 +23,12 @@ def import_source(
 
 def is_ndjson(file_path: str) -> bool:
     """Check if a file contains newline-delimited JSON."""
-    with open(file_path, "r", encoding="utf-8") as file:
+    if file_path.startswith("sftp://"):
+        fs = setup_sftp_filesystem(file_path)
+        file_ctx = fs.open(file_path, "r")
+    else:
+        file_ctx = open(file_path, "r", encoding="utf-8")
+    with file_ctx as file:
         for _ in range(5):
             line = file.readline().strip()
             if not line:
@@ -39,11 +44,17 @@ def is_ndjson(file_path: str) -> bool:
 def import_json(source: str, include_examples: bool = False) -> OpenDataContractStandard:
     """Import a JSON file and create an ODCS data contract."""
     base_model_name = os.path.splitext(os.path.basename(source))[0]
 
+    if source.startswith("sftp://"):
+        fs = setup_sftp_filesystem(source)
+        file_ctx = fs.open(source, "r")
+    else:
+        file_ctx = open(source, "r", encoding="utf-8")
+
     # Check if file is newline-delimited JSON
     if is_ndjson(source):
         json_data = []
-        with open(source, "r", encoding="utf-8") as file:
+        with file_ctx as file:
             for line in file:
                 line = line.strip()
                 if line:
@@ -52,7 +63,7 @@ def import_json(source: str, include_examples: bool = False) -> OpenDataContract
                 except json.JSONDecodeError:
                     continue
     else:
-        with open(source, "r", encoding="utf-8") as file:
+        with file_ctx as file:
             json_data = json.load(file)
 
     odcs = create_odcs()
diff --git a/datacontract/imports/jsonschema_importer.py b/datacontract/imports/jsonschema_importer.py
index 032eefec2..b13bef413 100644
--- a/datacontract/imports/jsonschema_importer.py
+++ b/datacontract/imports/jsonschema_importer.py
@@ -4,7 +4,7 @@
 import fastjsonschema
 from open_data_contract_standard.model import DataQuality, OpenDataContractStandard, SchemaProperty
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
@@ -50,8 +50,13 @@ def import_jsonschema(source: str) -> OpenDataContractStandard:
 def load_and_validate_json_schema(source: str) -> dict:
     """Load and validate a JSON Schema file."""
     try:
-        with open(source, "r") as file:
-            json_schema = json.loads(file.read())
+        if source.startswith("sftp://"):
+            fs = setup_sftp_filesystem(source)
+            with fs.open(source, "r") as file:
+                json_schema = json.loads(file.read())
+        else:
+            with open(source, "r") as file:
+                json_schema = json.loads(file.read())
 
         validator = fastjsonschema.compile({})
         validator(json_schema)
diff --git a/datacontract/imports/parquet_importer.py b/datacontract/imports/parquet_importer.py
index 3265acda8..048051f50 100644
--- a/datacontract/imports/parquet_importer.py
+++ b/datacontract/imports/parquet_importer.py
@@ -4,7 +4,7 @@
 from open_data_contract_standard.model import OpenDataContractStandard
 from pyarrow import parquet
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
@@ -24,10 +24,18 @@ def import_parquet(source: str) -> OpenDataContractStandard:
     """Import a Parquet file and create an ODCS data contract."""
     # use filename as schema name, remove .parquet suffix, avoid breaking the yaml output by replacing dots
     schema_name = os.path.basename(source).removesuffix(".parquet").replace(".", "_")
 
+    if source.startswith("sftp://"):
+        fs = setup_sftp_filesystem(source)
+        # pyarrow expects a plain path plus a filesystem object,
+        # so strip the sftp:// scheme and the host:port part
+        path = source.split("//", 1)[1]
+        path = "/" + path.split("/", 1)[1]
+        arrow_schema = parquet.read_schema(path, filesystem=fs)
+    else:
+        arrow_schema = parquet.read_schema(source)
+
     properties = []
-
-    arrow_schema = parquet.read_schema(source)
 
     for field_name in arrow_schema.names:
         parquet_field = arrow_schema.field(field_name)
diff --git a/pyproject.toml b/pyproject.toml
index d87a329e6..3d2fe226b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -129,8 +129,12 @@ protobuf = [
     "grpcio-tools>=1.53",
 ]
 
+sftp = [
+    "paramiko>=4.0.0",
+]
+
 all = [
-    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf,oracle]"
+    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf,oracle,sftp]"
 ]
 
 # for development, we pin all libraries to an exact version
diff --git a/tests/fixtures/sftp-csv/data/sample_data.csv b/tests/fixtures/sftp-csv/data/sample_data.csv
new file mode 100644
index 000000000..4ae23e482
--- /dev/null
+++ b/tests/fixtures/sftp-csv/data/sample_data.csv
@@ -0,0 +1,11 @@
+field_one,field_two,field_three
+CX-263-DU,50,2023-06-16 13:12:56
+IK-894-MN,47,2023-10-08 22:40:57
+ER-399-JY,22,2023-05-16 01:08:22
+MT-939-FH,63,2023-03-15 05:15:21
+LV-849-MI,33,2023-09-08 20:08:43
+VS-079-OH,85,2023-04-15 00:50:32
+DN-297-XY,79,2023-11-08 12:55:42
+ZE-172-FP,14,2023-12-03 18:38:38
+ID-840-EG,89,2023-10-02 17:17:58
+FK-230-KZ,64,2023-11-27 15:21:48
diff --git a/tests/fixtures/sftp-csv/datacontract.yaml b/tests/fixtures/sftp-csv/datacontract.yaml
new file mode 100644
index 000000000..34af9ebab
--- /dev/null
+++ b/tests/fixtures/sftp-csv/datacontract.yaml
@@ -0,0 +1,30 @@
+dataContractSpecification: 1.2.0
+id: sftp-csv
+info:
+  title: sftp-csv
+  version: 0.0.1
+  owner: my-domain-team
+servers:
+  my-server:
+    type: sftp
+    location: sftp://localhost:22/sftp/data/sample_data.csv
+    format: csv
+    dataProductId: my-dataproduct
+models:
+  my_table:
+    type: table
+    fields:
+      field_one:
+        type: varchar
+        required: true
+        unique: true
+        pattern: "[A-Za-z]{2}-\\d{3}-[A-Za-z]{2}$"
+      field_two:
+        type: bigint
+        minimum: 10
+      field_three:
+        type: timestamp
+    quality:
+      - type: sql
+        query: "SELECT count(*) FROM my_table"
+        mustBeGreaterThan: 0
diff --git a/tests/test_import_sftp.py b/tests/test_import_sftp.py
new file mode 100644
index 000000000..101d531c0
--- /dev/null
+++ b/tests/test_import_sftp.py
@@ -0,0 +1,190 @@
+import os
+
+# must be set before testcontainers is imported
+os.environ["TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE"] = "/var/run/docker.sock"
+
+from time import sleep
+
+import paramiko
+import pytest
+import yaml
+from testcontainers.sftp import SFTPContainer, SFTPUser
+from typer.testing import CliRunner
+
+from datacontract.cli import app
+from datacontract.data_contract import DataContract
+from datacontract.imports.excel_importer import import_excel_as_odcs
+
+sftp_dir = "/sftp/data"
+
+csv_file_name = "sample_data"
+csv_file_path = f"fixtures/csv/data/{csv_file_name}.csv"
+csv_sftp_path = f"{sftp_dir}/{csv_file_name}.csv"
+
+parquet_file_name = "combined_no_time"
+parquet_file_path = f"fixtures/parquet/data/{parquet_file_name}.parquet"
+parquet_sftp_path = f"{sftp_dir}/{parquet_file_name}.parquet"
+
+avro_file_name = "orders"
+avro_file_path = f"fixtures/avro/data/{avro_file_name}.avsc"
+avro_sftp_path = f"{sftp_dir}/{avro_file_name}.avsc"
+
+dbml_file_name = "dbml"
+dbml_file_path = f"fixtures/dbml/import/{dbml_file_name}.txt"
+dbml_sftp_path = f"{sftp_dir}/{dbml_file_name}.txt"
+
+dbt_file_name = "manifest_jaffle_duckdb"
+dbt_file_path = f"fixtures/dbt/import/{dbt_file_name}.json"
+dbt_sftp_path = f"{sftp_dir}/{dbt_file_name}.json"
+
+iceberg_file_name = "simple_schema"
+iceberg_file_path = f"fixtures/iceberg/{iceberg_file_name}.json"
+iceberg_sftp_path = f"{sftp_dir}/{iceberg_file_name}.json"
+
+json_file_name = "orders"
+json_file_path = f"fixtures/import/{json_file_name}.json"
+json_sftp_path = f"{sftp_dir}/{json_file_name}.json"
+
+odcs_file_name = "full-example"
+odcs_file_path = f"fixtures/odcs_v3/{odcs_file_name}.odcs.yaml"
+odcs_sftp_path = f"{sftp_dir}/{odcs_file_name}.odcs.yaml"
+
+excel_file_path = "./fixtures/excel/shipments-odcs.xlsx"
+excel_sftp_path = f"{sftp_dir}/shipments-odcs.xlsx"
+
+# default credentials of the emberstack/sftp image
+username = "demo"
+password = "demo"
+user = SFTPUser(name=username, password=password)
f"fixtures/import/{json_file_name}.json" +json_sftp_path = f"{sftp_dir}/{json_file_name}.json" + +odcs_file_name = "full-example" +odcs_file_path = f"fixtures/odcs_v3/{odcs_file_name}.odcs.yaml" +odcs_sftp_path = f"{sftp_dir}/{odcs_file_name}.odcs.yaml" + +excel_file_path = "./fixtures/excel/shipments-odcs.xlsx" +excel_sftp_path = f"{sftp_dir}/shipments-odcs.xlsx" +username = "demo" # for emberstack +password = "demo" # for emberstack +user = SFTPUser(name = username,password=password) + + +@pytest.fixture +def sftp_container(): + """ + Initialize and provide an SFTP container for all tests in this module. + Sets up the container, uploads the test file, and provides connection details. + """ + # that image is both compatible with Mac and Linux which is not the case with the default image + + with SFTPContainer(image="emberstack/sftp:latest", users=[user]) as container: + host_ip = container.get_container_host_ip() + host_port = container.get_exposed_sftp_port() + + # Set environment variables for SFTP authentication + os.environ["DATACONTRACT_SFTP_USER"] = username + os.environ["DATACONTRACT_SFTP_PASSWORD"] = password + + # Wait for the container to be ready + sleep(3) + + # Upload test files to SFTP server + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(host_ip, host_port, username, password) + sftp = ssh.open_sftp() + try: + sftp.mkdir(sftp_dir) + sftp.put(avro_file_path, avro_sftp_path) + sftp.put(dbml_file_path, dbml_sftp_path) + sftp.put(dbt_file_path, dbt_sftp_path) + sftp.put(csv_file_path, csv_sftp_path) + sftp.put(iceberg_file_path, iceberg_sftp_path) + sftp.put(json_file_path, json_sftp_path) + sftp.put(odcs_file_path, odcs_sftp_path) + sftp.put(parquet_file_path, parquet_sftp_path) + sftp.put(excel_file_path, excel_sftp_path) + finally: + sftp.close() + ssh.close() + + + yield { + "host_ip": host_ip, + "host_port": host_port, + "container": container + } + + +def test_cli(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}" + + runner = CliRunner() + result = runner.invoke( + app, + [ + "import", + "--format", + "csv", + "--source", + source, + ], + ) + assert result.exit_code == 0 + +def test_import_sftp_csv(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}" + + result = DataContract.import_from_source("csv", source) + model = result.schema_[0].properties + assert model is not None + fields = {x.name:x for x in model} + assert "field_one" in fields + assert "field_two" in fields + assert "field_three" in fields + + assert model is not None + + + + + +def test_import_sftp_parquet(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = f"sftp://{host_ip}:{host_port}{parquet_sftp_path}" + + result = DataContract.import_from_source("parquet", source) + model = result.schema_[0].properties + assert model is not None + fields = {x.name:x for x in model} + assert fields["string_field"].logicalType == "string" + assert fields["blob_field"].logicalType == "array" + assert fields["boolean_field"].logicalType == "boolean" + assert fields["struct_field"].logicalType == "object" + assert DataContract(data_contract=result).lint().has_passed() + + +def test_import_sftp_jsonschema(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = 
f"sftp://{host_ip}:{host_port}{json_sftp_path}" + result = DataContract.import_from_source("jsonschema", source) + assert len(result.schema_) > 0 + assert DataContract(data_contract=result).lint().has_passed() + + +def test_import_excel_odcs(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = f"sftp://{host_ip}:{host_port}{excel_sftp_path}" + result = import_excel_as_odcs(source) + expected_datacontract = read_file("fixtures/excel/shipments-odcs.yaml") + assert yaml.safe_load(result.to_yaml()) == yaml.safe_load(expected_datacontract) + +def read_file(file): + if not os.path.exists(file): + print(f"The file '{file}' does not exist.") + raise FileNotFoundError(f"The file '{file}' does not exist.") + with open(file, "r") as file: + file_content = file.read() + return file_content diff --git a/tests/test_lint.py b/tests/test_lint.py index 857996479..2f8dbeed4 100644 --- a/tests/test_lint.py +++ b/tests/test_lint.py @@ -1,3 +1,4 @@ + from open_data_contract_standard.model import OpenDataContractStandard from typer.testing import CliRunner @@ -82,3 +83,4 @@ def test_lint_with_references(): run = data_contract.lint() assert run.result == "passed" +