Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions wfcommons/wfbench/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,34 +127,34 @@ def create_benchmark_from_synthetic_workflow(
if cpu_work is None:
cpu_work = {}
for task in self.workflow.tasks.values():
if task.category not in cpu_work or task.runtime > cpu_work[task.category]:
cpu_work[task.category] = task.runtime
if task.name not in cpu_work or task.runtime > cpu_work[task.name]:
cpu_work[task.name] = task.runtime
for key in cpu_work.keys():
cpu_work[key] *= 1000

cores, lock = self._creating_lock_files(lock_files_folder)

task_max_runtimes = {}
for task in self.workflow.tasks.values():
if task.category not in task_max_runtimes or task.runtime > task_max_runtimes[task.category]:
task_max_runtimes[task.category] = task.runtime
if task.name not in task_max_runtimes or task.runtime > task_max_runtimes[task.name]:
task_max_runtimes[task.name] = task.runtime
max_runtime = max(runtime for runtime in task_max_runtimes.values())

for task in self.workflow.tasks.values():
runtime_factor = task.runtime / max_runtime
task_runtime_factor = task.runtime / task_max_runtimes[task.category]
task_runtime_factor = task.runtime / task_max_runtimes[task.name]
# scale argument parameters to achieve a runtime distribution
task_percent_cpu = percent_cpu[task.category] * task_runtime_factor if isinstance(percent_cpu, dict) else percent_cpu * runtime_factor
task_percent_cpu = percent_cpu[task.name] * task_runtime_factor if isinstance(percent_cpu, dict) else percent_cpu * runtime_factor
task_cores = int(10 * task_percent_cpu) # set number of cores to cpu threads in wfbench
task_percent_cpu = max(0.1, task_percent_cpu) # set minimum to 0.1 which is equivalent to 1 thread in wfbench
task_percent_cpu = round(task_percent_cpu, 2)
if cpu_work is not None:
task_cpu_work = cpu_work[task.category] * task_runtime_factor if isinstance(cpu_work, dict) else cpu_work * runtime_factor
task_cpu_work = cpu_work[task.name] * task_runtime_factor if isinstance(cpu_work, dict) else cpu_work * runtime_factor
task_cpu_work = int(task_cpu_work)
else:
task_cpu_work = None
if gpu_work is not None:
task_gpu_work = gpu_work[task.category] * task_runtime_factor if isinstance(gpu_work, dict) else gpu_work * runtime_factor
task_gpu_work = gpu_work[task.name] * task_runtime_factor if isinstance(gpu_work, dict) else gpu_work * runtime_factor
task_gpu_work = int(task_gpu_work)
else:
task_gpu_work = None
Expand Down Expand Up @@ -416,9 +416,9 @@ def _generate_task_cpu_params(self,
if not cpu_work:
return []

_percent_cpu = percent_cpu[task.category] if isinstance(
_percent_cpu = percent_cpu[task.name] if isinstance(
percent_cpu, dict) else percent_cpu
_cpu_work = cpu_work[task.category] if isinstance(
_cpu_work = cpu_work[task.name] if isinstance(
cpu_work, dict) else cpu_work

params = [f"--percent-cpu {_percent_cpu}", f"--cpu-work {int(_cpu_work)}"]
Expand All @@ -434,7 +434,7 @@ def _generate_task_gpu_params(self, task: Task, gpu_work: Union[int, Dict[str, i
"""
if not gpu_work:
return []
_gpu_work = gpu_work[task.category] if isinstance(
_gpu_work = gpu_work[task.name] if isinstance(
gpu_work, dict) else gpu_work

return [f"--gpu-work {_gpu_work}"]
Expand Down Expand Up @@ -497,12 +497,12 @@ def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]:
for task in self.workflow.tasks.values():
output_files.setdefault(task.task_id, {})
if not self.workflow.tasks_children[task.task_id]:
output_files[task.task_id][task.task_id] = int(data[task.category])
output_files[task.task_id][task.task_id] = int(data[task.name])
else:
for child_name in self.workflow.tasks_children[task.task_id]:
child = self.workflow.tasks[child_name]
output_files[task.task_id][child.task_id] = int(
data[child.category])
data[child.name])

return output_files

Expand Down Expand Up @@ -561,7 +561,7 @@ def _add_input_files(self, output_files: Dict[str, Dict[str, str]], data: Union[
if not self.workflow.tasks_parents[task.task_id]:
task.input_files.append(
File(f"{task.task_id}_input.txt",
data[task.category] if isinstance(
data[task.name] if isinstance(
data, Dict) else data,
FileLink.INPUT))
inputs.append(f'{task.task_id}_input.txt')
Expand Down Expand Up @@ -591,7 +591,7 @@ def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int,
"""
for task in self.workflow.tasks.values():
if not self.workflow.tasks_parents[task.task_id]:
file_size = data[task.category] if isinstance(
file_size = data[task.name] if isinstance(
data, Dict) else data
file = save_dir.joinpath(f"{task.task_id}_input.txt")
if not file.is_file():
Expand Down