Skip to content
This repository was archived by the owner on Sep 22, 2023. It is now read-only.

Commit 99e1e3c

Browse files
committed
Add background task support and rewrite rescan-images command.
1 parent 1ce0454 commit 99e1e3c

File tree

8 files changed

+188
-83
lines changed

8 files changed

+188
-83
lines changed

src/ai/backend/client/cli/admin/images.py

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
1+
import json
12
import sys
3+
24
import click
35
from tabulate import tabulate
46

57
from . import admin
8+
from ...compat import asyncio_run
69
from ...session import Session
710
from ..pretty import print_done, print_warn, print_fail, print_error
811

912

1013
@admin.command()
1114
@click.option('--operation', is_flag=True, help='Get operational images only')
12-
def images(operation):
13-
'''
15+
def images(operation: bool) -> None:
16+
"""
1417
Show the list of registered images in this cluster.
15-
'''
18+
"""
1619
fields = [
1720
('Name', 'name'),
1821
('Registry', 'registry'),
@@ -40,25 +43,37 @@ def images(operation):
4043
@click.option('-r', '--registry', type=str, default=None,
4144
help='The name (usually hostname or "lablup") '
4245
'of the Docker registry configured.')
43-
def rescan_images(registry):
44-
'''Update the kernel image metadata from all configured docker registries.'''
45-
with Session() as session:
46-
try:
47-
result = session.Image.rescan_images(registry)
48-
except Exception as e:
49-
print_error(e)
50-
sys.exit(1)
51-
if result['ok']:
52-
print_done("Updated the image metadata from the configured registries.")
53-
else:
54-
print_fail(f"Rescanning has failed: {result['msg']}")
46+
def rescan_images(registry: str) -> None:
47+
"""
48+
Update the kernel image metadata from all configured docker registries.
49+
"""
50+
51+
async def rescan_images_impl(registry: str) -> None:
52+
async with Session() as session:
53+
try:
54+
result = await session.Image.rescan_images(registry)
55+
except Exception as e:
56+
print_error(e)
57+
sys.exit(1)
58+
if not result['ok']:
59+
print_fail(f"Rescanning has failed: {result['msg']}")
60+
sys.exit(1)
61+
print_done("Started updating the image metadata from the configured registries.")
62+
task_id = result['task_id']
63+
bgtask = session.BackgroundTask(task_id)
64+
async with bgtask.listen_events() as response:
65+
async for ev in response:
66+
print(click.style(ev.event, fg='cyan', bold=True), json.loads(ev.data))
67+
print_done("Finished registry scanning.")
68+
69+
asyncio_run(rescan_images_impl(registry))
5570

5671

5772
@admin.command()
5873
@click.argument('alias', type=str)
5974
@click.argument('target', type=str)
6075
def alias_image(alias, target):
61-
'''Add an image alias.'''
76+
"""Add an image alias."""
6277
with Session() as session:
6378
try:
6479
result = session.Image.alias_image(alias, target)
@@ -74,7 +89,7 @@ def alias_image(alias, target):
7489
@admin.command()
7590
@click.argument('alias', type=str)
7691
def dealias_image(alias):
77-
'''Remove an image alias.'''
92+
"""Remove an image alias."""
7893
with Session() as session:
7994
try:
8095
result = session.Image.dealias_image(alias)

src/ai/backend/client/cli/run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,9 +1145,9 @@ def events(name, owner_access_key):
11451145
async def _run_events():
11461146
async with AsyncSession() as session:
11471147
compute_session = session.ComputeSession(name, owner_access_key)
1148-
async with compute_session.stream_events() as sse_response:
1149-
async for ev in sse_response.fetch_events():
1150-
print(click.style(ev['event'], fg='cyan', bold=True), json.loads(ev['data']))
1148+
async with compute_session.listen_events() as response:
1149+
async for ev in response:
1150+
print(click.style(ev.event, fg='cyan', bold=True), json.loads(ev.data))
11511151

11521152
try:
11531153
asyncio_run(_run_events())

src/ai/backend/client/compat.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1-
'''
1+
"""
22
A compatibility module for backported codes from Python 3.6+ standard library.
3-
'''
3+
"""
44

55
import asyncio
66

7+
__all__ = (
8+
'current_loop',
9+
'all_tasks',
10+
'asyncio_run',
11+
'asyncio_run_forever',
12+
)
13+
714

815
if hasattr(asyncio, 'get_running_loop'): # Python 3.7+
916
current_loop = asyncio.get_running_loop
@@ -60,11 +67,11 @@ def _asyncio_run(coro, *, debug=False):
6067

6168

6269
def asyncio_run_forever(server_context, *, debug=False):
63-
'''
70+
"""
6471
A proposed-but-not-implemented asyncio.run_forever() API based on
6572
@vxgmichel's idea.
6673
See discussions on https://github.com/python/asyncio/pull/465
67-
'''
74+
"""
6875
loop = asyncio.new_event_loop()
6976
asyncio.set_event_loop(loop)
7077
loop.set_debug(debug)

src/ai/backend/client/func/bgtask.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Union
2+
from uuid import UUID
3+
4+
from ..request import (
5+
Request,
6+
SSEContextManager,
7+
)
8+
9+
10+
class BackgroundTask:
11+
"""
12+
Provides server-sent events streaming functions.
13+
"""
14+
15+
session = None
16+
'''The client session instance that this function class is bound to.'''
17+
18+
task_id: UUID
19+
20+
def __init__(self, task_id: Union[UUID, str]) -> None:
21+
self.task_id = task_id if isinstance(task_id, UUID) else UUID(task_id)
22+
23+
# only supported in AsyncAPISession
24+
def listen_events(self) -> SSEContextManager:
25+
"""
26+
Opens an event stream of the background task updates.
27+
28+
:returns: a context manager that returns an :class:`SSEResponse` object.
29+
"""
30+
params = {
31+
'task_id': str(self.task_id),
32+
}
33+
request = Request(
34+
self.session,
35+
'GET', '/events/background-task',
36+
params=params,
37+
)
38+
return request.connect_events()

0 commit comments

Comments
 (0)