Skip to content

Commit 7b2134c

Browse files
authored
Fix PostCommit Python ValidatesRunner Samza / Spark jobs (#35210)
* Skip Samza and Spark runner tests * Fix formatting
1 parent 2f79185 commit 7b2134c

File tree

2 files changed

+48
-26
lines changed

2 files changed

+48
-26
lines changed

sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -827,13 +827,15 @@ def timer_callback(self, t=beam.DoFn.TimestampParam):
827827
assert_that(actual, equal_to(expected))
828828

829829
def test_pardo_et_timer_with_no_firing(self):
830-
if type(self) in [FnApiRunnerTest,
831-
FnApiRunnerTestWithGrpc,
832-
FnApiRunnerTestWithGrpcAndMultiWorkers,
833-
FnApiRunnerTestWithDisabledCaching,
834-
FnApiRunnerTestWithMultiWorkers,
835-
FnApiRunnerTestWithBundleRepeat,
836-
FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
830+
if type(self).__name__ in {'FnApiRunnerTest',
831+
'FnApiRunnerTestWithGrpc',
832+
'FnApiRunnerTestWithGrpcAndMultiWorkers',
833+
'FnApiRunnerTestWithDisabledCaching',
834+
'FnApiRunnerTestWithMultiWorkers',
835+
'FnApiRunnerTestWithBundleRepeat',
836+
'FnApiRunnerTestWithBundleRepeatAndMultiWorkers',
837+
'SamzaRunnerTest',
838+
'SparkRunnerTest'}:
837839
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
838840

839841
# The timer will not fire. It is initially set to T + 10, but then it is
@@ -842,20 +844,32 @@ def test_pardo_et_timer_with_no_firing(self):
842844
self._run_pardo_et_timer_test(5, 10, True, True, [])
843845

844846
def test_pardo_et_timer_with_no_reset(self):
845-
if type(self) in [FnApiRunnerTest,
846-
FnApiRunnerTestWithGrpc,
847-
FnApiRunnerTestWithGrpcAndMultiWorkers,
848-
FnApiRunnerTestWithDisabledCaching,
849-
FnApiRunnerTestWithMultiWorkers,
850-
FnApiRunnerTestWithBundleRepeat,
851-
FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
847+
if type(self).__name__ in {'FnApiRunnerTest',
848+
'FnApiRunnerTestWithGrpc',
849+
'FnApiRunnerTestWithGrpcAndMultiWorkers',
850+
'FnApiRunnerTestWithDisabledCaching',
851+
'FnApiRunnerTestWithMultiWorkers',
852+
'FnApiRunnerTestWithBundleRepeat',
853+
'FnApiRunnerTestWithBundleRepeatAndMultiWorkers',
854+
'SamzaRunnerTest',
855+
'SparkRunnerTest'}:
852856
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
853857

854858
# The timer will not fire. It is initially set to T + 10, and then it is
855859
# cleared at T + 4 and never set again (count is not reset).
856860
self._run_pardo_et_timer_test(5, 10, False, True, [])
857861

858862
def test_pardo_et_timer_with_no_reset_and_no_clear(self):
863+
if type(self).__name__ in {'FnApiRunnerTest',
864+
'FnApiRunnerTestWithGrpc',
865+
'FnApiRunnerTestWithGrpcAndMultiWorkers',
866+
'FnApiRunnerTestWithDisabledCaching',
867+
'FnApiRunnerTestWithMultiWorkers',
868+
'FnApiRunnerTestWithBundleRepeat',
869+
'FnApiRunnerTestWithBundleRepeatAndMultiWorkers',
870+
'SamzaRunnerTest',
871+
'SparkRunnerTest'}:
872+
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
859873
# The timer will fire at T + 10. After the timer is set, it is never
860874
# cleared or set again.
861875
self._run_pardo_et_timer_test(5, 10, False, False, ["fired"])

sdks/python/apache_beam/runners/portability/portable_runner_test.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -223,22 +223,30 @@ def test_draining_sdf_with_sdf_initiated_checkpointing(self):
223223
raise unittest.SkipTest("Portable runners don't support drain yet.")
224224

225225
def test_pardo_et_timer_with_no_firing(self):
226-
if type(self) in [PortableRunnerTest,
227-
PortableRunnerTestWithSubprocesses,
228-
PortableRunnerTestWithSubprocessesAndMultiWorkers,
229-
PortableRunnerTestWithExternalEnv,
230-
PortableRunnerTestWithLocalDocker,
231-
PortableRunnerOptimizedWithoutFusion]:
226+
if type(self).__name__ in {
227+
'PortableRunnerTest',
228+
'PortableRunnerTestWithSubprocesses',
229+
'PortableRunnerTestWithSubprocessesAndMultiWorkers',
230+
'PortableRunnerTestWithExternalEnv',
231+
'PortableRunnerTestWithLocalDocker',
232+
'PortableRunnerOptimizedWithoutFusion',
233+
'SamzaRunnerTest',
234+
'SparkRunnerTest'
235+
}:
232236
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
233237
super().test_pardo_et_timer_with_no_firing()
234238

235239
def test_pardo_et_timer_with_no_reset(self):
236-
if type(self) in [PortableRunnerTest,
237-
PortableRunnerTestWithSubprocesses,
238-
PortableRunnerTestWithSubprocessesAndMultiWorkers,
239-
PortableRunnerTestWithExternalEnv,
240-
PortableRunnerTestWithLocalDocker,
241-
PortableRunnerOptimizedWithoutFusion]:
240+
if type(self).__name__ in {
241+
'PortableRunnerTest',
242+
'PortableRunnerTestWithSubprocesses',
243+
'PortableRunnerTestWithSubprocessesAndMultiWorkers',
244+
'PortableRunnerTestWithExternalEnv',
245+
'PortableRunnerTestWithLocalDocker',
246+
'PortableRunnerOptimizedWithoutFusion',
247+
'SamzaRunnerTest',
248+
'SparkRunnerTest'
249+
}:
242250
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
243251
super().test_pardo_et_timer_with_no_reset()
244252

0 commit comments

Comments
 (0)