diff --git a/README.md b/README.md
index c4827d76a..f37ade42e 100644
--- a/README.md
+++ b/README.md
@@ -398,6 +398,7 @@ Supported server types:
 - [impala](#impala)
 - [api](#api)
 - [local](#local)
+- [sftp](#sftp)
 
 Supported formats:
 
@@ -1027,6 +1028,45 @@ models:
       type: string
 ```
+
+#### sftp
+
+Data Contract CLI can test files on SFTP servers in parquet, json, csv, or delta format.
+
+##### Example
+
+datacontract.yaml
+```yaml
+servers:
+  my-server:
+    type: sftp
+    location: sftp://localhost:22/data/*.parquet
+    format: parquet
+models:
+  my_table_1: # corresponds to a table
+    type: table
+    fields:
+      my_column_1: # corresponds to a column
+        type: varchar
+      my_column_2: # corresponds to a column
+        type: string
+```
+
+##### Environment Variables
+
+| Environment Variable         | Example            | Description |
+|------------------------------|--------------------|-------------|
+| `DATACONTRACT_SFTP_USER`     | `admin`            | Username    |
+| `DATACONTRACT_SFTP_PASSWORD` | `mysecretpassword` | Password    |
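+
+For a quick check, assuming the server above is reachable and the credentials are exported, a test run looks like this:
+
+```bash
+export DATACONTRACT_SFTP_USER=admin
+export DATACONTRACT_SFTP_PASSWORD=mysecretpassword
+datacontract test datacontract.yaml
+```
+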
 
 ### export
 
 ```
diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py
index c3187c96c..8cdbbfb36 100644
--- a/datacontract/engines/soda/check_soda_execute.py
+++ b/datacontract/engines/soda/check_soda_execute.py
@@ -43,7 +43,7 @@ def check_soda_execute(
     run.log_info("Running engine soda-core")
     scan = Scan()
 
-    if server.type in ["s3", "gcs", "azure", "local"]:
+    if server.type in ["s3", "gcs", "azure", "local", "sftp"]:
         if server.format in ["json", "parquet", "csv", "delta"]:
             run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
             con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
@@ -130,7 +130,6 @@ def check_soda_execute(
         soda_configuration_str = to_athena_soda_configuration(server)
         scan.add_configuration_yaml_str(soda_configuration_str)
         scan.set_data_source_name(server.type)
-
     else:
         run.checks.append(
             Check(
diff --git a/datacontract/engines/soda/connections/duckdb_connection.py b/datacontract/engines/soda/connections/duckdb_connection.py
index 6ea58ae9d..3e1bb660d 100644
--- a/datacontract/engines/soda/connections/duckdb_connection.py
+++ b/datacontract/engines/soda/connections/duckdb_connection.py
@@ -6,6 +6,7 @@ from open_data_contract_standard.model import OpenDataContractStandard, SchemaObject, SchemaProperty, Server
 
 from datacontract.export.duckdb_type_converter import convert_to_duckdb_csv_type, convert_to_duckdb_json_type
+from datacontract.imports.importer import setup_sftp_filesystem
 from datacontract.export.sql_type_converter import convert_to_duckdb
 from datacontract.model.run import Run
 
@@ -22,18 +23,23 @@ def get_duckdb_connection(
         con = duckdb_connection
 
     path: str = ""
-    if server.type == "local":
-        path = server.path
-    if server.type == "s3":
-        path = server.location
-        setup_s3_connection(con, server)
-    if server.type == "gcs":
-        path = server.location
-        setup_gcs_connection(con, server)
-    if server.type == "azure":
-        path = server.location
-        setup_azure_connection(con, server)
-
+    match server.type:
+        case "local":
+            path = server.path
+        case "s3":
+            path = server.location
+            setup_s3_connection(con, server)
+        case "gcs":
+            path = server.location
+            setup_gcs_connection(con, server)
+        case "azure":
+            path = server.location
+            setup_azure_connection(con, server)
+        case "sftp":
+            # register the fsspec SFTP filesystem so duckdb can read sftp:// paths
+            fs = setup_sftp_filesystem(server.location)
+            con.register_filesystem(fs)
+            path = server.location
     if data_contract.schema_:
         for schema_obj in data_contract.schema_:
             model_name = schema_obj.name
diff --git a/datacontract/imports/csv_importer.py b/datacontract/imports/csv_importer.py
index ce117612d..cdfb2d972 100644
--- a/datacontract/imports/csv_importer.py
+++ b/datacontract/imports/csv_importer.py
@@ -4,7 +4,7 @@ import duckdb
 from open_data_contract_standard.model import OpenDataContractStandard
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
@@ -29,6 +29,9 @@ def import_csv(
     # use duckdb to auto detect format, columns, etc.
     con = duckdb.connect(database=":memory:")
+    if source.startswith("sftp://"):
+        fs = setup_sftp_filesystem(source)
+        con.register_filesystem(fs)
     con.sql(
         f"""CREATE VIEW "{table_name}" AS SELECT * FROM read_csv_auto('{source}', hive_partitioning=1, auto_type_candidates = ['BOOLEAN', 'INTEGER', 'BIGINT', 'DOUBLE', 'VARCHAR']);"""
     )
diff --git a/datacontract/imports/dbml_importer.py b/datacontract/imports/dbml_importer.py
index 70568a9de..8c24a6dee 100644
--- a/datacontract/imports/dbml_importer.py
+++ b/datacontract/imports/dbml_importer.py
@@ -11,6 +11,7 @@
     create_schema_object,
 )
 from datacontract.imports.sql_importer import map_type_from_sql
+from datacontract.imports.importer import setup_sftp_filesystem
 from datacontract.model.exceptions import DataContractException
diff --git a/datacontract/imports/excel_importer.py b/datacontract/imports/excel_importer.py
index df18bae00..9cb0629d5 100644
--- a/datacontract/imports/excel_importer.py
+++ b/datacontract/imports/excel_importer.py
@@ -1,8 +1,9 @@
 import logging
 import os
 from decimal import Decimal
+from io import BytesIO
 from typing import Any, Dict, List, Optional
 
 import openpyxl
 from open_data_contract_standard.model import (
     AuthoritativeDefinition,
@@ -22,7 +23,7 @@
 from openpyxl.workbook.workbook import Workbook
 from openpyxl.worksheet.worksheet import Worksheet
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.model.exceptions import DataContractException
 
 logger = logging.getLogger(__name__)
@@ -47,9 +48,17 @@ def import_excel_as_odcs(excel_file_path: str) -> OpenDataContractStandard:
     Returns:
         OpenDataContractStandard object
     """
-    if not os.path.exists(excel_file_path):
+    if not (excel_file_path.startswith("sftp://") or os.path.exists(excel_file_path)):
         raise FileNotFoundError(f"Excel file not found: {excel_file_path}")
 
+    if excel_file_path.startswith("sftp://"):
+        fs = setup_sftp_filesystem(excel_file_path)
+        try:
+            with fs.open(excel_file_path) as excel_file:
+                excel_file_path = BytesIO(excel_file.read())
+        except FileNotFoundError:
+            raise FileNotFoundError(f"SFTP connection succeeded, but Excel file not found: {excel_file_path}")
+
     try:
         workbook = openpyxl.load_workbook(excel_file_path, data_only=True)
     except Exception as e:
@@ -984,7 +993,7 @@ def parse_threshold_values(threshold_operator: str, threshold_value: str) -> Dic
     # Single value for other operators
     try:
         # Try to parse as number
         isFraction = "." in threshold_value
         if threshold_value.replace(".", "").replace("-", "").isdigit():
             if isFraction:
                 value = float(threshold_value)
diff --git a/datacontract/imports/importer.py b/datacontract/imports/importer.py
index 24961fb8c..2657a1ac6 100644
--- a/datacontract/imports/importer.py
+++ b/datacontract/imports/importer.py
@@ -1,6 +1,9 @@
+import os
 from abc import ABC, abstractmethod
 from enum import Enum
+from urllib.parse import urlparse
 
+import fsspec
 from open_data_contract_standard.model import OpenDataContractStandard
 
@@ -42,3 +45,23 @@ class ImportFormat(str, Enum):
     @classmethod
     def get_supported_formats(cls):
         return list(map(lambda c: c.value, cls))
+
+
+def setup_sftp_filesystem(url: str):
+    parsed_url = urlparse(url)
+    hostname = parsed_url.hostname if parsed_url.hostname is not None else "127.0.0.1"
+    port = parsed_url.port if parsed_url.port is not None else 22
+    sftp_user = os.getenv("DATACONTRACT_SFTP_USER")
+    sftp_password = os.getenv("DATACONTRACT_SFTP_PASSWORD")
+    if sftp_user is None:
+        raise ValueError("Environment variable DATACONTRACT_SFTP_USER is not set")
+    if sftp_password is None:
+        raise ValueError("Environment variable DATACONTRACT_SFTP_PASSWORD is not set")
+    return fsspec.filesystem("sftp", host=hostname, port=port, username=sftp_user, password=sftp_password)
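+
+# Illustrative usage (assumes a reachable server and credentials exported in
+# the environment; host, port, and path below are placeholders):
+#
+#   fs = setup_sftp_filesystem("sftp://localhost:22/data/orders.csv")
+#   with fs.open("sftp://localhost:22/data/orders.csv", "r") as file:
+#       print(file.read())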
diff --git a/datacontract/imports/json_importer.py b/datacontract/imports/json_importer.py
index 219a910d3..c2593dd38 100644
--- a/datacontract/imports/json_importer.py
+++ b/datacontract/imports/json_importer.py
@@ -3,9 +3,9 @@ import re
 from typing import Any, Dict, List, Optional, Tuple
 
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from open_data_contract_standard.model import OpenDataContractStandard, SchemaProperty
 
-from datacontract.imports.importer import Importer
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
@@ -21,9 +21,17 @@ def import_source(
     return import_json(source)
 
 
-def is_ndjson(file_path: str) -> bool:
+def _open_source(source: str):
+    """Open a local path or an sftp:// URL for reading."""
+    if source.startswith("sftp://"):
+        fs = setup_sftp_filesystem(source)
+        return fs.open(source, "r")
+    return open(source, "r", encoding="utf-8")
+
+
+def is_ndjson(source: str) -> bool:
     """Check if a file contains newline-delimited JSON."""
-    with open(file_path, "r", encoding="utf-8") as file:
+    with _open_source(source) as file:
         for _ in range(5):
             line = file.readline().strip()
             if not line:
@@ -39,11 +47,11 @@ def import_json(source: str, include_examples: bool = False) -> OpenDataContractStandard:
     """Import a JSON file and create an ODCS data contract."""
     base_model_name = os.path.splitext(os.path.basename(source))[0]
 
     # Check if file is newline-delimited JSON
     if is_ndjson(source):
         json_data = []
-        with open(source, "r", encoding="utf-8") as file:
+        with _open_source(source) as file:
             for line in file:
                 line = line.strip()
                 if line:
@@ -52,7 +60,7 @@ def import_json(source: str, include_examples: bool = False) -> OpenDataContractStandard:
                 except json.JSONDecodeError:
                     continue
     else:
-        with open(source, "r", encoding="utf-8") as file:
+        with _open_source(source) as file:
             json_data = json.load(file)
 
     odcs = create_odcs()
diff --git a/datacontract/imports/jsonschema_importer.py b/datacontract/imports/jsonschema_importer.py
index 032eefec2..051dc0a23 100644
--- a/datacontract/imports/jsonschema_importer.py
+++ b/datacontract/imports/jsonschema_importer.py
@@ -4,12 +4,12 @@ import fastjsonschema
 from open_data_contract_standard.model import DataQuality, OpenDataContractStandard, SchemaProperty
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
     create_property,
     create_schema_object,
 )
 from datacontract.model.exceptions import DataContractException
@@ -50,8 +50,13 @@ def import_jsonschema(source: str) -> OpenDataContractStandard:
 def load_and_validate_json_schema(source: str) -> dict:
     """Load and validate a JSON Schema file."""
     try:
-        with open(source, "r") as file:
-            json_schema = json.loads(file.read())
+        if source.startswith("sftp://"):
+            fs = setup_sftp_filesystem(source)
+            with fs.open(source, "r") as file:
+                json_schema = json.loads(file.read())
+        else:
+            with open(source, "r") as file:
+                json_schema = json.loads(file.read())
 
         validator = fastjsonschema.compile({})
         validator(json_schema)
diff --git a/datacontract/imports/parquet_importer.py b/datacontract/imports/parquet_importer.py
index 3265acda8..96cde1cff 100644
--- a/datacontract/imports/parquet_importer.py
+++ b/datacontract/imports/parquet_importer.py
@@ -4,6 +4,6 @@
 from open_data_contract_standard.model import OpenDataContractStandard
 from pyarrow import parquet
 
-from datacontract.imports.importer import Importer
+from datacontract.imports.importer import Importer, setup_sftp_filesystem
 from datacontract.imports.odcs_helper import (
     create_odcs,
@@ -24,9 +24,16 @@ def import_parquet(source: str) -> OpenDataContractStandard:
     """Import a Parquet file and create an ODCS data contract."""
     # use filename as schema name, remove .parquet suffix, avoid breaking the yaml output by replacing dots
     schema_name = os.path.basename(source).removesuffix(".parquet").replace(".", "_")
 
+    if source.startswith("sftp://"):
+        fs = setup_sftp_filesystem(source)
+        # pyarrow needs the remote path without the scheme and host
+        path = source.split("//", 1)[1]  # remove sftp://
+        path = "/" + path.split("/", 1)[1]  # remove host[:port]
+        arrow_schema = parquet.read_schema(path, filesystem=fs)
+    else:
+        arrow_schema = parquet.read_schema(source)
+
     properties = []
-    arrow_schema = parquet.read_schema(source)
 
     for field_name in arrow_schema.names:
         parquet_field = arrow_schema.field(field_name)
diff --git a/pyproject.toml b/pyproject.toml
index d87a329e6..3d2fe226b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -129,8 +129,12 @@ protobuf = [
     "grpcio-tools>=1.53",
 ]
 
+sftp = [
+    "paramiko>=4.0.0",
+]
+
 all = [
-    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf,oracle]"
+    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf,oracle,sftp]"
 ]
 
 # for development, we pin all libraries to an exact version
diff --git a/tests/fixtures/sftp-csv/data/sample_data.csv b/tests/fixtures/sftp-csv/data/sample_data.csv
new file mode 100644
index 000000000..4ae23e482
--- /dev/null
+++ b/tests/fixtures/sftp-csv/data/sample_data.csv
@@ -0,0 +1,11 @@
+field_one,field_two,field_three
+CX-263-DU,50,2023-06-16 13:12:56
+IK-894-MN,47,2023-10-08 22:40:57
+ER-399-JY,22,2023-05-16 01:08:22
+MT-939-FH,63,2023-03-15 05:15:21
+LV-849-MI,33,2023-09-08 20:08:43
+VS-079-OH,85,2023-04-15 00:50:32
+DN-297-XY,79,2023-11-08 12:55:42
+ZE-172-FP,14,2023-12-03 18:38:38
+ID-840-EG,89,2023-10-02 17:17:58
+FK-230-KZ,64,2023-11-27 15:21:48
diff --git a/tests/fixtures/sftp-csv/datacontract.yaml b/tests/fixtures/sftp-csv/datacontract.yaml
new file mode 100644
index 000000000..34af9ebab
--- /dev/null
+++ b/tests/fixtures/sftp-csv/datacontract.yaml
@@ -0,0 +1,30 @@
+dataContractSpecification: 1.2.0
+id: sftp-csv
+info:
+  title: sftp-csv
+  version: 0.0.1
+  owner: my-domain-team
+servers:
+  my-server:
+    type: sftp
+    location: sftp://localhost:22/sftp/data/sample_data.csv
+    format: csv
+    dataProductId: my-dataproduct
+models:
+  my_table:
+    type: table
+    fields:
+      field_one:
+        type: varchar
+        required: true
+        unique: true
+        pattern: "[A-Za-z]{2}-\\d{3}-[A-Za-z]{2}$"
+      field_two:
+        type: bigint
+        minimum: 10
+      field_three:
+        type: timestamp
+    quality:
+      - type: sql
+        query: "SELECT count(*) FROM my_table"
+        mustBeGreaterThan: 0
diff --git a/tests/test_import_sftp.py b/tests/test_import_sftp.py
new file mode 100644
index 000000000..21f1b9335
--- /dev/null
+++ b/tests/test_import_sftp.py
@@ -0,0 +1,212 @@
+import os
+
+# must be set before testcontainers is imported
+os.environ["TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE"] = "/var/run/docker.sock"
+
+from time import sleep
+
+import paramiko
+import pytest
+import yaml
+from testcontainers.sftp import SFTPContainer, SFTPUser
+from typer.testing import CliRunner
+
+from datacontract.cli import app
+from datacontract.data_contract import DataContract
+from datacontract.imports.excel_importer import import_excel_as_odcs
+
+sftp_dir = "/sftp/data"
+
+csv_file_name = "sample_data"
+csv_file_path = f"fixtures/csv/data/{csv_file_name}.csv"
+csv_sftp_path = f"{sftp_dir}/{csv_file_name}.csv"
+
+parquet_file_name = "combined_no_time"
+parquet_file_path = f"fixtures/parquet/data/{parquet_file_name}.parquet"
+parquet_sftp_path = f"{sftp_dir}/{parquet_file_name}.parquet"
+
+avro_file_name = "orders"
+avro_file_path = f"fixtures/avro/data/{avro_file_name}.avsc"
+avro_sftp_path = f"{sftp_dir}/{avro_file_name}.avsc"
+
+dbml_file_name = "dbml"
+dbml_file_path = f"fixtures/dbml/import/{dbml_file_name}.txt"
+dbml_sftp_path = f"{sftp_dir}/{dbml_file_name}.txt"
+
+dbt_file_name = "manifest_jaffle_duckdb"
+dbt_file_path = f"fixtures/dbt/import/{dbt_file_name}.json"
+dbt_sftp_path = f"{sftp_dir}/{dbt_file_name}.json"
f"{sftp_dir}/{dbt_file_name}.json" + +iceberg_file_name = "simple_schema" +iceberg_file_path = f"fixtures/iceberg/{iceberg_file_name}.json" +iceberg_sftp_path = f"{sftp_dir}/{iceberg_file_name}.json" + +json_file_name = "orders" +json_file_path = f"fixtures/import/{json_file_name}.json" +json_sftp_path = f"{sftp_dir}/{json_file_name}.json" + +odcs_file_name = "full-example" +odcs_file_path = f"fixtures/odcs_v3/{odcs_file_name}.odcs.yaml" +odcs_sftp_path = f"{sftp_dir}/{odcs_file_name}.odcs.yaml" + +excel_file_path = "./fixtures/excel/shipments-odcs.xlsx" +excel_sftp_path = f"{sftp_dir}/shipments-odcs.xlsx" +username = "demo" # for emberstack +password = "demo" # for emberstack +user = SFTPUser(name = username,password=password) + + +@pytest.fixture +def sftp_container(): + """ + Initialize and provide an SFTP container for all tests in this module. + Sets up the container, uploads the test file, and provides connection details. + """ + # that image is both compatible with Mac and Linux which is not the case with the default image + + with SFTPContainer(image="emberstack/sftp:latest", users=[user]) as container: + host_ip = container.get_container_host_ip() + host_port = container.get_exposed_sftp_port() + + # Set environment variables for SFTP authentication + os.environ["DATACONTRACT_SFTP_USER"] = username + os.environ["DATACONTRACT_SFTP_PASSWORD"] = password + + # Wait for the container to be ready + sleep(3) + + # Upload test files to SFTP server + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(host_ip, host_port, username, password) + sftp = ssh.open_sftp() + try: + sftp.mkdir(sftp_dir) + sftp.put(avro_file_path, avro_sftp_path) + sftp.put(dbml_file_path, dbml_sftp_path) + sftp.put(dbt_file_path, dbt_sftp_path) + sftp.put(csv_file_path, csv_sftp_path) + sftp.put(iceberg_file_path, iceberg_sftp_path) + sftp.put(json_file_path, json_sftp_path) + sftp.put(odcs_file_path, odcs_sftp_path) + sftp.put(parquet_file_path, parquet_sftp_path) + sftp.put(excel_file_path, excel_sftp_path) + finally: + sftp.close() + ssh.close() + + + yield { + "host_ip": host_ip, + "host_port": host_port, + "container": container + } + + +def test_cli(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}" + + runner = CliRunner() + result = runner.invoke( + app, + [ + "import", + "--format", + "csv", + "--source", + source, + ], + ) + assert result.exit_code == 0 + +def test_import_sftp_csv(sftp_container): + host_ip = sftp_container["host_ip"] + host_port = sftp_container["host_port"] + source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}" + + result = DataContract.import_from_source("csv", source) + model = result.models[csv_file_name] + assert model is not None + assert len(model.fields["field_one"].examples) == 5 + assert len(model.fields["field_two"].examples) > 0 + assert len(model.fields["field_three"].examples) > 0 + + for k in model.fields.keys(): + model.fields[k].examples = None + + expected = f""" + dataContractSpecification: 1.2.1 + id: my-data-contract-id + info: + title: My Data Contract + version: 0.0.1 + servers: + production: + type: local + format: csv + path: sftp://{host_ip}:{host_port}{csv_sftp_path} + delimiter: ',' + models: + {csv_file_name}: + description: Generated model of sftp://{host_ip}:{host_port}{csv_sftp_path} + type: table + fields: + field_one: + type: string + required: true + unique: true + field_two: + type: integer + 
+            required: true
+            unique: true
+            minimum: 14.0
+            maximum: 89.0
+          field_three:
+            type: timestamp
+            required: true
+            unique: true
+    """
+    assert yaml.safe_load(result.to_yaml()) == yaml.safe_load(expected)
+    # Disable linters so we don't get "missing description" warnings
+    assert DataContract(data_contract_str=expected).lint().has_passed()
+
+
+def test_import_sftp_parquet(sftp_container):
+    host_ip = sftp_container["host_ip"]
+    host_port = sftp_container["host_port"]
+    source = f"sftp://{host_ip}:{host_port}{parquet_sftp_path}"
+
+    result = DataContract.import_from_source("parquet", source)
+    model = result.models[parquet_file_name]
+    assert model is not None
+    assert model.fields["string_field"].type == "string"
+    assert model.fields["blob_field"].type == "bytes"
+    assert model.fields["boolean_field"].type == "boolean"
+    assert DataContract(data_contract=result).lint().has_passed()
+
+
+def test_import_sftp_jsonschema(sftp_container):
+    host_ip = sftp_container["host_ip"]
+    host_port = sftp_container["host_port"]
+    source = f"sftp://{host_ip}:{host_port}{json_sftp_path}"
+    result = DataContract.import_from_source("jsonschema", source)
+    assert len(result.models.keys()) > 0
+    assert DataContract(data_contract=result).lint().has_passed()
+
+
+def test_import_excel_odcs(sftp_container):
+    host_ip = sftp_container["host_ip"]
+    host_port = sftp_container["host_port"]
+    source = f"sftp://{host_ip}:{host_port}{excel_sftp_path}"
+    result = import_excel_as_odcs(source)
+    expected_datacontract = read_file("fixtures/excel/shipments-odcs.yaml")
+    assert yaml.safe_load(result.to_yaml()) == yaml.safe_load(expected_datacontract)
+
+
+def read_file(file):
+    if not os.path.exists(file):
+        raise FileNotFoundError(f"The file '{file}' does not exist.")
+    with open(file, "r") as f:
+        return f.read()
diff --git a/tests/test_lint.py b/tests/test_lint.py
index 857996479..670880f61 100644
--- a/tests/test_lint.py
+++ b/tests/test_lint.py
@@ -1,3 +1,8 @@
+import os
+from time import sleep
+import paramiko
+from datacontract_specification.model import Server
+from testcontainers.sftp import SFTPContainer, SFTPUser
 from open_data_contract_standard.model import OpenDataContractStandard
 from typer.testing import CliRunner
 
@@ -82,3 +87,36 @@ def test_lint_with_references():
     run = data_contract.lint()
 
     assert run.result == "passed"
+
+
+def test_lint_sftp():
+    sftp_dir = "/sftp/data"
+    sftp_path = f"{sftp_dir}/valid_datacontract.yaml"
+    datacontract = "fixtures/lint/valid_datacontract.yaml"
+    username = "demo"  # for emberstack
+    password = "demo"  # for emberstack
+    user = SFTPUser(name=username, password=password)
+    os.environ["DATACONTRACT_SFTP_USER"] = username
+    os.environ["DATACONTRACT_SFTP_PASSWORD"] = password
+
+    # this image works on both macOS and Linux, which is not the case for the default image
+    with SFTPContainer(image="emberstack/sftp:latest", users=[user]) as sftp_container:
+        host_ip = sftp_container.get_container_host_ip()
+        host_port = sftp_container.get_exposed_sftp_port()
+        sleep(3)  # wait for the container to be fully ready
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.connect(host_ip, host_port, username, password)
+        sftp = ssh.open_sftp()
+        dc = DataContract(data_contract_file=datacontract).get_data_contract_specification()
+        try:
+            sftp.mkdir(sftp_dir)
+            sftp.put(datacontract, sftp_path)
+            dc.servers["my-server"] = Server(location=f"sftp://{host_ip}:{host_port}{sftp_path}")
=f"sftp://{host_ip}:{host_port}{sftp_path}") + run = DataContract(data_contract=dc).lint() + assert run.result == "passed" + finally: + sftp.close() + ssh.close() diff --git a/tests/test_test_sftp_csv.py b/tests/test_test_sftp_csv.py new file mode 100644 index 000000000..61e30642f --- /dev/null +++ b/tests/test_test_sftp_csv.py @@ -0,0 +1,41 @@ +import os +from time import sleep + +import paramiko +from testcontainers.sftp import SFTPContainer, SFTPUser + +from datacontract.data_contract import DataContract + +sftp_dir = "/sftp/data" +sftp_path = f"{sftp_dir}/sample_data.csv" + +datacontract = "fixtures/sftp-csv/datacontract.yaml" +file_name = "fixtures/sftp-csv/data/sample_data.csv" + + +username = "demo" # for emberstack +password = "demo" # for emberstack +user = SFTPUser(name = username,password=password) + +def test_test_sftp_csv(): + os.environ["DATACONTRACT_SFTP_USER"] = username + os.environ["DATACONTRACT_SFTP_PASSWORD"] = password + # that image is both compatible with Mac and Linux which is not the case with the default image + with SFTPContainer(image="emberstack/sftp:latest",users=[user]) as sftp_container: + host_ip = sftp_container.get_container_host_ip() + host_port = sftp_container.get_exposed_sftp_port() + sleep(3) #waiting for the container to be really ready + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(host_ip, host_port, username, password) + sftp = ssh.open_sftp() + dc = DataContract(data_contract_file=datacontract).get_data_contract_specification() + try: + sftp.mkdir(sftp_dir) + sftp.put(file_name, sftp_path) + dc.servers["my-server"].location = f"sftp://{host_ip}:{host_port}{sftp_path}" + run = DataContract(data_contract=dc).test() + assert run.result == "passed" + finally: + sftp.close() + ssh.close()