@@ -84,6 +84,38 @@ def acceptance_test_config(cls) -> dict[str, object]:
8484 )
8585 return tests_config
8686
87+ @staticmethod
88+ def _dedup_scenarios (scenarios : list [ConnectorTestScenario ]) -> list [ConnectorTestScenario ]:
89+ """
90+ For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined
91+ a series of more granular scenarios specifying a config_path and empty_streams among other things.
92+
93+ This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to
94+ take the union of any defined empty_streams, to have high confidence that runnning a read with the
95+ config will not error on the lack of data in the empty streams or lack of permissions to read them.
96+
97+ """
98+ deduped_scenarios : list [ConnectorTestScenario ] = []
99+
100+ for scenario in scenarios :
101+ for existing_scenario in deduped_scenarios :
102+ if scenario .config_path == existing_scenario .config_path :
103+ # If a scenario with the same config_path already exists, we merge the empty streams.
104+ # scenarios are immutable, so we create a new one.
105+ all_empty_streams = (existing_scenario .empty_streams or []) + (
106+ scenario .empty_streams or []
107+ )
108+ merged_scenario = existing_scenario .model_copy (
109+ update = {"empty_streams" : list (set (all_empty_streams ))}
110+ )
111+ deduped_scenarios .remove (existing_scenario )
112+ deduped_scenarios .append (merged_scenario )
113+ break
114+ else :
115+ # If a scenario does not exist with the config, add the new scenario to the list.
116+ deduped_scenarios .append (scenario )
117+ return deduped_scenarios
118+
87119 @classmethod
88120 def get_scenarios (
89121 cls ,
@@ -105,6 +137,7 @@ def get_scenarios(
105137 return []
106138
107139 test_scenarios : list [ConnectorTestScenario ] = []
140+ # we look in the basic_read section to find any empty streams
108141 for category in ["spec" , "connection" , "basic_read" ]:
109142 if (
110143 category not in all_tests_config ["acceptance_tests" ]
@@ -125,25 +158,7 @@ def get_scenarios(
125158
126159 test_scenarios .append (scenario )
127160
128- # Remove duplicate scenarios based on config_path.
129- deduped_test_scenarios : list [ConnectorTestScenario ] = []
130- for scenario in test_scenarios :
131- for existing_scenario in deduped_test_scenarios :
132- if scenario .config_path == existing_scenario .config_path :
133- # If a scenario with the same config_path already exists, we merge the empty streams.
134- # scenarios are immutable, so we create a new one.
135- all_empty_streams = (existing_scenario .empty_streams or []) + (
136- scenario .empty_streams or []
137- )
138- new_scenario = existing_scenario .model_copy (
139- update = {"empty_streams" : all_empty_streams }
140- )
141- deduped_test_scenarios .remove (existing_scenario )
142- deduped_test_scenarios .append (new_scenario )
143- break
144- else :
145- # If a scenario does not exist with the config, add the new scenario to the list.
146- deduped_test_scenarios .append (scenario )
161+ deduped_test_scenarios = cls ._dedup_scenarios (test_scenarios )
147162
148163 return deduped_test_scenarios
149164
@@ -348,12 +363,6 @@ def test_docker_image_build_and_read(
348363 # If `read_from_streams` is a list, we filter the discovered streams.
349364 streams_list = list (set (streams_list ) & set (read_from_streams ))
350365
351- if scenario .empty_streams :
352- # If there are empty streams, we remove them from the list of streams to read.
353- streams_list = list (
354- set (streams_list ) - set (stream .name for stream in scenario .empty_streams )
355- )
356-
357366 if scenario .empty_streams :
358367 # Filter out streams marked as empty in the scenario.
359368 empty_stream_names = [stream .name for stream in scenario .empty_streams ]
0 commit comments