diff --git a/cubed/runtime/executors/beam.py b/cubed/runtime/executors/beam.py index 116dacda..e6861e45 100644 --- a/cubed/runtime/executors/beam.py +++ b/cubed/runtime/executors/beam.py @@ -82,13 +82,18 @@ def expand(self, pcoll): class BeamExecutor(DagExecutor): """An execution engine that uses Apache Beam.""" + def __init__(self, **kwargs): + self.kwargs = kwargs + @property def name(self) -> str: return "beam" def execute_dag(self, dag, callbacks=None, spec=None, compute_id=None, **kwargs): + merged_kwargs = {**self.kwargs, **kwargs} + dag = dag.copy() - pipeline = beam.Pipeline(**kwargs) + pipeline = beam.Pipeline(**merged_kwargs) for name, node in visit_nodes(dag): cubed_pipeline = node["pipeline"] diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py index 73c87e7f..91d35634 100644 --- a/cubed/tests/utils.py +++ b/cubed/tests/utils.py @@ -35,8 +35,12 @@ MAIN_EXECUTORS.append(create_executor("processes")) try: - ALL_EXECUTORS.append(create_executor("beam")) - MAIN_EXECUTORS.append(create_executor("beam")) + ALL_EXECUTORS.append( + create_executor("beam", executor_options=dict(runner="FnApiRunner")) + ) + MAIN_EXECUTORS.append( + create_executor("beam", executor_options=dict(runner="FnApiRunner")) + ) except ImportError: pass