Skip to content
This repository was archived by the owner on Sep 22, 2023. It is now read-only.

Commit 377a99f

Browse files
authored
fix: Update CLI with backend.ai-cli (#94)
* feat: Update CLI to use the new cli pkg with interrupt-handling mixin * setup: Upgrade backend.ai-cli to 0.4.1 * fix: Restore old code because now we use mixins * fix: Ensure proper SSL termination and proxy context termination - Currently Python's asyncio does not wait for processing of "connection lost" event for all open SSL transports. This generates a very long and messy exception traces, which would confuse the end-users. We apply a hack to mitigate this issue. - When we call other context managers inside context managers and get exceptions during an outer enter handler, we should call exit handlers of the inner context managers properly once their enter handlers are called.
1 parent 31957db commit 377a99f

File tree

11 files changed

+108
-122
lines changed

11 files changed

+108
-122
lines changed

CHANGELOG.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
Changes
22
=======
33

4+
20.03.0b2 (2020-04-xx)
5+
----------------------
6+
7+
* FIX: Improve exception handling in ``backend.ai app`` command
8+
and internally update backend.ai-cli package to work consistently
9+
with #93 (#94)
10+
411
19.12.0b1 (2020-01-11)
512
----------------------
613

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
'setuptools>=45.2.0',
77
]
88
install_requires = [
9-
'backend.ai-cli~=0.3',
9+
'backend.ai-cli~=0.4.1',
1010
'aiohttp~=3.6.2',
1111
'appdirs~=1.4.3',
1212
'async_timeout~=3.0', # to avoid pip10 resolver issue
@@ -98,6 +98,7 @@ def read_src_version():
9898
install_requires=install_requires,
9999
extras_require={
100100
'dev': dev_requires,
101+
'build': build_requires,
101102
'test': test_requires,
102103
'lint': lint_requires,
103104
'typecheck': typecheck_requires,

src/ai/backend/client/cli/__init__.py

Lines changed: 2 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
1-
from pathlib import Path
2-
import os
3-
import signal
4-
import sys
51
import warnings
62

73
import click
8-
from click.exceptions import ClickException, Abort
94

105
from .. import __version__
116
from ..config import APIConfig, set_config
12-
from ai.backend.cli.extensions import AliasGroup
7+
from ai.backend.cli.extensions import ExtendedCommandGroup
138

149

15-
@click.group(cls=AliasGroup,
10+
@click.group(cls=ExtendedCommandGroup,
1611
context_settings={'help_option_names': ['-h', '--help']})
1712
@click.option('--skip-sslcert-validation',
1813
help='Skip SSL certificate validation for all API requests.',
@@ -29,24 +24,6 @@ def main(skip_sslcert_validation):
2924
warnings.showwarning = show_warning
3025

3126

32-
@click.command(context_settings=dict(ignore_unknown_options=True,
33-
allow_extra_args=True))
34-
def run_alias():
35-
"""
36-
Quick aliases for run command.
37-
"""
38-
mode = Path(sys.argv[0]).stem
39-
help = True if len(sys.argv) <= 1 else False
40-
if mode == 'lcc':
41-
sys.argv.insert(1, 'c')
42-
elif mode == 'lpython':
43-
sys.argv.insert(1, 'python')
44-
sys.argv.insert(1, 'run')
45-
if help:
46-
sys.argv.append('--help')
47-
run_main()
48-
49-
5027
def _attach_command():
5128
from . import admin, config, app, files, logs, manager, proxy, ps, run # noqa
5229
from . import vfolder # noqa
@@ -55,48 +32,3 @@ def _attach_command():
5532

5633

5734
_attach_command()
58-
59-
60-
def run_main():
61-
try:
62-
_interrupted = False
63-
main.main(
64-
standalone_mode=False,
65-
prog_name='backend.ai',
66-
)
67-
except KeyboardInterrupt:
68-
# For interruptions outside the Click's exception handling block.
69-
print("Interrupted!", end="", file=sys.stderr)
70-
sys.stderr.flush()
71-
_interrupted = True
72-
except Abort as e:
73-
# Click wraps unhandled KeyboardInterrupt with a plain
74-
# sys.exit(1) call and prints "Aborted!" message
75-
# (which would look non-sense to users).
76-
# This is *NOT* what we want.
77-
# Instead of relying on Click, mark the _interrupted
78-
# flag to perform our own exit routines.
79-
if isinstance(e.__context__, KeyboardInterrupt):
80-
print("Interrupted!", end="", file=sys.stderr)
81-
sys.stderr.flush()
82-
_interrupted = True
83-
else:
84-
print("Aborted!", end="", file=sys.stderr)
85-
sys.stderr.flush()
86-
sys.exit(1)
87-
except ClickException as e:
88-
e.show()
89-
sys.exit(e.exit_code)
90-
finally:
91-
if _interrupted:
92-
# Override the exit code when it's interrupted,
93-
# referring https://github.com/python/cpython/pull/11862
94-
if sys.platform.startswith('win'):
95-
# Use STATUS_CONTROL_C_EXIT to notify cmd.exe
96-
# for interrupted exit
97-
sys.exit(-1073741510)
98-
else:
99-
# Use the default signal handler to set the exit
100-
# code properly for interruption.
101-
signal.signal(signal.SIGINT, signal.SIG_DFL)
102-
os.kill(os.getpid(), signal.SIGINT)

src/ai/backend/client/cli/__main__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from . import run_main
1+
from . import main
22

33

4-
run_main()
4+
main()

src/ai/backend/client/cli/app.py

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import asyncio
22
import json
3-
import os
43
import shlex
5-
import signal
64
import sys
75
from typing import (
86
Union, Optional,
@@ -197,34 +195,37 @@ async def __aenter__(self) -> None:
197195
await self.api_session.__aenter__()
198196

199197
user_url_template = "{protocol}://{host}:{port}"
200-
compute_session = self.api_session.ComputeSession(self.session_name)
201-
all_apps = await compute_session.stream_app_info()
202-
for app_info in all_apps:
203-
if app_info['name'] == self.app_name:
204-
if 'url_template' in app_info.keys():
205-
user_url_template = app_info['url_template']
206-
break
207-
else:
208-
print_fail(f'The app "{self.app_name}" is not supported by the session.')
209-
self.exit_code = 1
210-
os.kill(0, signal.SIGINT)
211-
return
212-
213-
self.local_server = await asyncio.start_server(
214-
self.handle_connection, self.host, self.port)
215-
user_url = user_url_template.format(
216-
protocol=self.protocol,
217-
host=self.host,
218-
port=self.port,
219-
)
220-
print_info(
221-
"A local proxy to the application \"{0}\" ".format(self.app_name) +
222-
"provided by the session \"{0}\" ".format(self.session_name) +
223-
"is available at:\n{0}".format(user_url)
224-
)
225-
if self.host == '0.0.0.0':
226-
print_warn('NOTE: Replace "0.0.0.0" with the actual hostname you use '
227-
'to connect with the CLI app proxy.')
198+
try:
199+
compute_session = self.api_session.ComputeSession(self.session_name)
200+
all_apps = await compute_session.stream_app_info()
201+
for app_info in all_apps:
202+
if app_info['name'] == self.app_name:
203+
if 'url_template' in app_info.keys():
204+
user_url_template = app_info['url_template']
205+
break
206+
else:
207+
print_fail(f'The app "{self.app_name}" is not supported by the session.')
208+
self.exit_code = 1
209+
return
210+
211+
self.local_server = await asyncio.start_server(
212+
self.handle_connection, self.host, self.port)
213+
user_url = user_url_template.format(
214+
protocol=self.protocol,
215+
host=self.host,
216+
port=self.port,
217+
)
218+
print_info(
219+
"A local proxy to the application \"{0}\" ".format(self.app_name) +
220+
"provided by the session \"{0}\" ".format(self.session_name) +
221+
"is available at:\n{0}".format(user_url)
222+
)
223+
if self.host == '0.0.0.0':
224+
print_warn('NOTE: Replace "0.0.0.0" with the actual hostname you use '
225+
'to connect with the CLI app proxy.')
226+
except Exception:
227+
await self.api_session.__aexit__(*sys.exc_info())
228+
raise
228229

229230
async def __aexit__(self, *exc_info) -> None:
230231
if self.local_server is not None:
@@ -267,15 +268,19 @@ def app(session_name, app, protocol, bind, arg, env):
267268
elif len(bind_parts) == 2:
268269
host = bind_parts[0]
269270
port = int(bind_parts[1])
270-
proxy_ctx = ProxyRunnerContext(
271-
host, port,
272-
session_name, app,
273-
protocol=protocol,
274-
args=arg,
275-
envs=env,
276-
)
277-
asyncio_run_forever(proxy_ctx)
278-
sys.exit(proxy_ctx.exit_code)
271+
try:
272+
proxy_ctx = ProxyRunnerContext(
273+
host, port,
274+
session_name, app,
275+
protocol=protocol,
276+
args=arg,
277+
envs=env,
278+
)
279+
asyncio_run_forever(proxy_ctx)
280+
sys.exit(proxy_ctx.exit_code)
281+
except Exception as e:
282+
print_error(e)
283+
sys.exit(1)
279284

280285

281286
@main.command()

src/ai/backend/client/cli/dotfile.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
import click
44
from tabulate import tabulate
55

6-
from . import AliasGroup, main
6+
from . import main
77
from .pretty import print_info, print_warn, print_error
88
from ..session import Session
99

1010

11-
@main.group(cls=AliasGroup)
11+
@main.group()
1212
def dotfile():
1313
'''Provides dotfile operations.'''
1414

src/ai/backend/client/cli/session_template.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
import click
44
from tabulate import tabulate
55

6-
from ai.backend.cli.extensions import AliasGroup
76
from . import main
87
from .pretty import print_info, print_warn, print_error
98
from ..session import Session
109

1110

12-
@main.group(cls=AliasGroup, aliases=['sesstpl'])
11+
@main.group(aliases=['sesstpl'])
1312
def session_template():
1413
'''Provides task template operations'''
1514

src/ai/backend/client/cli/vfolder.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@
66
import click
77
from tabulate import tabulate
88

9-
from ai.backend.cli.extensions import AliasGroup
109
from . import main
1110
from .pretty import print_wait, print_done, print_error, print_fail
1211
from ..session import Session
1312

1413

15-
@main.group(cls=AliasGroup)
14+
@main.group()
1615
def vfolder():
1716
'''Provides virtual folder operations.'''
1817

src/ai/backend/client/request.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io
66
import logging
77
from pathlib import Path
8+
import sys
89
from typing import Any, Callable, Mapping, Sequence, Union
910

1011
import aiohttp
@@ -460,6 +461,7 @@ async def __aenter__(self):
460461
raw_resp = await self._rqst_ctx.__aenter__()
461462
if self.check_status and raw_resp.status // 100 != 2:
462463
msg = await raw_resp.text()
464+
await raw_resp.__aexit__(None, None, None)
463465
raise BackendAPIError(raw_resp.status, raw_resp.reason, msg)
464466
return self.response_cls(self.session, raw_resp,
465467
async_mode=self._async_mode)
@@ -475,6 +477,7 @@ async def __aenter__(self):
475477
except aiohttp.ClientResponseError as e:
476478
msg = 'API endpoint response error.\n' \
477479
'\u279c {!r}'.format(e)
480+
await raw_resp.__aexit__(*sys.exc_info())
478481
raise BackendClientError(msg) from e
479482

480483
def __exit__(self, *args):

src/ai/backend/client/session.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,46 @@ async def _negotiate_api_version(
6363
return client_version
6464

6565

66+
async def _close_aiohttp_session(session: aiohttp.ClientSession):
67+
# This is a hacky workaround for premature closing of SSL transports
68+
# on Windows Proactor event loops.
69+
# Thanks to Vadim Markovtsev's comment on the aiohttp issue #1925.
70+
# (https://github.com/aio-libs/aiohttp/issues/1925#issuecomment-592596034)
71+
transports = 0
72+
all_is_lost = asyncio.Event()
73+
if len(session.connector._conns) == 0:
74+
all_is_lost.set()
75+
for conn in session.connector._conns.values():
76+
for handler, _ in conn:
77+
proto = getattr(handler.transport, "_ssl_protocol", None)
78+
if proto is None:
79+
continue
80+
transports += 1
81+
orig_lost = proto.connection_lost
82+
orig_eof_received = proto.eof_received
83+
84+
def connection_lost(exc):
85+
orig_lost(exc)
86+
nonlocal transports
87+
transports -= 1
88+
if transports == 0:
89+
all_is_lost.set()
90+
91+
def eof_received():
92+
try:
93+
orig_eof_received()
94+
except AttributeError:
95+
# It may happen that eof_received() is called after
96+
# _app_protocol and _transport are set to None.
97+
pass
98+
99+
proto.connection_lost = connection_lost
100+
proto.eof_received = eof_received
101+
await session.close()
102+
if transports > 0:
103+
await all_is_lost.wait()
104+
105+
66106
class _SyncWorkerThread(threading.Thread):
67107

68108
sentinel = object()
@@ -374,7 +414,7 @@ def close(self):
374414
if self._closed:
375415
return
376416
self._closed = True
377-
self._worker_thread.work_queue.put(self.aiohttp_session.close())
417+
self._worker_thread.work_queue.put(_close_aiohttp_session(self.aiohttp_session))
378418
self._worker_thread.work_queue.put(self.worker_thread.sentinel)
379419
self._worker_thread.join()
380420

@@ -600,7 +640,7 @@ async def close(self):
600640
if self._closed:
601641
return
602642
self._closed = True
603-
await self.aiohttp_session.close()
643+
await _close_aiohttp_session(self.aiohttp_session)
604644

605645
async def __aenter__(self):
606646
assert not self.closed, 'Cannot reuse closed session'

0 commit comments

Comments
 (0)