Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
56719bd
feat: enhance databricks connection with Unity Catalog using databric…
bmoreau8 Sep 19, 2025
527f70f
Merge branch 'main' into main
jochenchrist Oct 17, 2025
9186253
feat: add SFTP support for data contracts test
bmoreau8 Nov 7, 2025
68d873e
Merge remote-tracking branch 'origin/main'
bmoreau8 Nov 7, 2025
1cd452a
Merge branch 'feature/sftp' into main
bmoreau8 Nov 7, 2025
c18fddd
Merge pull request #2 from bmoreau8/main
bmoreau8 Nov 7, 2025
2c12fdb
feat: add SFTP support for data contracts import and lint
bmoreau8 Nov 13, 2025
1ed59d3
style: organize imports
bmoreau8 Nov 13, 2025
09607ea
build: add missing dependency
bmoreau8 Nov 13, 2025
cd7e028
Merge branch 'datacontract:main' into main
bmoreau8 Nov 13, 2025
6cb40f7
fix: no need to define a scope
bmoreau8 Nov 13, 2025
33e48e9
Merge pull request #3 from bmoreau8/feature/sftp
bmoreau8 Nov 13, 2025
5acc670
typo: taking review in consideration
bmoreau8 Nov 13, 2025
74fb8ab
build: add paramiko to sftp optional dependencies
bmoreau8 Nov 13, 2025
1197858
Merge branch 'main' into main
bmoreau8 Nov 20, 2025
5fa4a07
Merge branch 'main' into main
jochenchrist Nov 20, 2025
19c9735
Merge branch 'main' into main
bmoreau8 Dec 2, 2025
4542a18
Merge branch 'main' into updated
bmoreau8 Jan 6, 2026
5853d79
build: assuming only data are on sftp servers Add Excel files on sftp
bmoreau8 Jan 8, 2026
c86e5e2
Merge branch 'updated' into origin/updated
bmoreau8 Jan 8, 2026
1866434
Merge pull request #6 from bmoreau8/origin/updated
bmoreau8 Jan 8, 2026
65d509c
ci: ruffed
bmoreau8 Jan 8, 2026
78cc4ae
fix: test fails due to ODCS switch
bmoreau8 Jan 8, 2026
c5fb5df
style: ruffed
bmoreau8 Jan 8, 2026
d0bcbd1
style: remove sonar warning
bmoreau8 Jan 8, 2026
846cdc6
doc: add extra description for sftp servers
bmoreau8 Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Data Contract CLI
# Data Contract CLI

