Skip to content

Commit 1fae069

Browse files
committed
allow serial execution without scheduler
1 parent 8671ec9 commit 1fae069

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

cems/src/main/python/monitor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from pmonitor import PMonitor
22

33
class Monitor:
4-
def __init__(self, preconditions, usecase, hosts, types, log_dir, simulation):
4+
def __init__(self, preconditions, usecase, hosts, types, log_dir, simulation, synchronous=False):
55
"""
66
77
:type preconditions: list
@@ -10,8 +10,12 @@ def __init__(self, preconditions, usecase, hosts, types, log_dir, simulation):
1010
:type calls: list
1111
:type log_dir: str
1212
:type simulation: bool
13+
:type synchronous: bool
1314
"""
14-
self.pm = PMonitor(preconditions, usecase, hosts, types, logdir=log_dir, simulation=simulation, polling="job_status_callback.sh")
15+
if synchronous:
16+
self.pm = PMonitor(preconditions, usecase, hosts, types, logdir=log_dir, simulation=simulation)
17+
else:
18+
self.pm = PMonitor(preconditions, usecase, hosts, types, logdir=log_dir, simulation=simulation, polling="job_status_callback.sh")
1519

1620
def execute(self, job):
1721
"""

cems/src/main/python/workflow.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,16 +366,22 @@ def run_test_job(self, hosts, simulation=False, logdir='trace'):
366366

367367
monitor.wait_for_completion()
368368

369-
def run_matchup(self, hosts, num_parallel_tasks, simulation=False, logdir='trace'):
369+
def run_matchup(self, hosts, num_parallel_tasks, simulation=False, logdir='trace', synchronous=False):
370370
"""
371371
372372
:param hosts: list
373373
:param num_parallel_tasks: int
374374
:param simulation: bool
375375
:param logdir: str
376+
:param synchronous: bool
376377
:return:
377378
"""
378-
monitor = self._get_monitor(hosts, [('matchup_start.sh', num_parallel_tasks)], logdir, simulation)
379+
if synchronous:
380+
runs_script = 'matchup_run.sh'
381+
else:
382+
runs_script = 'matchup_start.sh'
383+
384+
monitor = self._get_monitor(hosts, [(runs_script, num_parallel_tasks)], logdir, simulation)
379385

380386
sensors = self._get_sensor_pairs()
381387
for sensor_pair in sensors:
@@ -393,7 +399,7 @@ def run_matchup(self, hosts, num_parallel_tasks, simulation=False, logdir='trace
393399
pre_condition = 'ingest-' + primary_name + '-' + start_string + '-' + end_string
394400
post_condition = 'matchup-' + name + '-' + start_string + '-' + end_string + '-' + self.usecase_config
395401

396-
job = Job(job_name, 'matchup_start.sh', [pre_condition], [post_condition],
402+
job = Job(job_name, runs_script, [pre_condition], [post_condition],
397403
[start_string, end_string, self._get_config_dir(), self.usecase_config])
398404
monitor.execute(job)
399405

0 commit comments

Comments
 (0)