Skip to content

Commit 6cd27c7

Browse files
committed
feat: working container 'read' tests
1 parent fc99ddc commit 6cd27c7

File tree

3 files changed

+305
-26
lines changed

3 files changed

+305
-26
lines changed

airbyte_cdk/test/models/scenario.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class AcceptanceTestFileTypes(BaseModel):
4747
config_path: Path | None = None
4848
config_dict: dict[str, Any] | None = None
4949

50-
id: str | None = None
50+
_id: str | None = None # Used to override the default ID generation
5151

5252
configured_catalog_path: Path | None = None
5353
timeout_seconds: int | None = None
@@ -99,16 +99,21 @@ def expected_outcome(self) -> ExpectedOutcome:
9999
return ExpectedOutcome.from_status_str(self.status)
100100

101101
@property
def id(self) -> str:
    """Return the identifier PyTest uses to label this test scenario.

    Resolution order:
    1. The explicit `_id` override, when it is set to a non-empty value.
    2. The stem (file name without extension) of `config_path`, when a path is set.
    3. The stringified `hash()` of the scenario itself as a last resort.
    """
    explicit_id = self._id
    if explicit_id:
        return explicit_id

    config_path = self.config_path
    # Fall back to the object hash only when no config file names the scenario.
    return config_path.stem if config_path else str(hash(self))


def __str__(self) -> str:
    """Return a human-readable label built from the scenario's `id`."""
    scenario_id = self.id
    return f"'{scenario_id}' Test Scenario"
112117