<p>
<a href="https://github.com/datacontract/datacontract-cli/actions/workflows/ci.yaml?query=branch%3Amain">
Expand Down Expand Up @@ -389,6 +389,7 @@ Supported server types:
- [trino](#trino)
- [api](#api)
- [local](#local)
- [sftp](#sftp)

Supported formats:

Expand Down Expand Up @@ -967,6 +968,37 @@ models:
type: string
```

#### sftp

Data Contract CLI can test files on SFTP servers in parquet, json, csv, or delta format.

##### Example

datacontract.yaml
```yaml
servers:
local:
type: sftp
path: sftp://data/*.parquet
format: parquet
models:
my_table_1: # corresponds to a table
type: table
fields:
my_column_1: # corresponds to a column
type: varchar
my_column_2: # corresponds to a column
type: string
```

##### Environment Variables

| Environment Variable | Example | Description |
|------------------------------|--------------------|-------------|
| `DATACONTRACT_SFTP_USER`     | `admin`            | Username    |
| `DATACONTRACT_SFTP_PASSWORD` | `mysecretpassword` | Password |



### export
```
Expand Down
Empty file modified datacontract/data_contract.py
100644 → 100755
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the file mode changed?

Empty file.
3 changes: 1 addition & 2 deletions datacontract/engines/soda/check_soda_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def check_soda_execute(
run.log_info("Running engine soda-core")
scan = Scan()

if server.type in ["s3", "gcs", "azure", "local"]:
if server.type in ["s3", "gcs", "azure", "local","sftp"]:
if server.format in ["json", "parquet", "csv", "delta"]:
run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
Expand Down Expand Up @@ -117,7 +117,6 @@ def check_soda_execute(
soda_configuration_str = to_athena_soda_configuration(server)
scan.add_configuration_yaml_str(soda_configuration_str)
scan.set_data_source_name(server.type)

else:
run.checks.append(
Check(
Expand Down
28 changes: 17 additions & 11 deletions datacontract/engines/soda/connections/duckdb_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import duckdb

from datacontract.export.duckdb_type_converter import convert_to_duckdb_csv_type, convert_to_duckdb_json_type
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model, Server
from datacontract.model.run import Run

Expand All @@ -20,17 +21,22 @@ def get_duckdb_connection(
con = duckdb_connection

path: str = ""
if server.type == "local":
path = server.path
if server.type == "s3":
path = server.location
setup_s3_connection(con, server)
if server.type == "gcs":
path = server.location
setup_gcs_connection(con, server)
if server.type == "azure":
path = server.location
setup_azure_connection(con, server)
match server.type :
case "local":
path = server.path
case "s3":
path = server.location
setup_s3_connection(con, server)
case"gcs":
path = server.location
setup_gcs_connection(con, server)
case "azure":
path = server.location
setup_azure_connection(con, server)
case "sftp":
fs = setup_sftp_filesystem(server.location)
duckdb.register_filesystem(filesystem=fs, connection=con)
path = server.location
for model_name, model in data_contract.models.items():
model_path = path
if "{model}" in model_path:
Expand Down
10 changes: 8 additions & 2 deletions datacontract/imports/avro_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import avro.schema

from datacontract.imports.importer import Importer
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model
from datacontract.model.exceptions import DataContractException

Expand Down Expand Up @@ -45,8 +46,13 @@ def import_avro(data_contract_specification: DataContractSpecification, source:
data_contract_specification.models = {}

try:
with open(source, "r") as file:
avro_schema = avro.schema.parse(file.read())
if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
with fs.open(source, "r") as file:
avro_schema = avro.schema.parse(file.read())
else:
with open(source, "r") as file:
avro_schema = avro.schema.parse(file.read())
except Exception as e:
raise DataContractException(
type="schema",
Expand Down
4 changes: 4 additions & 0 deletions datacontract/imports/csv_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import duckdb

from datacontract.imports.importer import Importer
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Model, Server


Expand All @@ -22,6 +23,9 @@ def import_csv(

# use duckdb to auto detect format, columns, etc.
con = duckdb.connect(database=":memory:")
if source.startswith('sftp'):
fs = setup_sftp_filesystem(source)
duckdb.register_filesystem(filesystem=fs, connection=con)
con.sql(
f"""CREATE VIEW "{table_name}" AS SELECT * FROM read_csv_auto('{source}', hive_partitioning=1, auto_type_candidates = ['BOOLEAN', 'INTEGER', 'BIGINT', 'DOUBLE', 'VARCHAR']);"""
)
Expand Down
10 changes: 8 additions & 2 deletions datacontract/imports/dbml_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from datacontract.imports.importer import Importer
from datacontract.imports.sql_importer import map_type_from_sql
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model
from datacontract.model.exceptions import DataContractException

Expand All @@ -29,8 +30,13 @@ def import_dbml_from_source(
import_tables: List[str],
) -> DataContractSpecification:
try:
with open(source, "r") as file:
dbml_schema = PyDBML(file)
if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
with fs.open(source, "r") as file:
dbml_schema = PyDBML(file)
else:
with open(source, "r") as file:
dbml_schema = PyDBML(file)
except ParseException as e:
raise DataContractException(
type="schema",
Expand Down
10 changes: 8 additions & 2 deletions datacontract/imports/dbt_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from datacontract.imports.bigquery_importer import map_type_from_bigquery
from datacontract.imports.importer import Importer
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model


Expand Down Expand Up @@ -44,8 +45,13 @@ def import_source(

def read_dbt_manifest(manifest_path: str) -> Manifest:
    """Read a dbt manifest from a local file or an sftp:// location.

    Args:
        manifest_path: Path to a dbt ``manifest.json``; an ``sftp://`` URL is
            read through the shared SFTP filesystem helper.

    Returns:
        The parsed dbt ``Manifest`` with parent/child maps built.
    """
    # Open the appropriate file object, then share a single parse path so the
    # JSON handling cannot drift between the sftp and local branches.
    if manifest_path.startswith("sftp://"):
        fs = setup_sftp_filesystem(manifest_path)
        file_obj = fs.open(manifest_path, "r")
    else:
        file_obj = open(file=manifest_path, mode="r", encoding="utf-8")
    with file_obj as f:
        manifest_dict: dict = json.load(f)
    manifest = Manifest.from_dict(manifest_dict)
    manifest.build_parent_and_child_maps()
    return manifest
Expand Down
24 changes: 15 additions & 9 deletions datacontract/imports/iceberg_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pyiceberg.schema import Schema

from datacontract.imports.importer import Importer
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model
from datacontract.model.exceptions import DataContractException

Expand All @@ -22,16 +23,21 @@ def import_source(


def load_and_validate_iceberg_schema(source: str) -> Schema:
    """Load and validate an Iceberg schema JSON from a local path or sftp:// URL.

    Args:
        source: Filesystem path or ``sftp://`` URL of the schema JSON document.

    Returns:
        The validated pyiceberg ``Schema``.

    Raises:
        DataContractException: if the document fails Iceberg schema validation.
    """
    # Read the raw document first so BOTH transports get identical validation
    # error handling. (Previously the sftp branch skipped the ValidationError
    # wrapper and leaked raw pydantic errors to the caller.)
    if source.startswith("sftp://"):
        fs = setup_sftp_filesystem(source)
        with fs.open(source, "r") as file:
            raw_schema = file.read()
    else:
        with open(source, "r") as file:
            raw_schema = file.read()
    try:
        return Schema.model_validate_json(raw_schema)
    except ValidationError as e:
        raise DataContractException(
            type="schema",
            name="Parse iceberg schema",
            reason=f"Failed to validate iceberg schema from {source}: {e}",
            engine="datacontract",
        )


def import_iceberg(
Expand Down
2 changes: 1 addition & 1 deletion datacontract/imports/importer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def register_importer(self, name, importer: Importer):
def register_lazy_importer(self, name: str, module_path: str, class_name: str):
    """Register an importer without importing it yet; the (module, class) pair is resolved on first use."""
    self.dict_lazy_importer[name] = (module_path, class_name)

def create(self, name) -> Importer:
def create(self, name,server_type:str=None) -> Importer:
importers = self.dict_importer.copy()
importers.update(self.dict_lazy_importer.copy())
if name not in importers.keys():
Expand Down
10 changes: 8 additions & 2 deletions datacontract/imports/jsonschema_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import fastjsonschema

from datacontract.imports.importer import Importer
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import DataContractSpecification, Definition, Field, Model
from datacontract.model.exceptions import DataContractException

Expand Down Expand Up @@ -42,8 +43,13 @@ def import_jsonschema(data_contract_specification: DataContractSpecification, so

def load_and_validate_json_schema(source):
try:
with open(source, "r") as file:
json_schema = json.loads(file.read())
if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
with fs.open(source, "r") as file:
json_schema = json.loads(file.read())
else:
with open(source, "r") as file:
json_schema = json.loads(file.read())

validator = fastjsonschema.compile({})
validator(json_schema)
Expand Down
9 changes: 7 additions & 2 deletions datacontract/imports/odcs_importer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import yaml

from datacontract.imports.importer import Importer
from datacontract.lint.resources import read_resource
from datacontract.lint.resources import read_resource, setup_sftp_filesystem
from datacontract.model.data_contract_specification import (
DataContractSpecification,
)
Expand All @@ -17,7 +17,12 @@ def import_source(

def import_odcs(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
try:
odcs_contract = yaml.safe_load(read_resource(source))
if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
with fs.open(source, "r") as file:
odcs_contract = yaml.safe_load(file.read())
else:
odcs_contract = yaml.safe_load(read_resource(source))

except Exception as e:
raise DataContractException(
Expand Down
11 changes: 9 additions & 2 deletions datacontract/imports/parquet_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pyarrow import parquet

from datacontract.imports.importer import Importer
from datacontract.lint.resources import setup_sftp_filesystem
from datacontract.model.data_contract_specification import (
DataContractSpecification,
Field,
Expand All @@ -24,8 +25,14 @@ def import_parquet(data_contract_specification: DataContractSpecification, sourc
schema_name = os.path.basename(source).removesuffix(".parquet").replace(".", "_")

fields: dict[str, Field] = {}

arrow_schema = parquet.read_schema(source)
if source.startswith("sftp://"):
fs = setup_sftp_filesystem(source)
# Extract path without the sftp:// prefix and host
path = source.split("//", 1)[1] # Remove sftp://
path = '/' + path.split("/",1)[1] # Remove host part
arrow_schema = parquet.read_schema(path, filesystem=fs)
else:
arrow_schema = parquet.read_schema(source)
for field_name in arrow_schema.names:
parquet_field = arrow_schema.field(field_name)

Expand Down
1 change: 1 addition & 0 deletions datacontract/imports/unity_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def import_unity_from_api(
reason="",
engine="datacontract",
)

if not profile and not host and not token:
reason = "Either DATACONTRACT_DATABRICKS_PROFILE or both DATACONTRACT_DATABRICKS_SERVER_HOSTNAME and DATACONTRACT_DATABRICKS_TOKEN environment variables must be set"
exception.reason = reason
Expand Down
22 changes: 22 additions & 0 deletions datacontract/lint/resources.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import os
from urllib.parse import urlparse

import fsspec

from datacontract.lint.files import read_file
from datacontract.lint.urls import fetch_resource

Expand All @@ -17,5 +22,22 @@ def read_resource(location: str) -> str:
"""
if location.startswith("http://") or location.startswith("https://"):
return fetch_resource(location)
elif location.startswith("sftp://"):
fs = setup_sftp_filesystem(location)
with fs.open(location, "r") as file:
return file.read()
else:
return read_file(location)

def setup_sftp_filesystem(url: str):
    """Build an fsspec SFTP filesystem for the given ``sftp://`` URL.

    Credentials are taken from the ``DATACONTRACT_SFTP_USER`` and
    ``DATACONTRACT_SFTP_PASSWORD`` environment variables.

    Args:
        url: An ``sftp://host[:port]/path`` URL; host defaults to 127.0.0.1
            and port to 22 when absent.

    Returns:
        An fsspec ``AbstractFileSystem`` connected to the SFTP server.

    Raises:
        ValueError: if a required credential environment variable is not set.
    """
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname if parsed_url.hostname is not None else "127.0.0.1"
    port = parsed_url.port if parsed_url.port is not None else 22
    sftp_user = os.getenv("DATACONTRACT_SFTP_USER")
    sftp_password = os.getenv("DATACONTRACT_SFTP_PASSWORD")
    # Check each credential separately: the previous combined condition raised
    # the USER-specific message even when only the password was missing, and
    # made the password check below unreachable.
    if sftp_user is None:
        raise ValueError("Error: Environment variable DATACONTRACT_SFTP_USER is not set")
    if sftp_password is None:
        raise ValueError("Error: Environment variable DATACONTRACT_SFTP_PASSWORD is not set")
    return fsspec.filesystem("sftp", host=hostname, port=port, username=sftp_user, password=sftp_password)

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ classifiers = [
"Operating System :: OS Independent",
]
requires-python = ">=3.10"

dependencies = [
"typer>=0.15.1,<0.20",
"pydantic>=2.8.2,<2.13.0",
Expand All @@ -35,6 +36,7 @@ dependencies = [
"jinja_partials>=0.2.1,<1.0.0",
"datacontract-specification>=1.2.3,<2.0.0",
"open-data-contract-standard>=3.0.5,<4.0.0",
"paramiko>=4.0.0",
]

[project.optional-dependencies]
Expand Down
11 changes: 11 additions & 0 deletions tests/fixtures/sftp-csv/data/sample_data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
field_one,field_two,field_three
CX-263-DU,50,2023-06-16 13:12:56
IK-894-MN,47,2023-10-08 22:40:57
ER-399-JY,22,2023-05-16 01:08:22
MT-939-FH,63,2023-03-15 05:15:21
LV-849-MI,33,2023-09-08 20:08:43
VS-079-OH,85,2023-04-15 00:50:32
DN-297-XY,79,2023-11-08 12:55:42
ZE-172-FP,14,2023-12-03 18:38:38
ID-840-EG,89,2023-10-02 17:17:58
FK-230-KZ,64,2023-11-27 15:21:48
Loading
Loading