Skip to content

Commit 0aaa63e

Browse files
committed
Streaming task progress.
Signed-off-by: Govind Kamat <[email protected]>
1 parent e7c4c8a commit 0aaa63e

File tree

1 file changed

+15
-17
lines changed

1 file changed

+15
-17
lines changed

osbenchmark/worker_coordinator/worker_coordinator.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,27 +1296,25 @@ def update_profile_samples(self, profile_samples):
12961296

12971297
def update_progress_message(self, task_finished=False):
12981298
if not self.quiet and self.current_step >= 0:
1299-
is_bulk = False
13001299
tasks = ",".join([t.name for t in self.tasks_per_join_point[self.current_step]])
1301-
if len(self.tasks_per_join_point[self.current_step]) == 1:
1302-
task = set(self.tasks_per_join_point[self.current_step]).pop()
1303-
is_bulk = task.operation.type == 'bulk'
13041300

1305-
if task_finished and not is_bulk:
1306-
total_progress = 1.0
1307-
else:
1308-
# we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel
1309-
# structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the
1310-
# task that is completing the parallel structure.
1311-
progress_per_client = [s.task_progress
1312-
for s in self.most_recent_sample_per_client.values() if s.task_progress is not None]
1313-
1314-
num_clients = max(len(progress_per_client), 1)
1315-
progress_per_client = [p[0] for p in progress_per_client]
1316-
total_progress = sum(progress_per_client) / num_clients
1317-
if is_bulk:
1301+
# we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel
1302+
# structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the
1303+
# task that is completing the parallel structure.
1304+
progress_per_client = [s.task_progress
1305+
for s in self.most_recent_sample_per_client.values() if s.task_progress is not None]
1306+
1307+
num_clients = len(progress_per_client)
1308+
assert num_clients > 0, "Number of clients is 0"
1309+
total_progress = sum([p[0] for p in progress_per_client]) / num_clients
1310+
units = set(progress_per_client)
1311+
assert len(units) == 1, "Encountered mix of disparate units while tracking task progress"
1312+
unit = units.pop()
1313+
if unit != '%':
13181314
self.progress_publisher.print("Running %s" % tasks, "[%4.1f GB]" % total_progress)
13191315
else:
1316+
if task_finished:
1317+
total_progress = 1.0
13201318
self.progress_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
13211319
if task_finished:
13221320
self.progress_publisher.finish()

0 commit comments

Comments
 (0)