113118
@contextmanager
114119
def with_temp_config_file(

airbyte_cdk/test/standard_tests/docker_base.py

Lines changed: 194 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,30 @@
66
import inspect
77
import shutil
88
import sys
9+
import tempfile
910
import warnings
11+
from dataclasses import asdict
1012
from pathlib import Path
13+
from subprocess import CompletedProcess, SubprocessError
1114

15+
import orjson
1216
import pytest
1317
import yaml
1418
from boltons.typeutils import classproperty
1519

20+
from airbyte_cdk.models import (
21+
AirbyteCatalog,
22+
ConfiguredAirbyteCatalog,
23+
ConfiguredAirbyteStream,
24+
DestinationSyncMode,
25+
)
26+
from airbyte_cdk.models.airbyte_protocol_serializers import (
27+
AirbyteCatalogSerializer,
28+
AirbyteStreamSerializer,
29+
)
1630
from airbyte_cdk.models.connector_metadata import MetadataFile
1731
from airbyte_cdk.test.models import ConnectorTestScenario
32+
from airbyte_cdk.test.utils.reading import catalog
1833
from airbyte_cdk.utils.connector_paths import (
1934
ACCEPTANCE_TEST_CONFIG,
2035
find_connector_root,
@@ -127,17 +142,23 @@ def test_docker_image_build_and_spec(
127142
no_verify=False,
128143
)
129144

130-
_ = run_docker_command(
131-
[
132-
"docker",
133-
"run",
134-
"--rm",
135-
connector_image,
136-
"spec",
137-
],
138-
check=True, # Raise an error if the command fails
139-
capture_output=False,
140-
)
145+
try:
146+
result: CompletedProcess[str] = run_docker_command(
147+
[
148+
"docker",
149+
"run",
150+
"--rm",
151+
connector_image,
152+
"spec",
153+
],
154+
check=True, # Raise an error if the command fails
155+
capture_stderr=True,
156+
capture_stdout=True,
157+
)
158+
except SubprocessError as ex:
159+
raise AssertionError(
160+
f"Failed to run `spec` command in docker image {connector_image!r}. Error: {ex!s}"
161+
) from None
141162

142163
@pytest.mark.skipif(
143164
shutil.which("docker") is None,
@@ -192,5 +213,166 @@ def test_docker_image_build_and_check(
192213
container_config_path,
193214
],
194215
check=True, # Raise an error if the command fails
195-
capture_output=False,
216+
capture_stderr=True,
217+
capture_stdout=True,
218+
)
219+
220+
@pytest.mark.skipif(
    shutil.which("docker") is None,
    reason="docker CLI not found in PATH, skipping docker image tests",
)
@pytest.mark.image_tests
def test_docker_image_build_and_read(
    self,
    scenario: ConnectorTestScenario,
    connector_image_override: str | None,
    read_from_streams: Literal["all", "none", "default"] | list[str],
    read_scenarios: Literal["all", "none", "default"] | list[str],
) -> None:
    """Read from the connector's Docker image.

    This test builds the connector image (unless an image override is given), runs
    `discover` inside the container to obtain the catalog, writes a configured catalog
    for the selected streams, and then runs the `read` command inside the container.

    Args:
        scenario: The test scenario providing the connector config and expected outcome.
        connector_image_override: Pre-built image to use instead of building one.
        read_from_streams: "all", "default" (suggested streams from the metadata file,
            when any are declared), "none", or an explicit list of stream names.
        read_scenarios: "all", "default" (scenarios named `config`, `valid_config` or
            `default`), "none", or an explicit list of scenario IDs.

    Raises:
        AssertionError: If the discovered catalog cannot be parsed or `read` exits non-zero.
        ValueError: If the discovered catalog contains no streams.

    Note:
    - It is expected for docker image caches to be reused between test runs.
    - In the rare case that image caches need to be cleared, please clear
      the local docker image cache using `docker image prune -a` command.
    - If the --connector-image arg is provided, it will be used instead of building the image.
    """
    if scenario.expected_outcome.expect_exception():
        pytest.skip("Skipping (expected to fail).")

    # Accept falsy values (`False`, `None`, `[]`) as well as the "none" keyword, so that
    # "do not read" can never fall through and silently read every stream.
    if read_from_streams == "none" or not read_from_streams:
        pytest.skip("Skipping read test (`--read-from-streams=false`).")

    if read_scenarios == "none" or not read_scenarios:
        pytest.skip("Skipping (`--read-scenarios=none`).")

    default_scenario_ids = ["config", "valid_config", "default"]
    if read_scenarios == "default":
        if scenario.id not in default_scenario_ids:
            pytest.skip(
                f"Skipping read test for scenario '{scenario.id}' "
                f"(not in default scenarios list '{default_scenario_ids}')."
            )
    elif read_scenarios != "all" and scenario.id not in read_scenarios:
        # A scenario that was not selected is skipped, not failed.
        pytest.skip(
            f"Skipping read test for scenario '{scenario.id}' "
            f"(not in --read-scenarios={read_scenarios})."
        )

    connector_root = self.get_connector_root_dir()
    connector_name = connector_root.name
    metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
    connector_image: str | None = connector_image_override
    if not connector_image:
        connector_image = build_connector_image(
            connector_name=connector_name,
            connector_directory=connector_root,
            metadata=metadata,
            tag="dev-latest",
            no_verify=False,
        )

    container_config_path = "/secrets/config.json"
    container_catalog_path = "/secrets/catalog.json"

    with scenario.with_temp_config_file(
        connector_root=connector_root,
    ) as temp_config_file:
        discover_result = run_docker_command(
            [
                "docker",
                "run",
                "--rm",
                "-v",
                f"{temp_config_file}:{container_config_path}",
                connector_image,
                "discover",
                "--config",
                container_config_path,
            ],
            check=True,  # Raise an error if the command fails
            capture_stderr=True,
            capture_stdout=True,
        )
        try:
            discovered_catalog: AirbyteCatalog = AirbyteCatalogSerializer.load(
                orjson.loads(discover_result.stdout)["catalog"],
            )
        except Exception as ex:
            raise AssertionError(
                f"Failed to load discovered catalog from {discover_result.stdout}. "
                f"Error: {ex!s}"
            ) from None
        if not discovered_catalog.streams:
            raise ValueError(
                f"Discovered catalog for connector '{connector_name}' is empty. "
                "Please check the connector's discover implementation."
            )

        selected_streams = {stream.name for stream in discovered_catalog.streams}
        if read_from_streams == "default" and metadata.data.suggestedStreams:
            # Keep only the discovered streams that are also suggested streams.
            selected_streams &= set(metadata.data.suggestedStreams)

        if isinstance(read_from_streams, list):
            # An explicit list narrows the discovered streams to the requested ones.
            selected_streams &= set(read_from_streams)

        configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    sync_mode=stream.supported_sync_modes[0],
                    destination_sync_mode=DestinationSyncMode.append,
                )
                for stream in discovered_catalog.streams
                if stream.name in selected_streams
            ]
        )

        # Use a managed temporary directory instead of the deprecated, race-prone
        # `tempfile.mktemp()`: the catalog file is removed when the block exits. The
        # file itself is a regular file created inside that directory (not `mkstemp`),
        # so it keeps the default umask-derived permissions when mounted in the container.
        with tempfile.TemporaryDirectory(
            prefix=f"{connector_name}-configured-catalog-",
        ) as temp_dir:
            configured_catalog_path = Path(temp_dir) / "catalog.json"
            configured_catalog_path.write_bytes(orjson.dumps(asdict(configured_catalog)))
            read_result: CompletedProcess[str] = run_docker_command(
                [
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    f"{temp_config_file}:{container_config_path}",
                    "-v",
                    f"{configured_catalog_path}:{container_catalog_path}",
                    connector_image,
                    "read",
                    "--config",
                    container_config_path,
                    "--catalog",
                    container_catalog_path,
                ],
                # Inspect the exit code ourselves so the failure can include the output.
                check=False,
                capture_stderr=True,
                capture_stdout=True,
            )

    if read_result.returncode != 0:
        raise AssertionError(
            f"Failed to run `read` command in docker image {connector_image!r}. "
            "\n-----------------\n"
            f"EXIT CODE: {read_result.returncode}\n"
            "STDERR:\n"
            f"{read_result.stderr}\n"
            "STDOUT:\n"
            f"{read_result.stdout}\n"
            "-----------------"
        )

