forked from NSLS2/cms-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathend_of_run_workflow.py
More file actions
35 lines (26 loc) · 983 Bytes
/
end_of_run_workflow.py
File metadata and controls
35 lines (26 loc) · 983 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from prefect import flow, get_run_logger, task
from prefect.task_runners import ConcurrentTaskRunner
#from analysis import run_analysis
from data_validation import read_all_streams
from linker import create_symlinks
@task
def log_completion():
    """Emit an info-level "Complete" message on the current run's logger."""
    get_run_logger().info("Complete")
@flow(task_runner=ConcurrentTaskRunner())
def end_of_run_workflow(stop_doc):
    """Run the end-of-run tasks for the run referenced by ``stop_doc``.

    The run identifier is read from ``stop_doc["run_start"]``. The symlink
    task and the stream-validation task are both submitted to the flow's
    task runner before either result is awaited; once both have finished,
    ``log_completion`` is invoked.

    Args:
        stop_doc: Mapping that must contain a ``"run_start"`` key.
            NOTE(review): presumably a Bluesky stop document -- confirm
            against the caller.

    Raises:
        KeyError: If ``stop_doc`` has no ``"run_start"`` entry.
    """
    run_logger = get_run_logger()
    run_uid = stop_doc["run_start"]

    # Submit the linker and validation tasks up front so that neither
    # waits on the other before starting.
    symlink_future = create_symlinks.submit(run_uid)
    run_logger.info("Launched linker task")
    validation_future = read_all_streams.submit("cms", run_uid)
    run_logger.info("Launched validation task")

    # The analysis step is currently disabled (its import is commented out
    # at the top of the file as well):
    # analysis_task = run_analysis(raw_ref=uid)
    # logger.info("Launched analysis task")

    # Wait for every submitted task to complete, linker first.
    run_logger.info("Waiting for tasks to complete")
    for pending in (symlink_future, validation_future):
        pending.result()
    # analysis_task.result()

    log_completion()