diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py index 2221998d..a656b6e1 100644 --- a/wfcommons/wfbench/bench.py +++ b/wfcommons/wfbench/bench.py @@ -127,8 +127,8 @@ def create_benchmark_from_synthetic_workflow( if cpu_work is None: cpu_work = {} for task in self.workflow.tasks.values(): - if task.category not in cpu_work or task.runtime > cpu_work[task.category]: - cpu_work[task.category] = task.runtime + if task.name not in cpu_work or task.runtime > cpu_work[task.name]: + cpu_work[task.name] = task.runtime for key in cpu_work.keys(): cpu_work[key] *= 1000 @@ -136,25 +136,25 @@ def create_benchmark_from_synthetic_workflow( task_max_runtimes = {} for task in self.workflow.tasks.values(): - if task.category not in task_max_runtimes or task.runtime > task_max_runtimes[task.category]: - task_max_runtimes[task.category] = task.runtime + if task.name not in task_max_runtimes or task.runtime > task_max_runtimes[task.name]: + task_max_runtimes[task.name] = task.runtime max_runtime = max(runtime for runtime in task_max_runtimes.values()) for task in self.workflow.tasks.values(): runtime_factor = task.runtime / max_runtime - task_runtime_factor = task.runtime / task_max_runtimes[task.category] + task_runtime_factor = task.runtime / task_max_runtimes[task.name] # scale argument parameters to achieve a runtime distribution - task_percent_cpu = percent_cpu[task.category] * task_runtime_factor if isinstance(percent_cpu, dict) else percent_cpu * runtime_factor + task_percent_cpu = percent_cpu[task.name] * task_runtime_factor if isinstance(percent_cpu, dict) else percent_cpu * runtime_factor task_cores = int(10 * task_percent_cpu) # set number of cores to cpu threads in wfbench task_percent_cpu = max(0.1, task_percent_cpu) # set minimum to 0.1 which is equivalent to 1 thread in wfbench task_percent_cpu = round(task_percent_cpu, 2) if cpu_work is not None: - task_cpu_work = cpu_work[task.category] * task_runtime_factor if isinstance(cpu_work, dict) else cpu_work * runtime_factor + task_cpu_work = cpu_work[task.name] * task_runtime_factor if isinstance(cpu_work, dict) else cpu_work * runtime_factor task_cpu_work = int(task_cpu_work) else: task_cpu_work = None if gpu_work is not None: - task_gpu_work = gpu_work[task.category] * task_runtime_factor if isinstance(gpu_work, dict) else gpu_work * runtime_factor + task_gpu_work = gpu_work[task.name] * task_runtime_factor if isinstance(gpu_work, dict) else gpu_work * runtime_factor task_gpu_work = int(task_gpu_work) else: task_gpu_work = None @@ -416,9 +416,9 @@ def _generate_task_cpu_params(self, if not cpu_work: return [] - _percent_cpu = percent_cpu[task.category] if isinstance( + _percent_cpu = percent_cpu[task.name] if isinstance( percent_cpu, dict) else percent_cpu - _cpu_work = cpu_work[task.category] if isinstance( + _cpu_work = cpu_work[task.name] if isinstance( cpu_work, dict) else cpu_work params = [f"--percent-cpu {_percent_cpu}", f"--cpu-work {int(_cpu_work)}"] @@ -434,7 +434,7 @@ def _generate_task_gpu_params(self, task: Task, gpu_work: Union[int, Dict[str, i """ if not gpu_work: return [] - _gpu_work = gpu_work[task.category] if isinstance( + _gpu_work = gpu_work[task.name] if isinstance( gpu_work, dict) else gpu_work return [f"--gpu-work {_gpu_work}"] @@ -497,12 +497,12 @@ def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]: for task in self.workflow.tasks.values(): output_files.setdefault(task.task_id, {}) if not self.workflow.tasks_children[task.task_id]: - output_files[task.task_id][task.task_id] = int(data[task.category]) + output_files[task.task_id][task.task_id] = int(data[task.name]) else: for child_name in self.workflow.tasks_children[task.task_id]: child = self.workflow.tasks[child_name] output_files[task.task_id][child.task_id] = int( - data[child.category]) + data[child.name]) return output_files @@ -561,7 +561,7 @@ def _add_input_files(self, output_files: Dict[str, Dict[str, str]], data: Union[ if not self.workflow.tasks_parents[task.task_id]: task.input_files.append( File(f"{task.task_id}_input.txt", - data[task.category] if isinstance( + data[task.name] if isinstance( data, Dict) else data, FileLink.INPUT)) inputs.append(f'{task.task_id}_input.txt') @@ -591,7 +591,7 @@ def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, """ for task in self.workflow.tasks.values(): if not self.workflow.tasks_parents[task.task_id]: - file_size = data[task.category] if isinstance( + file_size = data[task.name] if isinstance( data, Dict) else data file = save_dir.joinpath(f"{task.task_id}_input.txt") if not file.is_file():