Skip to content

Commit abc0652

Browse files
authored
Explicitly use Beam Direct Runner in tests since default is now Prism Runner (#815)
1 parent c3c00a7 commit abc0652

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

cubed/runtime/executors/beam.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,18 @@ def expand(self, pcoll):
8282
class BeamExecutor(DagExecutor):
8383
"""An execution engine that uses Apache Beam."""
8484

85+
def __init__(self, **kwargs):
86+
self.kwargs = kwargs
87+
8588
@property
8689
def name(self) -> str:
8790
return "beam"
8891

8992
def execute_dag(self, dag, callbacks=None, spec=None, compute_id=None, **kwargs):
93+
merged_kwargs = {**self.kwargs, **kwargs}
94+
9095
dag = dag.copy()
91-
pipeline = beam.Pipeline(**kwargs)
96+
pipeline = beam.Pipeline(**merged_kwargs)
9297

9398
for name, node in visit_nodes(dag):
9499
cubed_pipeline = node["pipeline"]

cubed/tests/utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,12 @@
3535
MAIN_EXECUTORS.append(create_executor("processes"))
3636

3737
try:
38-
ALL_EXECUTORS.append(create_executor("beam"))
39-
MAIN_EXECUTORS.append(create_executor("beam"))
38+
ALL_EXECUTORS.append(
39+
create_executor("beam", executor_options=dict(runner="FnApiRunner"))
40+
)
41+
MAIN_EXECUTORS.append(
42+
create_executor("beam", executor_options=dict(runner="FnApiRunner"))
43+
)
4044
except ImportError:
4145
pass
4246

0 commit comments

Comments
 (0)