6
6
7
7
8
8
import asyncpg
9
- import sys
10
- import textwrap
9
+ import typing
10
+
11
+ from ._postgres_message import PostgresMessage
11
12
12
13
13
14
__all__ = ['PostgresError' , 'FatalPostgresError' , 'UnknownPostgresError' ,
16
17
'UnsupportedClientFeatureError' ]
17
18
18
19
19
- def _is_asyncpg_class (cls ):
20
- modname = cls .__module__
21
- return modname == 'asyncpg' or modname .startswith ('asyncpg.' )
22
-
23
-
24
- class PostgresMessageMeta (type ):
25
-
26
- _message_map = {}
27
- _field_map = {
28
- 'S' : 'severity' ,
29
- 'V' : 'severity_en' ,
30
- 'C' : 'sqlstate' ,
31
- 'M' : 'message' ,
32
- 'D' : 'detail' ,
33
- 'H' : 'hint' ,
34
- 'P' : 'position' ,
35
- 'p' : 'internal_position' ,
36
- 'q' : 'internal_query' ,
37
- 'W' : 'context' ,
38
- 's' : 'schema_name' ,
39
- 't' : 'table_name' ,
40
- 'c' : 'column_name' ,
41
- 'd' : 'data_type_name' ,
42
- 'n' : 'constraint_name' ,
43
- 'F' : 'server_source_filename' ,
44
- 'L' : 'server_source_line' ,
45
- 'R' : 'server_source_function'
46
- }
47
-
48
- def __new__ (mcls , name , bases , dct ):
49
- cls = super ().__new__ (mcls , name , bases , dct )
50
- if cls .__module__ == mcls .__module__ and name == 'PostgresMessage' :
51
- for f in mcls ._field_map .values ():
52
- setattr (cls , f , None )
53
-
54
- if _is_asyncpg_class (cls ):
55
- mod = sys .modules [cls .__module__ ]
56
- if hasattr (mod , name ):
57
- raise RuntimeError ('exception class redefinition: {}' .format (
58
- name ))
59
-
60
- code = dct .get ('sqlstate' )
61
- if code is not None :
62
- existing = mcls ._message_map .get (code )
63
- if existing is not None :
64
- raise TypeError ('{} has duplicate SQLSTATE code, which is'
65
- 'already defined by {}' .format (
66
- name , existing .__name__ ))
67
- mcls ._message_map [code ] = cls
68
-
69
- return cls
70
-
71
- @classmethod
72
- def get_message_class_for_sqlstate (mcls , code ):
73
- return mcls ._message_map .get (code , UnknownPostgresError )
74
-
75
-
76
- class PostgresMessage (metaclass = PostgresMessageMeta ):
77
-
78
- @classmethod
79
- def _get_error_class (cls , fields ):
80
- sqlstate = fields .get ('C' )
81
- return type (cls ).get_message_class_for_sqlstate (sqlstate )
82
-
83
- @classmethod
84
- def _get_error_dict (cls , fields , query ):
85
- dct = {
86
- 'query' : query
87
- }
88
-
89
- field_map = type (cls )._field_map
90
- for k , v in fields .items ():
91
- field = field_map .get (k )
92
- if field :
93
- dct [field ] = v
94
-
95
- return dct
96
-
97
- @classmethod
98
- def _make_constructor (cls , fields , query = None ):
99
- dct = cls ._get_error_dict (fields , query )
100
-
101
- exccls = cls ._get_error_class (fields )
102
- message = dct .get ('message' , '' )
103
-
104
- # PostgreSQL will raise an exception when it detects
105
- # that the result type of the query has changed from
106
- # when the statement was prepared.
107
- #
108
- # The original error is somewhat cryptic and unspecific,
109
- # so we raise a custom subclass that is easier to handle
110
- # and identify.
111
- #
112
- # Note that we specifically do not rely on the error
113
- # message, as it is localizable.
114
- is_icse = (
115
- exccls .__name__ == 'FeatureNotSupportedError' and
116
- _is_asyncpg_class (exccls ) and
117
- dct .get ('server_source_function' ) == 'RevalidateCachedQuery'
118
- )
119
-
120
- if is_icse :
121
- exceptions = sys .modules [exccls .__module__ ]
122
- exccls = exceptions .InvalidCachedStatementError
123
- message = ('cached statement plan is invalid due to a database '
124
- 'schema or configuration change' )
125
-
126
- is_prepared_stmt_error = (
127
- exccls .__name__ in ('DuplicatePreparedStatementError' ,
128
- 'InvalidSQLStatementNameError' ) and
129
- _is_asyncpg_class (exccls )
130
- )
131
-
132
- if is_prepared_stmt_error :
133
- hint = dct .get ('hint' , '' )
134
- hint += textwrap .dedent ("""\
135
-
136
- NOTE: pgbouncer with pool_mode set to "transaction" or
137
- "statement" does not support prepared statements properly.
138
- You have two options:
139
-
140
- * if you are using pgbouncer for connection pooling to a
141
- single server, switch to the connection pool functionality
142
- provided by asyncpg, it is a much better option for this
143
- purpose;
144
-
145
- * if you have no option of avoiding the use of pgbouncer,
146
- then you can set statement_cache_size to 0 when creating
147
- the asyncpg connection object.
148
- """ )
149
-
150
- dct ['hint' ] = hint
151
-
152
- return exccls , message , dct
153
-
154
- def as_dict (self ):
155
- dct = {}
156
- for f in type (self )._field_map .values ():
157
- val = getattr (self , f )
158
- if val is not None :
159
- dct [f ] = val
160
- return dct
20
+ _PE = typing .TypeVar ('_PE' , bound = 'PostgresError' )
21
+ _IE = typing .TypeVar ('_IE' , bound = 'InterfaceError' )
22
+ _PM = typing .TypeVar ('_PM' , bound = 'PostgresMessage' )
161
23
162
24
163
25
class PostgresError (PostgresMessage , Exception ):
164
26
"""Base class for all Postgres errors."""
165
27
166
- def __str__ (self ):
167
- msg = self .args [0 ]
28
+ def __str__ (self ) -> str :
29
+ msg : str = self .args [0 ]
168
30
if self .detail :
169
31
msg += '\n DETAIL: {}' .format (self .detail )
170
32
if self .hint :
@@ -173,7 +35,11 @@ def __str__(self):
173
35
return msg
174
36
175
37
@classmethod
176
- def new (cls , fields , query = None ):
38
+ def new (
39
+ cls : typing .Type [_PE ],
40
+ fields : typing .Dict [str , str ],
41
+ query : typing .Optional [str ] = None
42
+ ) -> _PE :
177
43
exccls , message , dct = cls ._make_constructor (fields , query )
178
44
ex = exccls (message )
179
45
ex .__dict__ .update (dct )
@@ -189,12 +55,15 @@ class UnknownPostgresError(FatalPostgresError):
189
55
190
56
191
57
class InterfaceMessage :
192
- def __init__ (self , * , detail = None , hint = None ):
193
- self .detail = detail
194
- self .hint = hint
58
+ args : typing .Tuple [typing .Any , ...]
59
+
60
+ def __init__ (self , * , detail : typing .Optional [str ] = None ,
61
+ hint : typing .Optional [str ] = None ) -> None :
62
+ self .detail : typing .Optional [str ] = detail
63
+ self .hint : typing .Optional [str ] = hint
195
64
196
- def __str__ (self ):
197
- msg = self .args [0 ]
65
+ def __str__ (self ) -> str :
66
+ msg : str = self .args [0 ]
198
67
if self .detail :
199
68
msg += '\n DETAIL: {}' .format (self .detail )
200
69
if self .hint :
@@ -206,11 +75,12 @@ def __str__(self):
206
75
class InterfaceError (InterfaceMessage , Exception ):
207
76
"""An error caused by improper use of asyncpg API."""
208
77
209
- def __init__ (self , msg , * , detail = None , hint = None ):
78
+ def __init__ (self , msg : str , * , detail : typing .Optional [str ] = None ,
79
+ hint : typing .Optional [str ] = None ) -> None :
210
80
InterfaceMessage .__init__ (self , detail = detail , hint = hint )
211
81
Exception .__init__ (self , msg )
212
82
213
- def with_msg (self , msg ) :
83
+ def with_msg (self : _IE , msg : str ) -> _IE :
214
84
return type (self )(
215
85
msg ,
216
86
detail = self .detail ,
@@ -231,7 +101,8 @@ class UnsupportedClientFeatureError(InterfaceError):
231
101
class InterfaceWarning (InterfaceMessage , UserWarning ):
232
102
"""A warning caused by an improper use of asyncpg API."""
233
103
234
- def __init__ (self , msg , * , detail = None , hint = None ):
104
+ def __init__ (self , msg : str , * , detail : typing .Optional [str ] = None ,
105
+ hint : typing .Optional [str ] = None ) -> None :
235
106
InterfaceMessage .__init__ (self , detail = detail , hint = hint )
236
107
UserWarning .__init__ (self , msg )
237
108
@@ -247,25 +118,32 @@ class ProtocolError(InternalClientError):
247
118
class OutdatedSchemaCacheError (InternalClientError ):
248
119
"""A value decoding error caused by a schema change before row fetching."""
249
120
250
- def __init__ (self , msg , * , schema = None , data_type = None , position = None ):
121
+ def __init__ (self , msg : str , * , schema : typing .Optional [str ] = None ,
122
+ data_type : typing .Optional [str ] = None ,
123
+ position : typing .Optional [str ] = None ) -> None :
251
124
super ().__init__ (msg )
252
- self .schema_name = schema
253
- self .data_type_name = data_type
254
- self .position = position
125
+ self .schema_name : typing . Optional [ str ] = schema
126
+ self .data_type_name : typing . Optional [ str ] = data_type
127
+ self .position : typing . Optional [ str ] = position
255
128
256
129
257
130
class PostgresLogMessage (PostgresMessage ):
258
131
"""A base class for non-error server messages."""
259
132
260
- def __str__ (self ):
133
+ def __str__ (self ) -> str :
261
134
return '{}: {}' .format (type (self ).__name__ , self .message )
262
135
263
- def __setattr__ (self , name , val ) :
136
+ def __setattr__ (self , name : str , val : typing . Any ) -> None :
264
137
raise TypeError ('instances of {} are immutable' .format (
265
138
type (self ).__name__ ))
266
139
267
140
@classmethod
268
- def new (cls , fields , query = None ):
141
+ def new (
142
+ cls : typing .Type [_PM ],
143
+ fields : typing .Dict [str , str ],
144
+ query : typing .Optional [str ] = None
145
+ ) -> PostgresMessage :
146
+ exccls : typing .Type [PostgresMessage ]
269
147
exccls , message_text , dct = cls ._make_constructor (fields , query )
270
148
271
149
if exccls is UnknownPostgresError :
@@ -277,7 +155,7 @@ def new(cls, fields, query=None):
277
155
exccls = asyncpg .PostgresWarning
278
156
279
157
if issubclass (exccls , (BaseException , Warning )):
280
- msg = exccls (message_text )
158
+ msg : PostgresMessage = exccls (message_text )
281
159
else :
282
160
msg = exccls ()
283
161
0 commit comments