Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def run_test_job(
connector: IConnector | type[IConnector] | Callable[[], IConnector],
verb: Literal["spec", "read", "check", "discover"],
*,
connector_root: Path,
test_scenario: ConnectorTestScenario | None = None,
catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None,
) -> entrypoint_wrapper.EntrypointOutput:
Expand All @@ -84,7 +85,10 @@ def run_test_job(
)

args: list[str] = [verb]
config_dict = test_scenario.get_config_dict(empty_if_missing=True)
config_dict = test_scenario.get_config_dict(
empty_if_missing=True,
connector_root=connector_root,
)
if config_dict and verb != "spec":
# Write the config to a temp json file and pass the path to the file as an argument.
config_path = (
Expand Down
31 changes: 18 additions & 13 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def test_check(
self.create_connector(scenario),
"check",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
)
conn_status_messages: list[AirbyteMessage] = [
msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS
Expand Down Expand Up @@ -163,19 +164,23 @@ def get_scenarios(
):
continue

test_scenarios.extend(
[
ConnectorTestScenario.model_validate(test)
for test in all_tests_config["acceptance_tests"][category]["tests"]
if "config_path" in test and "iam_role" not in test["config_path"]
]
)
for test in all_tests_config["acceptance_tests"][category]["tests"]:
if "config_path" not in test:
# Skip tests without a config_path
continue

if "iam_role" in test["config_path"]:
# We skip iam_role tests for now, as they are not supported in the test suite.
continue

scenario = ConnectorTestScenario.model_validate(test)

if scenario.config_path and scenario.config_path in [
s.config_path for s in test_scenarios
]:
# Skip duplicate scenarios based on config_path
continue

connector_root = cls.get_connector_root_dir().absolute()
for test in test_scenarios:
if test.config_path:
test.config_path = connector_root / test.config_path
if test.configured_catalog_path:
test.configured_catalog_path = connector_root / test.configured_catalog_path
test_scenarios.append(scenario)

return test_scenarios
7 changes: 6 additions & 1 deletion airbyte_cdk/test/standard_tests/declarative_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ def create_connector(
config = {
"__injected_manifest": manifest_dict,
}
config.update(scenario.get_config_dict(empty_if_missing=True))
config.update(
scenario.get_config_dict(
empty_if_missing=True,
connector_root=cls.get_connector_root_dir(),
),
)

if cls.components_py_path and cls.components_py_path.exists():
os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true"
Expand Down
43 changes: 41 additions & 2 deletions airbyte_cdk/test/standard_tests/models/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing import Any, Literal, cast

import yaml
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict


class ConnectorTestScenario(BaseModel):
Expand All @@ -24,6 +24,10 @@ class ConnectorTestScenario(BaseModel):
acceptance test configuration file.
"""

# Allows the class to be hashable, which PyTest will require
# when we use to parameterize tests.
model_config = ConfigDict(frozen=True)

class AcceptanceTestExpectRecords(BaseModel):
path: Path
exact_order: bool = False
Expand All @@ -46,6 +50,7 @@ class AcceptanceTestFileTypes(BaseModel):
def get_config_dict(
self,
*,
connector_root: Path,
empty_if_missing: bool,
) -> dict[str, Any]:
"""Return the config dictionary.
Expand All @@ -61,7 +66,15 @@ def get_config_dict(
return self.config_dict

if self.config_path is not None:
return cast(dict[str, Any], yaml.safe_load(self.config_path.read_text()))
config_path = self.config_path
if not config_path.is_absolute():
# We usually receive a relative path here. Let's resolve it.
config_path = (connector_root / self.config_path).resolve().absolute()

return cast(
dict[str, Any],
yaml.safe_load(config_path.read_text()),
)

if empty_if_missing:
return {}
Expand All @@ -83,3 +96,29 @@ def __str__(self) -> str:
return f"'{self.config_path.name}' Test Scenario"

return f"'{hash(self)}' Test Scenario"

def without_expecting_failure(self) -> ConnectorTestScenario:
"""Return a copy of the scenario that does not expect failure.

This is useful when you need to run multiple steps and you
want to defer failure expectation for one or more steps.
"""
if self.status != "failed":
return self

return ConnectorTestScenario(
**self.model_dump(exclude={"status"}),
)

def with_expecting_failure(self) -> ConnectorTestScenario:
"""Return a copy of the scenario that expects failure.

This is useful when deriving new scenarios from existing ones.
"""
if self.status == "failed":
return self

return ConnectorTestScenario(
**self.model_dump(exclude={"status"}),
status="failed",
)
20 changes: 12 additions & 8 deletions airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_check(
self.create_connector(scenario),
"check",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
)
conn_status_messages: list[AirbyteMessage] = [
msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS
Expand All @@ -61,6 +62,7 @@ def test_discover(
run_test_job(
self.create_connector(scenario),
"discover",
connector_root=self.get_connector_root_dir(),
test_scenario=scenario,
)

Expand All @@ -80,6 +82,7 @@ def test_spec(self) -> None:
verb="spec",
test_scenario=None,
connector=self.create_connector(scenario=None),
connector_root=self.get_connector_root_dir(),
)
# If an error occurs, it will be raised above.

Expand All @@ -102,10 +105,11 @@ def test_basic_read(
discover_result = run_test_job(
self.create_connector(scenario),
"discover",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
test_scenario=scenario.without_expecting_failure(),
)
if scenario.expect_exception:
assert discover_result.errors, "Expected exception but got none."
if scenario.expect_exception and discover_result.errors:
# Failed as expected; we're done.
return

configured_catalog = ConfiguredAirbyteCatalog(
Expand All @@ -122,6 +126,7 @@ def test_basic_read(
self.create_connector(scenario),
"read",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
catalog=configured_catalog,
)

Expand Down Expand Up @@ -149,15 +154,14 @@ def test_fail_read_with_bad_catalog(
),
sync_mode="INVALID", # type: ignore [reportArgumentType]
destination_sync_mode="INVALID", # type: ignore [reportArgumentType]
)
]
),
],
)
# Set expected status to "failed" to ensure the test fails if the connector.
scenario.status = "failed"
result: entrypoint_wrapper.EntrypointOutput = run_test_job(
self.create_connector(scenario),
"read",
test_scenario=scenario,
connector_root=self.get_connector_root_dir(),
test_scenario=scenario.with_expecting_failure(), # Expect failure due to bad catalog
catalog=asdict(invalid_configured_catalog),
)
assert result.errors, "Expected errors but got none."
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/test/standard_tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def create_connector_test_suite(
)

subclass_overrides: dict[str, Any] = {
"get_connector_root_dir": lambda: connector_directory,
"get_connector_root_dir": classmethod(lambda cls: connector_directory),
}

TestSuiteAuto = type(
Expand Down
Loading