Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ Supported server types:
- [impala](#impala)
- [api](#api)
- [local](#local)
- [sftp](#sftp)

Supported formats:

Expand Down Expand Up @@ -1027,6 +1028,37 @@ models:
type: string
```

#### sftp

Data Contract CLI can test files on SFTP servers in parquet, json, csv, or delta format.

##### Example

datacontract.yaml
```yaml
servers:
  my_sftp_server:
type: sftp
path: sftp://data/*.parquet
format: parquet
models:
my_table_1: # corresponds to a table
type: table
fields:
my_column_1: # corresponds to a column
type: varchar
my_column_2: # corresponds to a column
type: string
```

##### Environment Variables

| Environment Variable | Example | Description |
|------------------------------|--------------------|-------------|
| `DATACONTRACT_SFTP_USER`     | `admin`            | Username    |
| `DATACONTRACT_SFTP_PASSWORD` | `mysecretpassword` | Password |



### export
```
Expand Down
Empty file modified datacontract/data_contract.py
100644 → 100755
Empty file.
3 changes: 1 addition & 2 deletions datacontract/engines/soda/check_soda_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def check_soda_execute(
run.log_info("Running engine soda-core")
scan = Scan()

if server.type in ["s3", "gcs", "azure", "local"]:
if server.type in ["s3", "gcs", "azure", "local", "sftp"]:
if server.format in ["json", "parquet", "csv", "delta"]:
run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
Expand Down Expand Up @@ -130,7 +130,6 @@ def check_soda_execute(
soda_configuration_str = to_athena_soda_configuration(server)
scan.add_configuration_yaml_str(soda_configuration_str)
scan.set_data_source_name(server.type)

else:
run.checks.append(
Check(
Expand Down
29 changes: 17 additions & 12 deletions datacontract/engines/soda/connections/duckdb_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from open_data_contract_standard.model import OpenDataContractStandard, SchemaObject, SchemaProperty, Server

from datacontract.export.duckdb_type_converter import convert_to_duckdb_csv_type, convert_to_duckdb_json_type
from datacontract.imports.importer import setup_sftp_filesystem
from datacontract.export.sql_type_converter import convert_to_duckdb
from datacontract.model.run import Run

Expand All @@ -22,18 +23,22 @@ def get_duckdb_connection(
con = duckdb_connection

path: str = ""
if server.type == "local":
path = server.path
if server.type == "s3":
path = server.location
setup_s3_connection(con, server)
if server.type == "gcs":
path = server.location
setup_gcs_connection(con, server)
if server.type == "azure":
path = server.location
setup_azure_connection(con, server)

match server.type:
case "local":
path = server.path
case "s3":
path = server.location
setup_s3_connection(con, server)
case "gcs":
path = server.location
setup_gcs_connection(con, server)
case "azure":
path = server.location
setup_azure_connection(con, server)
case "sftp":
fs = setup_sftp_filesystem(server.location)
duckdb.register_filesystem(filesystem=fs, connection=con)
path = server.location
if data_contract.schema_:
for schema_obj in data_contract.schema_:
model_name = schema_obj.name
Expand Down
5 changes: 4 additions & 1 deletion datacontract/imports/csv_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import duckdb
from open_data_contract_standard.model import OpenDataContractStandard

from datacontract.imports.importer import Importer
from datacontract.imports.importer import Importer, setup_sftp_filesystem
from datacontract.imports.odcs_helper import (
create_odcs,
create_property,
Expand All @@ -29,6 +29,9 @@ def import_csv(

# use duckdb to auto detect format, columns, etc.
con = duckdb.connect(database=":memory:")
if source.startswith("sftp"):
fs = setup_sftp_filesystem(source)
duckdb.register_filesystem(filesystem=fs, connection=con)
con.sql(
f"""CREATE VIEW "{table_name}" AS SELECT * FROM read_csv_auto('{source}', hive_partitioning=1, auto_type_candidates = ['BOOLEAN', 'INTEGER', 'BIGINT', 'DOUBLE', 'VARCHAR']);"""
)
Expand Down
1 change: 1 addition & 0 deletions datacontract/imports/dbml_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
create_schema_object,
)
from datacontract.imports.sql_importer import map_type_from_sql
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.exceptions import DataContractException


Expand Down
17 changes: 13 additions & 4 deletions datacontract/imports/excel_importer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import os
from decimal import Decimal
from io import BytesIO
from typing import Any, Dict, List, Optional

from tempfile import TemporaryDirectory
import openpyxl
from open_data_contract_standard.model import (
AuthoritativeDefinition,
Expand All @@ -22,7 +23,7 @@
from openpyxl.workbook.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet

from datacontract.imports.importer import Importer
from datacontract.imports.importer import Importer, setup_sftp_filesystem
from datacontract.model.exceptions import DataContractException

logger = logging.getLogger(__name__)
Expand All @@ -47,9 +48,17 @@ def import_excel_as_odcs(excel_file_path: str) -> OpenDataContractStandard:
Returns:
OpenDataContractStandard object
"""
if not os.path.exists(excel_file_path):
if not (excel_file_path.startswith("sftp://") or os.path.exists(excel_file_path)):
raise FileNotFoundError(f"Excel file not found: {excel_file_path}")

if excel_file_path.startswith("sftp://"):
fs = setup_sftp_filesystem(excel_file_path)
try:
with fs.open(excel_file_path) as excel_file:
excel_file_path = BytesIO(excel_file.read())
except FileNotFoundError:
raise FileNotFoundError(f"SftpConnection is a success but excel file not found: {excel_file_path}")

try:
workbook = openpyxl.load_workbook(excel_file_path, data_only=True)
except Exception as e:
Expand Down Expand Up @@ -984,7 +993,7 @@ def parse_threshold_values(threshold_operator: str, threshold_value: str) -> Dic
# Single value for other operators
try:
# Try to parse as number
isFraction = "." in threshold_value
isFraction = "." in threshold_value
if threshold_value.replace(".", "").replace("-", "").isdigit():
if isFraction:
value = float(threshold_value)
Expand Down
16 changes: 16 additions & 0 deletions datacontract/imports/importer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
from abc import ABC, abstractmethod
from enum import Enum
from urllib.parse import urlparse

import fsspec
from open_data_contract_standard.model import OpenDataContractStandard


Expand Down Expand Up @@ -42,3 +45,16 @@ class ImportFormat(str, Enum):
@classmethod
def get_supported_formats(cls):
return list(map(lambda c: c.value, cls))

def setup_sftp_filesystem(url: str):
    """Create an fsspec SFTP filesystem for the given ``sftp://`` URL.

    The host and port are parsed from *url* (defaulting to ``127.0.0.1``
    and ``22`` when absent); credentials are read from the
    ``DATACONTRACT_SFTP_USER`` and ``DATACONTRACT_SFTP_PASSWORD``
    environment variables.

    Args:
        url: An ``sftp://`` URL, e.g. ``sftp://host:2222/data/file.csv``.

    Returns:
        An fsspec filesystem instance connected to the SFTP server.

    Raises:
        ValueError: If either credential environment variable is not set.
    """
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname if parsed_url.hostname is not None else "127.0.0.1"
    port = parsed_url.port if parsed_url.port is not None else 22
    sftp_user = os.getenv("DATACONTRACT_SFTP_USER")
    sftp_password = os.getenv("DATACONTRACT_SFTP_PASSWORD")
    # Check each credential separately so the error names the variable that
    # is actually missing. (The original combined check always blamed the
    # username and left the password-only check unreachable.)
    if sftp_user is None:
        raise ValueError("Error: Environment variable DATACONTRACT_SFTP_USER is not set")
    if sftp_password is None:
        raise ValueError("Error: Environment variable DATACONTRACT_SFTP_PASSWORD is not set")
    return fsspec.filesystem("sftp", host=hostname, port=port, username=sftp_user, password=sftp_password)

21 changes: 14 additions & 7 deletions datacontract/imports/json_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import re
from typing import Any, Dict, List, Optional, Tuple

from datacontract.imports.importer import Importer, setup_sftp_filesystem
from open_data_contract_standard.model import OpenDataContractStandard, SchemaProperty

from datacontract.imports.importer import Importer
from datacontract.imports.odcs_helper import (
create_odcs,
create_property,
Expand All @@ -21,9 +21,9 @@ def import_source(
return import_json(source)


def is_ndjson(file_path: str) -> bool:
def is_ndjson(file_ctx) -> bool:
"""Check if a file contains newline-delimited JSON."""
with open(file_path, "r", encoding="utf-8") as file:
with file_ctx as file:
for _ in range(5):
line = file.readline().strip()
if not line:
Expand All @@ -39,11 +39,18 @@ def is_ndjson(file_path: str) -> bool:
def import_json(source: str, include_examples: bool = False) -> OpenDataContractStandard:
"""Import a JSON file and create an ODCS data contract."""
base_model_name = os.path.splitext(os.path.basename(source))[0]

if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
file_ctx = fs.open(source, "r")
else:
file_ctx = open(source, "r", encoding="utf-8")

# check if file is newline-delimited JSON
if is_ndjson(file_ctx):

# Check if file is newline-delimited JSON
if is_ndjson(source):
json_data = []
with open(source, "r", encoding="utf-8") as file:
with file_ctx as file:
for line in file:
line = line.strip()
if line:
Expand All @@ -52,7 +59,7 @@ def import_json(source: str, include_examples: bool = False) -> OpenDataContract
except json.JSONDecodeError:
continue
else:
with open(source, "r", encoding="utf-8") as file:
with file_ctx as file:
json_data = json.load(file)

odcs = create_odcs()
Expand Down
13 changes: 10 additions & 3 deletions datacontract/imports/jsonschema_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import fastjsonschema
from open_data_contract_standard.model import DataQuality, OpenDataContractStandard, SchemaProperty

from datacontract.imports.importer import Importer
from datacontract.imports.importer import Importer, setup_sftp_filesystem

from datacontract.imports.odcs_helper import (
create_odcs,
create_property,
create_schema_object,
)

from datacontract.model.exceptions import DataContractException


Expand Down Expand Up @@ -50,8 +52,13 @@ def import_jsonschema(source: str) -> OpenDataContractStandard:
def load_and_validate_json_schema(source: str) -> dict:
"""Load and validate a JSON Schema file."""
try:
with open(source, "r") as file:
json_schema = json.loads(file.read())
if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
with fs.open(source, "r") as file:
json_schema = json.loads(file.read())
else:
with open(source, "r") as file:
json_schema = json.loads(file.read())

validator = fastjsonschema.compile({})
validator(json_schema)
Expand Down
4 changes: 3 additions & 1 deletion datacontract/imports/odcs_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from datacontract.imports.importer import Importer
from datacontract.lint.resources import read_resource

from datacontract.model.exceptions import DataContractException


Expand All @@ -16,7 +17,8 @@ def import_source(
def import_odcs(source: str) -> OpenDataContractStandard:
"""Import an ODCS file directly - since ODCS is now the internal format, this is simpler."""
try:
odcs_yaml = yaml.safe_load(read_resource(source))
odcs_contract = yaml.safe_load(read_resource(source))

except Exception as e:
raise DataContractException(
type="schema",
Expand Down
11 changes: 9 additions & 2 deletions datacontract/imports/parquet_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from open_data_contract_standard.model import OpenDataContractStandard
from pyarrow import parquet


from datacontract.imports.importer import Importer
from datacontract.imports.odcs_helper import (
create_odcs,
Expand All @@ -24,9 +25,15 @@ def import_parquet(source: str) -> OpenDataContractStandard:
"""Import a Parquet file and create an ODCS data contract."""
# use filename as schema name, remove .parquet suffix, avoid breaking the yaml output by replacing dots
schema_name = os.path.basename(source).removesuffix(".parquet").replace(".", "_")

if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
# Extract path without the sftp:// prefix and host
path = source.split("//", 1)[1] # Remove sftp://
path = "/" + path.split("/", 1)[1] # Remove host part
arrow_schema = parquet.read_schema(path, filesystem=fs)
else:
arrow_schema = parquet.read_schema(source)
properties = []

arrow_schema = parquet.read_schema(source)
for field_name in arrow_schema.names:
parquet_field = arrow_schema.field(field_name)
Expand Down
1 change: 1 addition & 0 deletions datacontract/imports/unity_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def import_unity_from_api(unity_table_full_name_list: List[str] = None) -> OpenD
reason="",
engine="datacontract",
)

if not profile and not host and not token:
reason = "Either DATACONTRACT_DATABRICKS_PROFILE or both DATACONTRACT_DATABRICKS_SERVER_HOSTNAME and DATACONTRACT_DATABRICKS_TOKEN environment variables must be set"
exception.reason = reason
Expand Down
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ classifiers = [
"Operating System :: OS Independent",
]
requires-python = ">=3.10,<3.13"

dependencies = [
"typer>=0.15.1,<0.20",
"pydantic>=2.8.2,<2.13.0",
Expand Down Expand Up @@ -129,8 +130,12 @@ protobuf = [
"grpcio-tools>=1.53",
]

sftp = [
"paramiko>=4.0.0",
]

all = [
"datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf,oracle]"
"datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf,oracle,sftp]"
]

# for development, we pin all libraries to an exact version
Expand Down
11 changes: 11 additions & 0 deletions tests/fixtures/sftp-csv/data/sample_data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
field_one,field_two,field_three
CX-263-DU,50,2023-06-16 13:12:56
IK-894-MN,47,2023-10-08 22:40:57
ER-399-JY,22,2023-05-16 01:08:22
MT-939-FH,63,2023-03-15 05:15:21
LV-849-MI,33,2023-09-08 20:08:43
VS-079-OH,85,2023-04-15 00:50:32
DN-297-XY,79,2023-11-08 12:55:42
ZE-172-FP,14,2023-12-03 18:38:38
ID-840-EG,89,2023-10-02 17:17:58
FK-230-KZ,64,2023-11-27 15:21:48
Loading
Loading