Skip to content

Commit cc217e4

Browse files
authored
Merge pull request #11 from ods/refactor_exceptions
Richer exception interface to simplify debugging huge inserts
2 parents a3ce940 + a5901bf commit cc217e4

File tree

7 files changed

+118
-61
lines changed

7 files changed

+118
-61
lines changed

aiochsa/client.py

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
import aiohttp
66

7-
from .compiler import Compiler
7+
from .compiler import Compiler, Statement
88
from .dialect import ClickhouseSaDialect
9-
from .exc import DBException, ProtocolError
10-
from .parser import parse_json_compact
9+
from .exc import DBException, ProtocolError, exc_message_re
10+
from .parser import parse_json_compact, JSONDecodeError
1111
from .record import Record
1212
from .types import TypeRegistry
1313

@@ -43,21 +43,26 @@ def __init__(
4343
self._types = types
4444
self._compiler = Compiler(dialect=dialect, escape=types.escape)
4545

46-
async def _execute(self, statement: str, *args) -> Iterable[Record]:
47-
query, json_each_row_parameters = self._compiler.compile_statement(
46+
async def _execute(self, statement: Statement, *args) -> Iterable[Record]:
47+
compiled, json_each_row_parameters = self._compiler.compile_statement(
4848
statement, args,
4949
)
50+
sql_logger.debug(compiled)
51+
compiled_with_params = compiled
52+
rows = None
5053
if json_each_row_parameters:
5154
to_json = self._types.to_json # lookup optimization
52-
query += '\n'
53-
query += '\n'.join(
55+
rows = [
5456
json.dumps(
5557
{name: to_json(value) for name, value in row.items()},
5658
use_decimal=True,
5759
)
5860
for row in json_each_row_parameters
59-
)
60-
sql_logger.debug(query)
61+
]
62+
if sql_logger.isEnabledFor(logging.DEBUG):
63+
for idx, row in enumerate(rows):
64+
sql_logger.debug(f'{idx}: {row}')
65+
compiled_with_params += '\n' + '\n'.join(rows)
6166

6267
# First attempt may fail due to broken state of aiohttp session
6368
# (aiohttp doesn't handle connection closing properly?)
@@ -66,18 +71,27 @@ async def _execute(self, statement: str, *args) -> Iterable[Record]:
6671
async with self._session.post(
6772
self.url,
6873
params = {'default_format': 'JSONCompact', **self.params},
69-
data = query.encode(),
74+
data = compiled_with_params.encode(),
7075
) as response:
76+
body = await response.read()
7177
if response.status != 200:
72-
body = await response.read()
7378
raise DBException.from_message(
74-
query, body.decode(errors='replace'),
79+
body.decode(errors='replace'),
80+
statement=compiled, rows=rows,
7581
)
7682

77-
if response.content_type == 'application/json':
78-
return await parse_json_compact(
79-
self._types, response.content,
80-
)
83+
elif response.content_type == 'application/json':
84+
try:
85+
return parse_json_compact(self._types, body)
86+
except JSONDecodeError:
87+
body_str = body.decode(errors='replace')
88+
m = exc_message_re.search(body_str)
89+
if not m:
90+
raise
91+
raise DBException.from_message(
92+
body_str[m.start():],
93+
statement=compiled, rows=rows,
94+
)
8195
else:
8296
return ()
8397
except aiohttp.ClientError as exc:
@@ -86,22 +100,22 @@ async def _execute(self, statement: str, *args) -> Iterable[Record]:
86100
logger.debug(f'First attempt failed, retrying (error: {exc})')
87101

88102
async def iterate(
89-
self, statement: str, *args,
103+
self, statement: Statement, *args,
90104
) -> AsyncGenerator[Record, None]:
91105
for row in await self._execute(statement, *args):
92106
yield row
93107

94-
async def execute(self, statement: str, *args) -> None:
108+
async def execute(self, statement: Statement, *args) -> None:
95109
await self._execute(statement, *args)
96110

97-
async def fetch(self, statement: str, *args) -> List[Record]:
111+
async def fetch(self, statement: Statement, *args) -> List[Record]:
98112
return list(await self._execute(statement, *args))
99113

100-
async def fetchrow(self, statement: str, *args) -> Optional[Record]:
114+
async def fetchrow(self, statement: Statement, *args) -> Optional[Record]:
101115
gen = await self._execute(statement, *args)
102116
return next(iter(gen), None)
103117

104-
async def fetchval(self, statement: str, *args) -> Any:
118+
async def fetchval(self, statement: Statement, *args) -> Any:
105119
row = await self.fetchrow(statement, *args)
106120
if row is not None:
107121
return row[0]

aiochsa/compiler.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from types import SimpleNamespace
2+
from typing import Union
23

34
from sqlalchemy.engine.util import _distill_params
45
from sqlalchemy.sql import func, ClauseElement
@@ -7,6 +8,9 @@
78
from sqlalchemy.sql.functions import FunctionElement
89

910

11+
Statement = Union[str, ClauseElement]
12+
13+
1014
class Compiler:
1115

1216
def __init__(self, dialect, escape):
@@ -91,7 +95,7 @@ def _execute_context(self, dialect, constructor, statement, parameters, *args):
9195
return context.statement % escaped, ()
9296

9397

94-
def compile_statement(self, statement, args):
98+
def compile_statement(self, statement: Statement, args):
9599
if isinstance(statement, str):
96100
assert not args
97101
return statement, args

aiochsa/exc.py

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1+
from collections import namedtuple
12
import re
23

34

45
# Based on `getExceptionMessage()`:
5-
# https://github.com/yandex/ClickHouse/blob/master/dbms/src/Common/Exception.cpp#L261
6-
_match_exc_message = re.compile(
7-
r'^Code: (?P<code>\d+), '
6+
# https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/Exception.cpp#L370-L373
7+
exc_message_re = re.compile(
8+
r'Code: (?P<code>\d+), '
89
r'e\.displayText\(\) = (?P<display_text>.+?)'
910
r'(?:, Stack trace[^:]*:\s+(?P<stack_trace>.+))?$',
1011
re.M,
11-
).match
12+
)
1213

14+
at_row_re = re.compile(r'[(]at row (?P<num>\d+)[)]')
1315

1416
class AiochsaException(Exception):
1517
""" Base class for aiochsa exceptions """
@@ -19,33 +21,56 @@ class ProtocolError(AiochsaException):
1921
""" Error communicating to Clickhouse server """
2022

2123

24+
RowInfo = namedtuple('RowInfo', ['num', 'content'])
25+
26+
2227
class DBException(AiochsaException):
2328
""" Error returned from Clickhouse database """
2429

25-
def __init__(self, statement, code, display_text, stack_trace=None):
26-
super().__init__(statement, code, display_text, stack_trace)
27-
self.statement = statement
30+
def __init__(
31+
self, code, display_text, stack_trace=None, statement=None, row=None,
32+
):
33+
super().__init__(code, display_text, stack_trace)
2834
self.code = code
2935
self.display_text = display_text
3036
self.stack_trace = stack_trace
37+
self.statement = statement
38+
self.row = row
3139

3240
def __str__(self):
33-
statement = self.statement
34-
if len(statement) > 200:
35-
statement = statement[:200] + '...'
36-
return f'[Code={self.code}] {self.display_text}: {statement}'
41+
message = f'[Code={self.code}] {self.display_text}'
42+
if self.statement:
43+
statement = self.statement
44+
if len(statement) > 200:
45+
statement = statement[:200] + '...'
46+
message += f'\n{statement}'
47+
if self.row:
48+
message += f'\n{self.row.num}: {self.row.content}'
49+
return message
3750

3851
@classmethod
39-
def from_message(cls, statement, exc_message):
40-
m = _match_exc_message(exc_message)
52+
def from_message(cls, exc_message, *, statement=None, rows=None):
53+
m = exc_message_re.match(exc_message)
4154
if m:
55+
display_text = m.group('display_text')
56+
57+
row = None
58+
if rows:
59+
at_row_m = at_row_re.search(display_text)
60+
if at_row_m:
61+
# It's 1-based
62+
row_num = int(at_row_m.group('num'))
63+
if len(rows) >= row_num:
64+
row = RowInfo(row_num, rows[row_num - 1])
65+
4266
return cls(
43-
statement=statement,
4467
code=int(m.group('code')),
45-
display_text=m.group('display_text'),
68+
display_text=display_text,
4669
stack_trace=m.group('stack_trace'),
70+
statement=statement,
71+
row=row,
4772
)
4873
else: # pragma: nocover
4974
return cls(
50-
statement=statement, code=None, display_text=exc_message,
75+
code=None, display_text=exc_message, statement=statement,
5176
)

aiochsa/parser.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
import asyncio
21
import pkgutil
32
import simplejson as json
4-
from typing import AsyncGenerator, Iterable, Union
3+
from typing import Iterable
54

6-
import aiohttp
75
from lark import Lark, Transformer, v_args
86

97
from .record import Record
108
from .types import TypeRegistry
119

1210

13-
__all__ = ['parse_type']
11+
__all__ = ['parse_type', 'parse_json_compact', 'JSONDecodeError']
12+
13+
14+
# Re-export
15+
JSONDecodeError = json.JSONDecodeError
1416

1517

1618
type_parser = Lark(
@@ -43,19 +45,18 @@ def parse_type(types: TypeRegistry, type_str):
4345
return TypeTransformer(types).transform(tree)
4446

4547

46-
async def parse_json_compact(
47-
types: TypeRegistry,
48-
content: Union[asyncio.StreamReader, aiohttp.StreamReader],
48+
def parse_json_compact(
49+
types: TypeRegistry, content: bytes,
4950
) -> Iterable[Record]:
5051
# The method is split into three phases:
51-
# 1. Read whole response. It's done immediately after await-ing, so we
52-
# can close response after it.
52+
# 1. Parse JSON. It's done immediately, so that we can fall back to
53+
# parsing exception when it breaks normal response.
5354
# 2. Parse type information from meta. It's done at first iteration of
54-
# returned async-generator.
55+
# returned generator.
5556
# 3. Convert each row one-by-one. It's done on demand at each iteration.
5657
# This way we can cleanup resources even when result is not used.
5758

58-
json_data = json.loads(await content.read(), parse_float=str)
59+
json_data = json.loads(content, parse_float=str)
5960
return convert_json_compact(types, json_data)
6061

6162

tests/test_exc.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from decimal import Decimal
2+
13
import pytest
24

35
import aiochsa
@@ -33,3 +35,13 @@ async def test_exc_stacktrace(dsn, statement):
3335
assert str(error_codes.SYNTAX_ERROR) in str(exc_info.value)
3436
assert 'Syntax error' in str(exc_info.value)
3537
assert len(str(exc_info.value)) < 2000
38+
39+
40+
async def test_exc_row(conn, table_test1):
41+
with pytest.raises(aiochsa.DBException) as exc_info:
42+
await conn.execute(
43+
table_test1.insert(),
44+
{'amount': Decimal('1234567890.1234567890')},
45+
)
46+
assert exc_info.value.code == error_codes.ARGUMENT_OUT_OF_BOUND
47+
assert exc_info.value.row == (1, '{"amount": 1234567890.1234567890}')

tests/test_parser.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def test_parse_type(type_str, type_obj):
7474

7575
async def test_parse_json_compact_decimal():
7676
""" Insure Decimal is parsed without loss of precision """
77-
json_data = b'''\
77+
content = b'''\
7878
{
7979
"meta": [
8080
{"name": "column", "type": "Decimal(18, 13)"}
@@ -84,10 +84,6 @@ async def test_parse_json_compact_decimal():
8484
]
8585
}
8686
'''
87-
content = asyncio.StreamReader()
88-
content.feed_data(json_data)
89-
content.feed_eof()
90-
91-
[[value]] = list(await parse_json_compact(t.TypeRegistry(), content))
87+
[[value]] = list(parse_json_compact(t.TypeRegistry(), content))
9288
assert isinstance(value, Decimal)
9389
assert str(value) == '1.2345678901230'

tests/test_pool.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import pytest
33

4-
from aiochsa import DBException
4+
from aiochsa import DBException, error_codes
55
from aiochsa.pool import dsn_to_params, create_pool
66

77

@@ -43,20 +43,25 @@ async def test_pool_acquire_release(pool):
4343
await pool.release(conn)
4444

4545

46+
LONG_QUERY = 'SELECT sleep(1) FROM numbers(10) SETTINGS max_block_size=1'
47+
48+
4649
async def test_pool_params(dsn):
4750
async with create_pool(dsn, max_execution_time=1) as conn:
48-
with pytest.raises(DBException):
49-
await conn.execute('SELECT sleep(3)')
51+
with pytest.raises(DBException) as exc_info:
52+
await conn.execute(LONG_QUERY)
53+
assert exc_info.value.code == error_codes.TIMEOUT_EXCEEDED
5054

5155

5256
async def test_pool_dsn_params(dsn):
5357
dsn += '?max_execution_time=1'
5458
async with create_pool(dsn) as conn:
55-
with pytest.raises(DBException):
56-
await conn.execute('SELECT sleep(3)')
59+
with pytest.raises(DBException) as exc_info:
60+
await conn.execute(LONG_QUERY)
61+
assert exc_info.value.code == error_codes.TIMEOUT_EXCEEDED
5762

5863

5964
async def test_session_timeout(dsn):
6065
async with create_pool(dsn, session_timeout=0.1) as conn:
6166
with pytest.raises(asyncio.TimeoutError):
62-
await conn.execute('SELECT sleep(1)')
67+
await conn.execute(LONG_QUERY)

0 commit comments

Comments
 (0)