Skip to content

Commit 7504a59

Browse files
authored
fix: fast tests conditionally skip streams (#654)
1 parent 312f2e1 commit 7504a59

File tree

6 files changed

+79
-38
lines changed

6 files changed

+79
-38
lines changed

airbyte_cdk/test/entrypoint_wrapper.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
from airbyte_cdk.test.models.scenario import ExpectedOutcome
5252

5353

54-
@dataclass
5554
class AirbyteEntrypointException(Exception):
5655
"""Exception raised for errors in the AirbyteEntrypoint execution.
5756

airbyte_cdk/test/models/scenario.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,21 @@ class AcceptanceTestFileTypes(BaseModel):
4444
skip_test: bool
4545
bypass_reason: str
4646

47+
class AcceptanceTestEmptyStream(BaseModel):
48+
name: str
49+
bypass_reason: str | None = None
50+
51+
# bypass reason does not affect equality
52+
def __hash__(self) -> int:
53+
return hash(self.name)
54+
4755
config_path: Path | None = None
4856
config_dict: dict[str, Any] | None = None
4957

5058
_id: str | None = None # Used to override the default ID generation
5159

5260
configured_catalog_path: Path | None = None
61+
empty_streams: list[AcceptanceTestEmptyStream] | None = None
5362
timeout_seconds: int | None = None
5463
expect_records: AcceptanceTestExpectRecords | None = None
5564
file_types: AcceptanceTestFileTypes | None = None

airbyte_cdk/test/standard_tests/connector_base.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,12 @@
1010

1111
from boltons.typeutils import classproperty
1212

13-
from airbyte_cdk.models import (
14-
AirbyteMessage,
15-
Type,
16-
)
1713
from airbyte_cdk.test import entrypoint_wrapper
1814
from airbyte_cdk.test.models import (
1915
ConnectorTestScenario,
2016
)
2117
from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job
2218
from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite
23-
from airbyte_cdk.utils.connector_paths import (
24-
ACCEPTANCE_TEST_CONFIG,
25-
find_connector_root,
26-
)
2719

2820
if TYPE_CHECKING:
2921
from collections.abc import Callable

airbyte_cdk/test/standard_tests/docker_base.py

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
import warnings
1111
from dataclasses import asdict
1212
from pathlib import Path
13-
from subprocess import CompletedProcess, SubprocessError
14-
from typing import Literal, cast
13+
from typing import Any, Literal, cast
1514

1615
import orjson
1716
import pytest
@@ -35,7 +34,6 @@
3534
from airbyte_cdk.utils.docker import (
3635
build_connector_image,
3736
run_docker_airbyte_command,
38-
run_docker_command,
3937
)
4038

4139

@@ -66,13 +64,57 @@ def is_destination_connector(cls) -> bool:
6664
return cast(str, cls.connector_name).startswith("destination-")
6765

6866
@classproperty
69-
def acceptance_test_config_path(cls) -> Path:
70-
"""Get the path to the acceptance test config file."""
71-
result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
72-
if result.exists():
73-
return result
67+
def acceptance_test_config(cls) -> Any:
68+
"""Get the contents of acceptance test config file.
7469
75-
raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")
70+
Also perform some basic validation that the file has the expected structure.
71+
"""
72+
acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
73+
if not acceptance_test_config_path.exists():
74+
raise FileNotFoundError(
75+
f"Acceptance test config file not found at: {str(acceptance_test_config_path)}"
76+
)
77+
78+
tests_config = yaml.safe_load(acceptance_test_config_path.read_text())
79+
80+
if "acceptance_tests" not in tests_config:
81+
raise ValueError(
82+
f"Acceptance tests config not found in {acceptance_test_config_path}."
83+
f" Found only: {str(tests_config)}."
84+
)
85+
return tests_config
86+
87+
@staticmethod
88+
def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]:
89+
"""
90+
For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined
91+
a series of more granular scenarios specifying a config_path and empty_streams among other things.
92+
93+
This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to
94+
take the union of any defined empty_streams, to have high confidence that runnning a read with the
95+
config will not error on the lack of data in the empty streams or lack of permissions to read them.
96+
97+
"""
98+
deduped_scenarios: list[ConnectorTestScenario] = []
99+
100+
for scenario in scenarios:
101+
for existing_scenario in deduped_scenarios:
102+
if scenario.config_path == existing_scenario.config_path:
103+
# If a scenario with the same config_path already exists, we merge the empty streams.
104+
# scenarios are immutable, so we create a new one.
105+
all_empty_streams = (existing_scenario.empty_streams or []) + (
106+
scenario.empty_streams or []
107+
)
108+
merged_scenario = existing_scenario.model_copy(
109+
update={"empty_streams": list(set(all_empty_streams))}
110+
)
111+
deduped_scenarios.remove(existing_scenario)
112+
deduped_scenarios.append(merged_scenario)
113+
break
114+
else:
115+
# If a scenario does not exist with the config, add the new scenario to the list.
116+
deduped_scenarios.append(scenario)
117+
return deduped_scenarios
76118

77119
@classmethod
78120
def get_scenarios(
@@ -83,9 +125,8 @@ def get_scenarios(
83125
This has to be a separate function because pytest does not allow
84126
parametrization of fixtures with arguments from the test class itself.
85127
"""
86-
categories = ["connection", "spec"]
87128
try:
88-
acceptance_test_config_path = cls.acceptance_test_config_path
129+
all_tests_config = cls.acceptance_test_config
89130
except FileNotFoundError as e:
90131
# Destinations sometimes do not have an acceptance tests file.
91132
warnings.warn(
@@ -95,15 +136,9 @@ def get_scenarios(
95136
)
96137
return []
97138

98-
all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
99-
if "acceptance_tests" not in all_tests_config:
100-
raise ValueError(
101-
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
102-
f" Found only: {str(all_tests_config)}."
103-
)
104-
105139
test_scenarios: list[ConnectorTestScenario] = []
106-
for category in categories:
140+
# we look in the basic_read section to find any empty streams
141+
for category in ["spec", "connection", "basic_read"]:
107142
if (
108143
category not in all_tests_config["acceptance_tests"]
109144
or "tests" not in all_tests_config["acceptance_tests"][category]
@@ -121,15 +156,11 @@ def get_scenarios(
121156

122157
scenario = ConnectorTestScenario.model_validate(test)
123158

124-
if scenario.config_path and scenario.config_path in [
125-
s.config_path for s in test_scenarios
126-
]:
127-
# Skip duplicate scenarios based on config_path
128-
continue
129-
130159
test_scenarios.append(scenario)
131160

132-
return test_scenarios
161+
deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)
162+
163+
return deduped_test_scenarios
133164

134165
@pytest.mark.skipif(
135166
shutil.which("docker") is None,
@@ -332,6 +363,11 @@ def test_docker_image_build_and_read(
332363
# If `read_from_streams` is a list, we filter the discovered streams.
333364
streams_list = list(set(streams_list) & set(read_from_streams))
334365

366+
if scenario.empty_streams:
367+
# Filter out streams marked as empty in the scenario.
368+
empty_stream_names = [stream.name for stream in scenario.empty_streams]
369+
streams_list = [s for s in streams_list if s.name not in empty_stream_names]
370+
335371
configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
336372
streams=[
337373
ConfiguredAirbyteStream(

airbyte_cdk/test/standard_tests/source_base.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ def test_basic_read(
120120
if scenario.expected_outcome.expect_exception() and discover_result.errors:
121121
# Failed as expected; we're done.
122122
return
123+
streams = discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr]
124+
125+
if scenario.empty_streams:
126+
# Filter out streams marked as empty in the scenario.
127+
empty_stream_names = [stream.name for stream in scenario.empty_streams]
128+
streams = [s for s in streams if s.name not in empty_stream_names]
123129

124130
configured_catalog = ConfiguredAirbyteCatalog(
125131
streams=[
@@ -128,7 +134,7 @@ def test_basic_read(
128134
sync_mode=SyncMode.full_refresh,
129135
destination_sync_mode=DestinationSyncMode.append_dedup,
130136
)
131-
for stream in discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr]
137+
for stream in streams
132138
]
133139
)
134140
result = run_test_job(

airbyte_cdk/utils/connector_paths.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ def _find_in_adjacent_dirs(current_dir: Path) -> Path | None:
8686

8787
def resolve_connector_name_and_directory(
8888
connector_ref: str | Path | None = None,
89-
*,
90-
connector_directory: Path | None = None,
9189
) -> tuple[str, Path]:
9290
"""Resolve the connector name and directory.
9391
@@ -104,6 +102,7 @@ def resolve_connector_name_and_directory(
104102
FileNotFoundError: If the connector directory does not exist or cannot be found.
105103
"""
106104
connector_name: str | None = None
105+
connector_directory: Path | None = None
107106

108107
# Resolve connector_ref to connector_name or connector_directory (if provided)
109108
if connector_ref:

0 commit comments

Comments
 (0)