airbyte_cdk/test/standard_tests/pytest_hooks.py

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,95 @@
1313
```
1414
"""
1515

16+
from typing import Literal
17+
1618
import pytest
1719

1820

21+
@pytest.fixture
def connector_image_override(request: pytest.FixtureRequest) -> str | None:
    """Expose the `--connector-image` command-line option to tests.

    Returns:
        The value stored by pytest for `--connector-image`, or None if not set.
    """
    image_override: str | None = request.config.getoption("--connector-image")
    return image_override
25+
26+
27+
@pytest.fixture
def read_from_streams(
    request: pytest.FixtureRequest,
) -> Literal["all", "none", "default"] | list[str]:
    """Specify if the test should read from streams.

    The input can be one of the following:
    - [Omitted] - Default to 'none', meaning no streams will be read.
    - `--read-from-streams=true`: Read from all suggested streams.
    - `--read-from-streams=suggested`: Read from all suggested streams.
    - `--read-from-streams=default`: Read from all suggested streams.
    - `--read-from-streams=all`: Read from all streams.
    - `--read-from-streams=stream1,stream2`: Read from the specified streams only.
    - `--read-from-streams=false`: Do not read from any streams.
    - `--read-from-streams=none`: Do not read from any streams.
    """
    # The `default` passed here is only a fallback for an option that was never
    # registered; a registered option that is omitted yields its registered default.
    input_val: str | bool | None = request.config.getoption(
        "--read-from-streams",
        default="suggested",  # type: ignore
    )  # type: ignore

    if input_val is None or isinstance(input_val, bool):
        # Always return one of the documented keywords, never a raw bool/None, so
        # consumers comparing against "none" behave correctly.
        return "default" if input_val else "none"

    keyword = input_val.strip().lower()
    if keyword in ("false", "none"):
        return "none"

    if keyword in ("true", "suggested", "default"):
        return "default"

    if keyword == "all":
        # This will sometimes fail if the account doesn't have permissions
        # to premium or restricted stream data.
        return "all"

    # Otherwise treat the input as a comma-separated list of stream names. Surrounding
    # whitespace and empty entries are dropped; a single name yields a one-element list.
    stream_names = [name.strip() for name in input_val.split(",") if name.strip()]
    return stream_names or "none"
68+
69+
70+
@pytest.fixture
def read_scenarios(
    request: pytest.FixtureRequest,
) -> list[str] | Literal["all", "none", "default"]:
    """Return the value of `--read-scenarios`.

    This argument is ignored if `--read-from-streams` is False or not set.

    The input can be one of the following:
    - [Omitted] - Default to 'default', meaning only the default scenario will be read.
    - `--read-scenarios=default`: Same as omitting the option.
    - `--read-scenarios=all`: Read all scenarios.
    - `--read-scenarios=none`: Read no scenarios. (Overrides `--read-from-streams`, if set.)
    - `--read-scenarios=scenario1,scenario2`: Read the specified scenarios only.

    """
    input_val: str | None = request.config.getoption("--read-scenarios", default="default")

    if input_val is None:
        return "default"

    keyword = input_val.strip().lower()
    # Keywords are returned as plain strings (never wrapped in a list) so that consumers
    # can compare against "default" / "all" / "none" directly.
    if keyword == "default":
        return "default"

    if keyword == "all":
        return "all"

    if keyword == "none":
        return "none"

    # Otherwise: a comma-separated list of scenario names. Names are normalized to
    # lower case without a trailing `.json`, and empty entries are dropped.
    scenario_names = [
        scenario_name.strip().lower().removesuffix(".json")
        for scenario_name in input_val.split(",")
    ]
    scenario_names = [name for name in scenario_names if name]
    return scenario_names or "none"
103+
104+
19105
def pytest_addoption(parser: pytest.Parser) -> None:
20106
"""Add --connector-image to pytest's CLI."""
21107
parser.addoption(
@@ -24,12 +110,18 @@ def pytest_addoption(parser: pytest.Parser) -> None:
24110
default=None,
25111
help="Use this pre-built connector Docker image instead of building one.",
26112
)
27-
28-
29-
@pytest.fixture
30-
def connector_image_override(request: pytest.FixtureRequest) -> str | None:
31-
"""Return the value of --connector-image, or None if not set."""
32-
return request.config.getoption("--connector-image")
113+
parser.addoption(
114+
"--read-from-streams",
115+
action="store",
116+
default=None,
117+
help=read_from_streams.__doc__,
118+
)
119+
parser.addoption(
120+
"--read-scenarios",
121+
action="store",
122+
default="default",
123+
help=read_scenarios.__doc__,
124+
)
33125

34126

35127
def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:

0 commit comments

Comments
 (0)