Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions soda-core/src/soda_core/common/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class File:
@staticmethod
def read(file_path: str) -> str:
with open(file_path, "r") as file:
return file.read()
43 changes: 43 additions & 0 deletions soda-core/src/soda_core/common/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,40 @@ def dump_to_string(self, data):
return stream.getvalue()


class Yaml:

__yaml_parser = YamlParser()

@classmethod
def parse(cls, yaml_str: str, file_path: str) -> YamlObject:
location = Location(file_path=file_path)
if not isinstance(yaml_str, str):
raise YamlParserException("YAML source is not a string", str(location))

try:
root_yaml_object: any = cls.__yaml_parser.ruamel_yaml_parser.load(yaml_str)
if isinstance(root_yaml_object, dict):
yaml_source: YamlSource = YamlSource(file_path=file_path, yaml_str=yaml_str)
return YamlObject(
yaml_source=yaml_source,
yaml_dict=root_yaml_object
)
else:
yaml_type: str = root_yaml_object.__class__.__name__ if root_yaml_object is not None else "empty"
yaml_type = "a list" if yaml_type == "CommentedSeq" else yaml_type
raise YamlParserException(
f"{file_path} root must be an object, but was {yaml_type}", str(location)
)

except MarkedYAMLError as e:
mark = e.context_mark if e.context_mark else e.problem_mark
line = mark.line + 1
col = mark.column + 1
location = Location(file_path=file_path, line=line, column=col)
raise YamlParserException(f"YAML syntax error", str(location))


# Deprecated
class FileType(str, Enum):
DATA_SOURCE = "Data Source"
SODA_CLOUD = "Soda Cloud"
Expand Down Expand Up @@ -103,6 +137,15 @@ def __init_subclass__(cls, file_type: FileType, **kwargs):
def __str__(self) -> str:
return self.description

def resolve_variables(
self,
var_values: Optional[dict[str, str]] = None,
soda_values: Optional[dict[str, str]] = None,
use_env_vars: bool = True
) -> None:



@classmethod
def _build_description(cls, file_type: str, file_path: Optional[str]) -> str:
if file_type:
Expand Down
16 changes: 16 additions & 0 deletions soda-core/src/soda_core/contracts/api/verify_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,27 @@
ContractVerificationSession,
ContractVerificationSessionResult,
)
from soda_core.contracts.impl.diagnostics_configuration import DiagnosticsConfiguration
from soda_core.telemetry.soda_telemetry import SodaTelemetry

soda_telemetry = SodaTelemetry()


def verify_contract_locally(
data_source_file_path: Optional[str] = None,
data_source_yaml_str: Optional[str] = None,
contract_file_path: Optional[str] = None,
contract_yaml_str: Optional[str] = None,
contract_dataset_identifier: Optional[str] = None,
soda_cloud_file_path: Optional[str] = None,
soda_cloud_yaml_str: Optional[str] = None,
publish: bool = False,
variables: Optional[Dict[str, str]] = None,
data_timestamp: Optional[str] = None,
diagnostics_configurations: Optional[DiagnosticsConfiguration] = None
) -> ContractVerificationSessionResult:


