From f7fafe50547c1cfa3ca46a25f67e2bfa877bfcda Mon Sep 17 00:00:00 2001 From: uchanLee Date: Tue, 7 Sep 2021 00:07:47 +0900 Subject: [PATCH 1/3] feat: display kernel-pull-progress using tqdm --- changes/181.feature | 1 + src/ai/backend/client/func/session.py | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 changes/181.feature diff --git a/changes/181.feature b/changes/181.feature new file mode 100644 index 00000000..7e3d96a3 --- /dev/null +++ b/changes/181.feature @@ -0,0 +1 @@ +Display kernel-pull-progress from background-task-reporter diff --git a/src/ai/backend/client/func/session.py b/src/ai/backend/client/func/session.py index a5f524cc..bfeead45 100644 --- a/src/ai/backend/client/func/session.py +++ b/src/ai/backend/client/func/session.py @@ -298,6 +298,27 @@ async def get_or_create( rqst.set_json(params) async with rqst.fetch() as resp: data = await resp.json() + with tqdm(total=100, unit='%') as pbar: + task_id = data['background_task'] + bgtask = resp.session.BackgroundTask(task_id) + async with bgtask.listen_events() as response: + async for ev in response: + progress = json.loads(ev.data) + if ev.event == 'bgtask_updated': + current = progress['current_progress'] + total = progress['total_progress'] + if total == 0: + total = 1e-2 + pbar.n = round(current / total * 100, 2) + pbar.update(0) + pbar.refresh() + elif ev.event == 'bgtask_done': + pbar.n = 100.0 + pbar.update(0) + pbar.refresh() + pbar.clear() + async with rqst.fetch() as resp: + data = await resp.json() o = cls(name, owner_access_key) # type: ignore if api_session.get().api_version[0] >= 5: o.id = UUID(data['sessionId']) From b4520c386d2b2c3940df843872a21cab36bdb9ef Mon Sep 17 00:00:00 2001 From: uchanLee Date: Thu, 9 Sep 2021 16:46:56 +0900 Subject: [PATCH 2/3] fix:check background-task in response from manager --- changes/181.feature | 2 +- src/ai/backend/client/func/session.py | 43 ++++++++++++++------------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/changes/181.feature b/changes/181.feature index 7e3d96a3..b9df3c38 100644 --- a/changes/181.feature +++ b/changes/181.feature @@ -1 +1 @@ -Display kernel-pull-progress from background-task-reporter +Display the progress of pulling kernel from reporter of the background task. diff --git a/src/ai/backend/client/func/session.py b/src/ai/backend/client/func/session.py index bfeead45..6b9f46eb 100644 --- a/src/ai/backend/client/func/session.py +++ b/src/ai/backend/client/func/session.py @@ -298,27 +298,28 @@ async def get_or_create( rqst.set_json(params) async with rqst.fetch() as resp: data = await resp.json() - with tqdm(total=100, unit='%') as pbar: - task_id = data['background_task'] - bgtask = resp.session.BackgroundTask(task_id) - async with bgtask.listen_events() as response: - async for ev in response: - progress = json.loads(ev.data) - if ev.event == 'bgtask_updated': - current = progress['current_progress'] - total = progress['total_progress'] - if total == 0: - total = 1e-2 - pbar.n = round(current / total * 100, 2) - pbar.update(0) - pbar.refresh() - elif ev.event == 'bgtask_done': - pbar.n = 100.0 - pbar.update(0) - pbar.refresh() - pbar.clear() - async with rqst.fetch() as resp: - data = await resp.json() + if 'background_task' in data: + with tqdm(total=100, unit='%') as pbar: + task_id = data['background_task'] + bgtask = resp.session.BackgroundTask(task_id) + async with bgtask.listen_events() as response: + async for ev in response: + progress = json.loads(ev.data) + if ev.event == 'bgtask_updated': + current = progress['current_progress'] + total = progress['total_progress'] + if total == 0: + total = 1e-2 + pbar.n = round(current / total * 100, 2) + pbar.update(0) + pbar.refresh() + elif ev.event == 'bgtask_done': + pbar.n = 100.0 + pbar.update(0) + pbar.refresh() + pbar.clear() + async with rqst.fetch() as resp: + data = await resp.json() o = cls(name, owner_access_key) # type: ignore if api_session.get().api_version[0] >= 5: o.id = UUID(data['sessionId']) From b93a49261a2c9380db59a480f3ad2172a657f170 Mon Sep 17 00:00:00 2001 From: uchanLee Date: Sun, 12 Sep 2021 19:01:30 +0900 Subject: [PATCH 3/3] fix:separate the console output handling and the abstract bgtask handling --- src/ai/backend/client/cli/run.py | 42 +++++++++++++++++++++++++-- src/ai/backend/client/func/session.py | 25 ++-------------- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/ai/backend/client/cli/run.py b/src/ai/backend/client/cli/run.py index aa0eacbe..74a4d217 100644 --- a/src/ai/backend/client/cli/run.py +++ b/src/ai/backend/client/cli/run.py @@ -15,6 +15,7 @@ Sequence, Tuple, ) +from tqdm import tqdm import aiohttp import click @@ -599,10 +600,45 @@ async def _run(session, idx, name, envs, except Exception as e: print_fail('[{0}] {1}'.format(idx, e)) return + + async def display_kernel_pulling(compute_session: AsyncSession.ComputeSession) -> bool: + try: + bgtask = compute_session.backgroundtask + except Exception as e: + print_error(e) + return False + else: + with tqdm(total=100, unit='%') as pbar: + async with bgtask.listen_events() as response: + async for ev in response: + progress = json.loads(ev.data) + if ev.event == 'bgtask_updated': + current = progress['current_progress'] + total = progress['total_progress'] + if total == 0: + pbar.n = 0 + else: + pbar.n = round(current / total * 100, 2) + pbar.update(0) + pbar.refresh() + elif ev.event == 'bgtask_done': + pbar.n = 100 + pbar.update(0) + pbar.refresh() + pbar.clear() + compute_session = await session.ComputeSession.get_or_create( + image, + name=name, + ) + await asyncio.sleep(0.1) + return True + if compute_session.status == 'PENDING': print_info('Session ID {0} is enqueued for scheduling.' .format(name)) - return + result = await display_kernel_pulling(compute_session) + if not result: + return elif compute_session.status == 'SCHEDULED': print_info('Session ID {0} is scheduled and about to be started.' .format(name)) @@ -623,7 +659,9 @@ async def _run(session, idx, name, envs, elif compute_session.status == 'TIMEOUT': print_info('Session ID {0} is still on the job queue.' .format(name)) - return + result = await display_kernel_pulling(compute_session) + if not result: + return elif compute_session.status in ('ERROR', 'CANCELLED'): print_fail('Session ID {0} has an error during scheduling/startup or cancelled.' .format(name)) diff --git a/src/ai/backend/client/func/session.py b/src/ai/backend/client/func/session.py index 8f633d67..d2371482 100644 --- a/src/ai/backend/client/func/session.py +++ b/src/ai/backend/client/func/session.py @@ -301,28 +301,6 @@ async def get_or_create( rqst.set_json(params) async with rqst.fetch() as resp: data = await resp.json() - if 'background_task' in data: - with tqdm(total=100, unit='%') as pbar: - task_id = data['background_task'] - bgtask = resp.session.BackgroundTask(task_id) - async with bgtask.listen_events() as response: - async for ev in response: - progress = json.loads(ev.data) - if ev.event == 'bgtask_updated': - current = progress['current_progress'] - total = progress['total_progress'] - if total == 0: - total = 1e-2 - pbar.n = round(current / total * 100, 2) - pbar.update(0) - pbar.refresh() - elif ev.event == 'bgtask_done': - pbar.n = 100.0 - pbar.update(0) - pbar.refresh() - pbar.clear() - async with rqst.fetch() as resp: - data = await resp.json() o = cls(name, owner_access_key) # type: ignore if api_session.get().api_version[0] >= 5: o.id = UUID(data['sessionId']) @@ -331,6 +309,9 @@ async def get_or_create( o.service_ports = data.get('servicePorts', []) o.domain = domain_name o.group = group_name + if 'background_task' in data: + task_id = data['background_task'] + o.backgroundtask = resp.session.BackgroundTask(task_id) return o @api_function