Skip to content

Commit 45e9151

Browse files
committed
first draft of fetching stream bypass logic
1 parent c0f2851 commit 45e9151

File tree

1 file changed

+78
-15
lines changed

1 file changed

+78
-15
lines changed

airbyte_cdk/test/standard_tests/docker_base.py

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,75 @@ def is_destination_connector(cls) -> bool:
6464
return cast(str, cls.connector_name).startswith("destination-")
6565

6666
@classproperty
67-
def acceptance_test_config_path(cls) -> Path:
68-
"""Get the path to the acceptance test config file."""
69-
result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
70-
if result.exists():
71-
return result
67+
def acceptance_test_config(cls) -> dict[str, object]:
68+
"""Get the contents of acceptance test config file.
69+
70+
Also perform some basic validation that the file has the expected structure.
71+
"""
72+
acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
73+
if not acceptance_test_config_path.exists():
74+
raise FileNotFoundError(f"Acceptance test config file not found at: {str(acceptance_test_config_path)}")
75+
76+
tests_config = yaml.safe_load(acceptance_test_config_path.read_text())
7277

73-
raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")
78+
if "acceptance_tests" not in tests_config:
79+
raise ValueError(
80+
f"Acceptance tests config not found in {acceptance_test_config_path}."
81+
f" Found only: {str(tests_config)}."
82+
)
83+
return tests_config
84+
85+
86+
@classmethod
87+
def _get_empty_streams(cls) -> list[str]:
88+
"""Parse the acceptance test config file and return a list of stream names that are empty.
89+
90+
In reality, the "empty" designation could indicate the stream has no data or that reading from
91+
it could result in an error (e.g. our sandbox account doesn't not have access to that stream).
92+
93+
This method is used to tell the test suite to skip reading from those streams.
94+
"""
95+
try:
96+
all_tests_config = cls.acceptance_test_config
97+
except FileNotFoundError as e:
98+
# Destinations sometimes do not have an acceptance tests file.
99+
warnings.warn(
100+
f"Acceptance test config file not found: {e!s}. No streams will be skipped.",
101+
category=UserWarning,
102+
stacklevel=1,
103+
)
104+
return []
105+
106+
if "basic_read" not in all_tests_config["acceptance_tests"]:
107+
warnings.warn(
108+
"No 'basic_read' section found in acceptance test config. No streams will be skipped.",
109+
category=UserWarning,
110+
stacklevel=1,
111+
)
112+
return []
113+
basic_read_block = all_tests_config["acceptance_tests"]["basic_read"]
114+
115+
# there is some inconsistency in the acceptance test config file
116+
# sometimes tests are defined under "tests" key, sometimes directly under the "basic_read" key.
117+
# We will handle both cases.
118+
basic_read_scenarios = basic_read_block.get("tests", basic_read_block)
119+
if not isinstance(basic_read_scenarios, list) or len(basic_read_scenarios) == 0:
120+
warnings.warn(
121+
"No 'tests' key found in 'basic_read' section of acceptance test config. "
122+
"No streams will be skipped.",
123+
category=UserWarning,
124+
stacklevel=1,
125+
)
126+
return []
127+
128+
empty_streams: list[str] = []
129+
# Iterate through the scenarios and collect empty streams.
130+
for scenario in basic_read_scenarios:
131+
if "empty_streams" in scenario:
132+
# If the scenario has an "empty_streams" key, return its value.
133+
empty_streams.extend(scenario["empty_streams"]["name"])
134+
135+
return empty_streams
74136

75137
@classmethod
76138
def get_scenarios(
@@ -83,7 +145,7 @@ def get_scenarios(
83145
"""
84146
categories = ["connection", "spec"]
85147
try:
86-
cls.acceptance_test_config_path
148+
all_tests_config = cls.acceptance_test_config
87149
except FileNotFoundError as e:
88150
# Destinations sometimes do not have an acceptance tests file.
89151
warnings.warn(
@@ -93,13 +155,6 @@ def get_scenarios(
93155
)
94156
return []
95157

96-
all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
97-
if "acceptance_tests" not in all_tests_config:
98-
raise ValueError(
99-
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
100-
f" Found only: {str(all_tests_config)}."
101-
)
102-
103158
test_scenarios: list[ConnectorTestScenario] = []
104159
for category in categories:
105160
if (
@@ -126,7 +181,7 @@ def get_scenarios(
126181
continue
127182

128183
test_scenarios.append(scenario)
129-
184+
#import pdb; pdb.set_trace() # noqa: T201
130185
return test_scenarios
131186

132187
@pytest.mark.skipif(
@@ -330,6 +385,14 @@ def test_docker_image_build_and_read(
330385
# If `read_from_streams` is a list, we filter the discovered streams.
331386
streams_list = list(set(streams_list) & set(read_from_streams))
332387

388+
empty_streams = self._get_empty_streams()
389+
if empty_streams:
390+
# If there are empty streams, we remove them from the list of streams to read.
391+
streams_list = list(set(streams_list) - set(empty_streams))
392+
393+
print("YOYOYOYOYO")
394+
import pdb; pdb.set_trace() # noqa: T201
395+
333396
configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
334397
streams=[
335398
ConfiguredAirbyteStream(

0 commit comments

Comments
 (0)