diff --git a/soda-core/src/soda_core/common/file.py b/soda-core/src/soda_core/common/file.py new file mode 100644 index 000000000..4e57287bc --- /dev/null +++ b/soda-core/src/soda_core/common/file.py @@ -0,0 +1,5 @@ +class File: + @staticmethod + def read(file_path: str) -> str: + with open(file_path, "r") as file: + return file.read() diff --git a/soda-core/src/soda_core/common/yaml.py b/soda-core/src/soda_core/common/yaml.py index 4b1b9e04d..9a7085736 100644 --- a/soda-core/src/soda_core/common/yaml.py +++ b/soda-core/src/soda_core/common/yaml.py @@ -28,6 +28,40 @@ def dump_to_string(self, data): return stream.getvalue() +class Yaml: + + __yaml_parser = YamlParser() + + @classmethod + def parse(cls, yaml_str: str, file_path: str) -> YamlObject: + location = Location(file_path=file_path) + if not isinstance(yaml_str, str): + raise YamlParserException("YAML source is not a string", str(location)) + + try: + root_yaml_object: any = cls.__yaml_parser.ruamel_yaml_parser.load(yaml_str) + if isinstance(root_yaml_object, dict): + yaml_source: YamlSource = YamlSource(file_path=file_path, yaml_str=yaml_str) + return YamlObject( + yaml_source=yaml_source, + yaml_dict=root_yaml_object + ) + else: + yaml_type: str = root_yaml_object.__class__.__name__ if root_yaml_object is not None else "empty" + yaml_type = "a list" if yaml_type == "CommentedSeq" else yaml_type + raise YamlParserException( + f"{file_path} root must be an object, but was {yaml_type}", str(location) + ) + + except MarkedYAMLError as e: + mark = e.context_mark if e.context_mark else e.problem_mark + line = mark.line + 1 + col = mark.column + 1 + location = Location(file_path=file_path, line=line, column=col) + raise YamlParserException(f"YAML syntax error", str(location)) + + +# Deprecated class FileType(str, Enum): DATA_SOURCE = "Data Source" SODA_CLOUD = "Soda Cloud" @@ -103,6 +137,15 @@ def __init_subclass__(cls, file_type: FileType, **kwargs): def __str__(self) -> str: return self.description + def resolve_variables( + self, + var_values: Optional[dict[str, str]] = None, + soda_values: Optional[dict[str, str]] = None, + use_env_vars: bool = True + ) -> None: + + + @classmethod def _build_description(cls, file_type: str, file_path: Optional[str]) -> str: if file_type: diff --git a/soda-core/src/soda_core/contracts/api/verify_api.py b/soda-core/src/soda_core/contracts/api/verify_api.py index 9984307b0..fcdf055b4 100644 --- a/soda-core/src/soda_core/contracts/api/verify_api.py +++ b/soda-core/src/soda_core/contracts/api/verify_api.py @@ -13,11 +13,27 @@ ContractVerificationSession, ContractVerificationSessionResult, ) +from soda_core.contracts.impl.diagnostics_configuration import DiagnosticsConfiguration from soda_core.telemetry.soda_telemetry import SodaTelemetry soda_telemetry = SodaTelemetry() +def verify_contract_locally( + data_source_file_path: Optional[str] = None, + data_source_yaml_str: Optional[str] = None, + contract_file_path: Optional[str] = None, + contract_yaml_str: Optional[str] = None, + contract_dataset_identifier: Optional[str] = None, + soda_cloud_file_path: Optional[str] = None, + soda_cloud_yaml_str: Optional[str] = None, + publish: bool = False, + variables: Optional[Dict[str, str]] = None, + data_timestamp: Optional[str] = None, + diagnostics_configurations: Optional[DiagnosticsConfiguration] = None +) -> ContractVerificationSessionResult: + + def verify_contracts_locally( data_source_file_path: Optional[str] = None, data_source_file_paths: list[str] = [], diff --git a/soda-core/src/soda_core/contracts/contract_verification.py b/soda-core/src/soda_core/contracts/contract_verification.py index 4894c02cc..aca19d91a 100644 --- a/soda-core/src/soda_core/contracts/contract_verification.py +++ b/soda-core/src/soda_core/contracts/contract_verification.py @@ -20,6 +20,7 @@ class ContractVerificationSession: @classmethod def execute( cls, + contract_yaml_source: Optional[ContractYamlSource], contract_yaml_sources: list[ContractYamlSource], only_validate_without_execute: bool = False, variables: Optional[dict[str, str]] = None, diff --git a/soda-core/src/soda_core/contracts/impl/diagnostics_configuration.py b/soda-core/src/soda_core/contracts/impl/diagnostics_configuration.py new file mode 100644 index 000000000..a9cade51d --- /dev/null +++ b/soda-core/src/soda_core/contracts/impl/diagnostics_configuration.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class FailedRowsStorageStrategy(Enum): + STORE_FULL_ROWS = "store_full_rows" + STORE_KEYS = "store_keys" + + +@dataclass +class DiagnosticsConfiguration: + """ + All the failed rows diagnostics configurations apart from the + data source configuration file for a single contract + """ + + # dwh_prefixes includes schema name + dwh_prefixes: list[str] + key_columns: list[str] + strategy: FailedRowsStorageStrategy + max_row_count: Optional[int] diff --git a/soda-core/src/soda_core/contracts/impl/verify_contract_locally_impl.py b/soda-core/src/soda_core/contracts/impl/verify_contract_locally_impl.py new file mode 100644 index 000000000..248066879 --- /dev/null +++ b/soda-core/src/soda_core/contracts/impl/verify_contract_locally_impl.py @@ -0,0 +1,154 @@ +from typing import Optional + +from soda_core.common.data_source_impl import DataSourceImpl +from soda_core.common.file import File +from soda_core.common.logs import Logs +from soda_core.common.soda_cloud import SodaCloud +from soda_core.common.yaml import VariableResolver, Yaml, YamlObject +from soda_core.contracts.contract_verification import ( + ContractVerificationResult, + SodaException, +) +from soda_core.contracts.impl.diagnostics_configuration import DiagnosticsConfiguration + + +def verify_contract_locally( + contract_file_path: Optional[str], + contract_yaml_str: Optional[str], + data_source_file_path: Optional[str], + data_source_yaml_str: Optional[str], + soda_cloud_file_path: Optional[str], + soda_cloud_yaml_str: Optional[str], + publish: bool, + only_validate_without_execute: bool, + variables: dict[str, str], + data_timestamp: Optional[str], + diagnostics_configuration: Optional[DiagnosticsConfiguration], +) -> ContractVerificationResult: + """ + returns ContractVerificationResult or raises a SodaException + """ + + logs: Logs = Logs() + + data_source_impl: DataSourceImpl = _build_data_source_impl( + data_source_file_path=data_source_file_path, + data_source_yaml_str=data_source_yaml_str, + logs=logs, + ) + + soda_cloud_impl: SodaCloud = _build_soda_cloud( + soda_cloud_file_path=soda_cloud_file_path, + soda_cloud_yaml_str=soda_cloud_yaml_str, + logs=logs, + ) + + with data_source_impl: + return _execute_locally_impl( + data_source_impl=data_source_impl, + soda_cloud_impl=soda_cloud_impl, + contract_file_path=contract_file_path, + contract_yaml_str=contract_yaml_str, + publish=publish, + only_validate_without_execute=only_validate_without_execute, + variables=variables, + data_timestamp=data_timestamp, + diagnostics_configuration=diagnostics_configuration, + logs=logs, + ) + + +def _execute_locally_impl( + data_source_impl: DataSourceImpl, + soda_cloud_impl: SodaCloud, + contract_file_path: Optional[str], + contract_yaml_str: Optional[str], + publish: bool, + only_validate_without_execute: bool, + variables: dict[str, str], + data_timestamp: Optional[str], + diagnostics_configuration: Optional[DiagnosticsConfiguration], + logs: Logs, +) -> ContractVerificationResult: + contract_yaml: ContractYaml = ContractYaml.parse( + contract_yaml_source=contract_yaml_source, + provided_variable_values=variables, + data_timestamp=data_timestamp, + ) + + contract_impl: ContractImpl = ContractImpl( + contract_yaml=contract_yaml, + only_validate_without_execute=only_validate_without_execute, + data_timestamp=contract_yaml.data_timestamp, + execution_timestamp=contract_yaml.execution_timestamp, + data_source_impl=data_source_impl, + soda_cloud=soda_cloud_impl, + publish_results=publish, + logs=logs, + ) + + return contract_impl.verify() + + +def _build_data_source_impl( + data_source_file_path: Optional[str], data_source_yaml_str: Optional[str], variables: dict[str, str], logs: Logs +) -> DataSourceImpl: + data_source_yaml_object: YamlObject = _parse_yaml_object( + file_path=data_source_file_path, + file_path_variable_name="data_source_file_path", + yaml_str=data_source_yaml_str, + yaml_str_variable_name="data_source_yaml_str", + variables=variables, + logs=logs, + ) + + return DataSourceImpl.parse(data_source_yaml_object=data_source_yaml_object, logs=logs) + + +def _build_soda_cloud( + soda_cloud_file_path: Optional[str], soda_cloud_yaml_str: Optional[str], variables: dict[str, str], logs: Logs +) -> SodaCloud: + soda_cloud_yaml_object: YamlObject = _parse_yaml_object( + file_path=soda_cloud_file_path, + file_path_variable_name="soda_cloud_file_path", + yaml_str=soda_cloud_yaml_str, + yaml_str_variable_name="soda_cloud_yaml_str", + variables=variables, + logs=logs, + ) + + return SodaCloud.parse(data_source_yaml_object=soda_cloud_yaml_object, logs=logs) + + +def _parse_yaml_object( + file_path: Optional[str], + file_path_variable_name: str, + yaml_str: Optional[str], + yaml_str_variable_name: str, + variables: dict[str, str], +) -> YamlObject: + if file_path is not None: + if not isinstance(file_path, str): + raise SodaException( + f"Expected {file_path_variable_name} to be a string, but was {type(file_path).__name__}" + ) + if yaml_str is not None: + raise SodaException( + f"Both {file_path_variable_name} and {yaml_str_variable_name} are provided. Only one expected." + ) + + yaml_str = File.read(file_path) + + if yaml_str is None: + raise SodaException(f"No {file_path_variable_name} nor {yaml_str_variable_name} specified") + + if not isinstance(yaml_str, str): + raise SodaException(f"Expected {yaml_str_variable_name} to be a string, but was {type(yaml_str).__name__}") + + resolved_yaml_str: str = VariableResolver.resolve( + source_text=yaml_str, variable_values=variables, use_env_vars=True + ) + + return Yaml.parse( + yaml_str=resolved_yaml_str, + )