def verify_contracts_locally(
data_source_file_path: Optional[str] = None,
data_source_file_paths: list[str] = [],
Expand Down
1 change: 1 addition & 0 deletions soda-core/src/soda_core/contracts/contract_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ContractVerificationSession:
@classmethod
def execute(
cls,
contract_yaml_source: Optional[ContractYamlSource],
contract_yaml_sources: list[ContractYamlSource],
only_validate_without_execute: bool = False,
variables: Optional[dict[str, str]] = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class FailedRowsStorageStrategy(Enum):
STORE_FULL_ROWS = "store_full_rows"
STORE_KEYS = "store_keys"


@dataclass
class DiagnosticsConfiguration:
"""
All the failed rows diagnostics configurations apart from the
data source configuration file for a single contract
"""

# dwh_prefixes includes schema name
dwh_prefixes: list[str]
key_columns: list[str]
strategy: FailedRowsStorageStrategy
max_row_count: Optional[int]
154 changes: 154 additions & 0 deletions soda-core/src/soda_core/contracts/impl/verify_contract_locally_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from typing import Optional

from soda_core.common.data_source_impl import DataSourceImpl
from soda_core.common.file import File
from soda_core.common.logs import Logs
from soda_core.common.soda_cloud import SodaCloud
from soda_core.common.yaml import VariableResolver, Yaml, YamlObject
from soda_core.contracts.contract_verification import (
ContractVerificationResult,
SodaException,
)
from soda_core.contracts.impl.diagnostics_configuration import DiagnosticsConfiguration


def verify_contract_locally(
contract_file_path: Optional[str],
contract_yaml_str: Optional[str],
data_source_file_path: Optional[str],
data_source_yaml_str: Optional[str],
soda_cloud_file_path: Optional[str],
soda_cloud_yaml_str: Optional[str],
publish: bool,
only_validate_without_execute: bool,
variables: dict[str, str],
data_timestamp: Optional[str],
diagnostics_configuration: Optional[DiagnosticsConfiguration],
) -> ContractVerificationResult:
"""
returns ContractVerificationResult or raises a SodaException
"""

logs: Logs = Logs()

data_source_impl: DataSourceImpl = _build_data_source_impl(
data_source_file_path=data_source_file_path,
data_source_yaml_str=data_source_yaml_str,
logs=logs,
)

soda_cloud_impl: SodaCloud = _build_soda_cloud(
soda_cloud_file_path=soda_cloud_file_path,
soda_cloud_yaml_str=soda_cloud_yaml_str,
logs=logs,
)

with data_source_impl:
return _execute_locally_impl(
data_source_impl=data_source_impl,
soda_cloud_impl=soda_cloud_impl,
contract_file_path=contract_file_path,
contract_yaml_str=contract_yaml_str,
publish=publish,
only_validate_without_execute=only_validate_without_execute,
variables=variables,
data_timestamp=data_timestamp,
diagnostics_configuration=diagnostics_configuration,
logs=logs,
)


def _execute_locally_impl(
data_source_impl: DataSourceImpl,
soda_cloud_impl: SodaCloud,
contract_file_path: Optional[str],
contract_yaml_str: Optional[str],
publish: bool,
only_validate_without_execute: bool,
variables: dict[str, str],
data_timestamp: Optional[str],
diagnostics_configuration: Optional[DiagnosticsConfiguration],
logs: Logs,
) -> ContractVerificationResult:
contract_yaml: ContractYaml = ContractYaml.parse(
contract_yaml_source=contract_yaml_source,
provided_variable_values=variables,
data_timestamp=data_timestamp,
)

contract_impl: ContractImpl = ContractImpl(
contract_yaml=contract_yaml,
only_validate_without_execute=only_validate_without_execute,
data_timestamp=contract_yaml.data_timestamp,
execution_timestamp=contract_yaml.execution_timestamp,
data_source_impl=data_source_impl,
soda_cloud=soda_cloud_impl,
publish_results=publish,
logs=logs,
)

return contract_impl.verify()


def _build_data_source_impl(
data_source_file_path: Optional[str], data_source_yaml_str: Optional[str], variables: dict[str, str], logs: Logs
) -> DataSourceImpl:
data_source_yaml_object: YamlObject = _parse_yaml_object(
file_path=data_source_file_path,
file_path_variable_name="data_source_file_path",
yaml_str=data_source_yaml_str,
yaml_str_variable_name="data_source_yaml_str",
variables=variables,
logs=logs,
)

return DataSourceImpl.parse(data_source_yaml_object=data_source_yaml_object, logs=logs)


def _build_soda_cloud(
soda_cloud_file_path: Optional[str], soda_cloud_yaml_str: Optional[str], variables: dict[str, str], logs: Logs
) -> SodaCloud:
soda_cloud_yaml_object: YamlObject = _parse_yaml_object(
file_path=soda_cloud_file_path,
file_path_variable_name="soda_cloud_file_path",
yaml_str=soda_cloud_yaml_str,
yaml_str_variable_name="soda_cloud_yaml_str",
variables=variables,
logs=logs,
)

return SodaCloud.parse(data_source_yaml_object=soda_cloud_yaml_object, logs=logs)


def _parse_yaml_object(
file_path: Optional[str],
file_path_variable_name: str,
yaml_str: Optional[str],
yaml_str_variable_name: str,
variables: dict[str, str],
) -> YamlObject:
if file_path is not None:
if not isinstance(file_path, str):
raise SodaException(
f"Expected {file_path_variable_name} to be a string, but was {type(file_path).__name__}"
)
if yaml_str is not None:
raise SodaException(
f"Both {file_path_variable_name} and {yaml_str_variable_name} are provided. Only one expected."
)

yaml_str = File.read(file_path)

if yaml_str is None:
raise SodaException(f"No {file_path_variable_name} nor {yaml_str_variable_name} specified")

if not isinstance(yaml_str, str):
raise SodaException(f"Expected {yaml_str_variable_name} to be a string, but was {type(yaml_str).__name__}")

resolved_yaml_str: str = VariableResolver.resolve(
source_text=yaml_str, variable_values=variables, use_env_vars=True
)

return Yaml.parse(
yaml_str=resolved_yaml_str,
)
Loading