|
| 1 | +import numpy as np |
| 2 | +from collections import OrderedDict |
| 3 | +import pickle |
| 4 | +from pathlib import Path |
| 5 | +import subprocess |
| 6 | +import matplotlib.pyplot as plt |
| 7 | +import matplotlib.image as mpimg |
| 8 | + |
| 9 | +from .job import Job |
| 10 | +from .config import Config |
| 11 | + |
| 12 | + |
class Campaign:
    """Orchestrates a set of batch jobs generated from a processing pipeline.

    A campaign owns an ordered collection of :class:`Job` objects, wires
    dependencies between them, and offers helpers to run, monitor, cancel,
    persist (via pickle), and visualize the workflow.
    """

    def __init__(self, small=False, resume=True, config=None, name="campaign",
                 pipeline=None, job_dependencies=None, verbose=True):
        """
        :param small: Whether jobs should be run in reduced ("small") mode.
        :param resume: Whether jobs should resume rather than restart.
        :param config: Config, dict, str/Path to a config file, or None.
        :param name: Campaign name; also the stem of the ".cpg" save file.
        :param pipeline: Mapping job_name -> processing step (see set_workflow).
        :param job_dependencies: Mapping job_name -> list of prerequisite job names.
        :param verbose: Default verbosity passed to jobs.
        """
        self.small = small
        self.resume = resume
        self.jobs = OrderedDict()
        self.verbose = verbose
        self.name = name
        self._pipeline = pipeline

        # Fresh dict per instance; a shared default would be shared mutable state.
        self.job_dependencies = {} if job_dependencies is None else job_dependencies

        # Routed through the `config` property setter below.
        self.config = config

        # The campaign is pickled under <output_root>/<name>.cpg when configured.
        output_path = Path()
        if "paths" in self.config and "output_root" in self.config["paths"]:
            output_path = Path(self.config["paths"]["output_root"])
        self.file_name = (output_path / self.name).with_suffix(".cpg")

    @property
    def config(self):
        """The effective :class:`Config` object for this campaign."""
        return self._config

    @config.setter
    def config(self, config):
        """Accept a Config, dict, path (str/Path), or None and normalize it.

        Paths are loaded from file; Config/dict inputs are layered on top of
        the default configuration; None yields the defaults alone.

        :raises TypeError: If *config* is of an unsupported type.
        """
        if isinstance(config, (str, Path)):
            config_paths = [Path(config)]
            load_default = False
        elif isinstance(config, (Config, dict)) or config is None:
            config_paths = []
            load_default = True
        else:
            raise TypeError("Received a config object of an unrecognized type ({}).".format(type(config)))

        self._config = Config.get_config(config_paths=config_paths, load_default=load_default)

        # In-memory configurations override the defaults loaded above.
        if isinstance(config, (Config, dict)):
            self._config.update(config)

        # Keep an already-attached pipeline in sync with the new configuration.
        if self.pipeline is not None:
            self.pipeline.config = self._config

        assert self._config is not None  # Config.get_config must always return one

    @property
    def pipeline(self):
        """The pipeline (mapping of job names to processing steps), or None."""
        return self._pipeline

    @pipeline.setter
    def pipeline(self, pipeline):
        self._pipeline = pipeline
        # Guard against None so clearing the pipeline does not raise
        # (the previous implementation crashed on `None.config`).
        if pipeline is not None:
            pipeline.config = self.config

    def set_workflow(self, pipeline=None, job_dependencies=None, verbose=None):
        """Build the Job objects for *pipeline*, honoring job dependencies.

        :param pipeline: Mapping job_name -> processing step; no-op if None.
        :param job_dependencies: Optional replacement for self.job_dependencies.
        :param verbose: Verbosity for the created jobs (default: self.verbose).
        :raises ValueError: If a declared dependency has no corresponding job.
        """
        if pipeline is None:
            return

        # The property setter also propagates self.config to the pipeline.
        self.pipeline = pipeline

        if verbose is None:
            verbose = self.verbose

        if job_dependencies is not None:
            self.job_dependencies = job_dependencies

        for job_name, processing_step in self.pipeline.items():

            if job_name in self.job_dependencies:
                dependencies = self.job_dependencies[job_name]
                for job_dependency in dependencies:
                    if job_dependency not in self.jobs:
                        raise ValueError(job_name + " depends on " + job_dependency +
                                         ", but no such job has been ran.")
                processing_step.dep_after = [self.jobs[job_dependency].job_ids
                                             for job_dependency in dependencies]

            self.jobs[job_name] = Job(job_name=job_name, small=self.small,
                                      resume=self.resume, config=self.config,
                                      verbose=verbose, processing_step=processing_step)

    def run(self, include=None, exclude=None, test=False, verbose=None):
        """Run the campaign's jobs.

        :param include: If given, run only these job names.
        :param exclude: Else, run everything except these job names.
        :param test: Dry-run flag forwarded to Job.run().
        :param verbose: Verbosity forwarded to Job.run().
        """
        if include is not None:
            # BUG FIX: was `self.step_names`, an attribute that does not exist
            # anywhere in this class; the job names live in self.jobs.
            exclude = [job_name for job_name in self.jobs if job_name not in include]
        elif exclude is None:
            exclude = []

        for job_name, job in self.jobs.items():
            if job_name not in exclude:
                job.run(verbose=verbose, test=test)

    def cancel(self):
        """Cancel every job of the campaign."""
        for job in self.jobs.values():
            job.cancel()

    def get_status(self):
        """Return a dict mapping job names to the status reported by each job."""
        return {job.name: job.get_status() for job in self.jobs.values()}

    def print_status(self):
        """Print a 100-ish column banner followed by the status of every job."""
        for job in self.jobs.values():
            padding = 100 - len(job.name) - 1
            print("=" * int(np.ceil(padding / 2.0)) + " " + job.name + " " +
                  "=" * int(np.floor(padding / 2.0)))
            job.print_status()
            print(" ")

    def _locate_job_key(self, job_id):
        """Return (job, key) for the sub-job with scheduler id *job_id*, or None."""
        for job in self.jobs.values():
            for job_key, id_ in job.job_ids.items():
                if id_ == job_id:
                    return job, job_key
        return None

    def print_log(self, job_id, tail=None, head=None):
        """Print the log file of the sub-job with scheduler id *job_id*.

        :param head: If given, print only the first *head* lines.
        :param tail: Else if given, print only the last *tail* lines.
        """
        located = self._locate_job_key(job_id)
        if located is None:
            return
        job, job_key = located
        with open(job.file_names_log[job_key], "r") as log_file:
            log_text = log_file.read()
        if head is not None:
            print("\n".join(log_text.split("\n")[:head]))
        elif tail is not None:
            print("\n".join(log_text.split("\n")[-tail:]))
        else:
            print(log_text)

    def print_script(self, job_id):
        """Print the submission (slurm) script of the sub-job *job_id*."""
        located = self._locate_job_key(job_id)
        if located is None:
            return
        job, job_key = located
        with open(job.file_names_slurm[job_key], "r") as slurm_file:
            print(slurm_file.read())

    def relaunch_job(self, job_id, dep_sup=None):
        """Relaunch the sub-job with scheduler id *job_id*.

        :param dep_sup: Extra dependencies forwarded to Job.run_a_job().
        """
        for job in self.jobs.values():
            for job_key, id_ in job.job_ids.items():
                if job_id == id_:
                    new_id = job.run_a_job(job_key, dep_sup=dep_sup)
                    if new_id is not None:
                        print("Job {} launched.".format(new_id))

    def print_job_id_info(self, job_id):
        """Print identifying information for the sub-job with id *job_id*."""
        for job in self.jobs.values():
            for job_key, id_ in job.job_ids.items():
                if job_id == id_:
                    print("#"*45 + " JOB INFO " + "#"*45)
                    print("job name:", job.specific_names[job_key])
                    print("job key:", job_key)
                    print("job id:", job_id)
                    print("job depends on ids:", job.get_dependency_ids(job_key))
                    print("#"*100)

    def _check_file_name_(self, file_name):
        """Adopt *file_name* (str or Path) as self.file_name when provided."""
        if file_name is not None:
            self.file_name = Path(file_name)

    def save(self, file_name=None):
        """Pickle the campaign to *file_name* (default: self.file_name)."""
        self._check_file_name_(file_name)

        with self.file_name.open("wb") as f:
            pickle.dump(self, f)
        print("Saving campaign as {}.".format(self.file_name))

    def load(self, file_name=None):
        """Load a pickled campaign and adopt its state in-place.

        BUG FIX: the previous implementation rebound the *local* name `self`
        (`self = pickle.load(f)`), leaving the calling instance untouched, so
        load_or_run() silently discarded the loaded state. The loaded
        attributes are now copied onto this instance.

        :return: This campaign, updated with the loaded state.
        """
        self._check_file_name_(file_name)
        with self.file_name.open("rb") as f:
            loaded = pickle.load(f)
        self.__dict__.update(loaded.__dict__)

        return self

    def load_or_run(self, rerun=False, file_name=None, **run_kwargs):
        """
        Load the campaign if it already exists (i.e., if the file pointed to by
        file_name or self.file_name exists) and rerun is False. Else, run it
        and save it.

        :param rerun: Specify whether the campaign should be rerun if it already exists.
        :param file_name: Path where to save the campaign to or load the campaign from.
        :param run_kwargs: Arguments to be passed to the self.run() method.
        :return: None
        """

        if rerun:
            self.run(**run_kwargs)
            self.save(file_name)
            return

        try:
            self.load(file_name)
        except IOError:
            # No (readable) save file: run the campaign and persist it.
            self.run(**run_kwargs)
            self.save(file_name)

    def show_workflow(self):
        """Render the job-dependency graph with blockdiag and display it.

        Writes a `diagram` file in the current directory, invokes the external
        `blockdiag` tool to produce `diagram.png`, and shows the image with
        matplotlib.
        """
        block_text = "blockdiag {\n"
        for child, parents in self.job_dependencies.items():
            for parent in parents:
                block_text += "    {} -> {};\n".format(parent, child)
        block_text += "}\n"

        with open("diagram", "w") as f:
            f.write(block_text)

        subprocess.check_output(["blockdiag", "--size=2000x2000", "diagram"])

        fig, axes = plt.subplots(1, 1, figsize=(15, 15))
        axes.imshow(mpimg.imread('diagram.png'))
        plt.axis('off')
0 commit comments