Skip to content

Commit d43f38e

Browse files
authored
Add transaction model aiopg (#219)
* #197 Add transaction model aiopg * add docs
1 parent 9382f02 commit d43f38e

File tree

10 files changed

+688
-16
lines changed

10 files changed

+688
-16
lines changed

aiopg/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55
from .connection import connect, Connection, TIMEOUT as DEFAULT_TIMEOUT
66
from .cursor import Cursor
77
from .pool import create_pool, Pool
8-
8+
from .transaction import IsolationLevel, Transaction
99

1010
__all__ = ('connect', 'create_pool', 'Connection', 'Cursor', 'Pool',
11-
'version', 'version_info', 'DEFAULT_TIMEOUT')
11+
'version', 'version_info', 'DEFAULT_TIMEOUT', 'IsolationLevel',
12+
'Transaction')
1213

1314
__version__ = '0.13.1'
1415

1516
version = __version__ + ' , Python ' + sys.version
1617

17-
1818
VersionInfo = namedtuple('VersionInfo',
1919
'major minor micro releaselevel serial')
2020

@@ -40,6 +40,6 @@ def _parse_version(ver):
4040

4141
version_info = _parse_version(__version__)
4242

43-
4443
# make pyflakes happy
45-
(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT)
44+
(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT,
45+
IsolationLevel, Transaction)

aiopg/cursor.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@
44
import psycopg2
55

66
from .log import logger
7-
from .utils import PY_35, PY_352
7+
from .transaction import Transaction, IsolationLevel
8+
from .utils import PY_35, PY_352, _TransactionBeginContextManager
89

910

1011
class Cursor:
11-
1212
def __init__(self, conn, impl, timeout, echo):
1313
self._conn = conn
1414
self._impl = impl
1515
self._timeout = timeout
1616
self._echo = echo
17+
self._transaction = Transaction(self, IsolationLevel.repeatable_read)
1718

1819
@property
1920
def echo(self):
@@ -146,6 +147,16 @@ def callproc(self, procname, parameters=None, *, timeout=None):
146147
else:
147148
yield from self._conn._poll(waiter, timeout)
148149

150+
def begin(self):
151+
return _TransactionBeginContextManager(self._transaction.begin())
152+
153+
def begin_nested(self):
154+
if not self._transaction.is_begin:
155+
return _TransactionBeginContextManager(
156+
self._transaction.begin())
157+
else:
158+
return self._transaction.point()
159+
149160
@asyncio.coroutine
150161
def mogrify(self, operation, parameters=None):
151162
"""Return a query string after arguments binding.

aiopg/transaction.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import asyncio
2+
import enum
3+
import uuid
4+
import warnings
5+
from abc import ABC, abstractmethod
6+
7+
import psycopg2
8+
from aiopg.utils import PY_35, _TransactionPointContextManager
9+
10+
__all__ = ('IsolationLevel', 'Transaction')
11+
12+
13+
class IsolationCompiler(ABC):
14+
name = ''
15+
16+
__slots__ = ('_readonly', '_deferrable')
17+
18+
def __init__(self, readonly, deferrable):
19+
self._readonly = readonly
20+
self._deferrable = deferrable
21+
self._check_readonly_deferrable()
22+
23+
def _check_readonly_deferrable(self):
24+
available = self._readonly or self._deferrable
25+
if not isinstance(self, SerializableCompiler) and available:
26+
raise ValueError('Is only available for serializable transactions')
27+
28+
def savepoint(self, unique_id):
29+
return 'SAVEPOINT {}'.format(unique_id)
30+
31+
def release_savepoint(self, unique_id):
32+
return 'RELEASE SAVEPOINT {}'.format(unique_id)
33+
34+
def rollback_savepoint(self, unique_id):
35+
return 'ROLLBACK TO SAVEPOINT {}'.format(unique_id)
36+
37+
def commit(self):
38+
return 'COMMIT'
39+
40+
def rollback(self):
41+
return 'ROLLBACK'
42+
43+
@abstractmethod
44+
def begin(self):
45+
raise NotImplementedError("Please Implement this method")
46+
47+
def __repr__(self):
48+
return self.name
49+
50+
51+
class ReadCommittedCompiler(IsolationCompiler):
52+
name = 'Read committed'
53+
54+
def begin(self):
55+
return 'BEGIN'
56+
57+
58+
class RepeatableReadCompiler(IsolationCompiler):
59+
name = 'Repeatable read'
60+
61+
def begin(self):
62+
return 'BEGIN ISOLATION LEVEL REPEATABLE READ'
63+
64+
65+
class SerializableCompiler(IsolationCompiler):
66+
name = 'Serializable'
67+
68+
def begin(self):
69+
query = 'BEGIN ISOLATION LEVEL SERIALIZABLE'
70+
71+
if self._readonly:
72+
query += ' READ ONLY'
73+
74+
if self._deferrable:
75+
query += ' DEFERRABLE'
76+
77+
return query
78+
79+
80+
class IsolationLevel(enum.Enum):
81+
serializable = SerializableCompiler
82+
repeatable_read = RepeatableReadCompiler
83+
read_committed = ReadCommittedCompiler
84+
85+
def __call__(self, readonly, deferrable):
86+
return self.value(readonly, deferrable)
87+
88+
89+
class Transaction:
90+
__slots__ = ('_cur', '_is_begin', '_isolation', '_unique_id')
91+
92+
def __init__(self, cur, isolation_level,
93+
readonly=False, deferrable=False):
94+
self._cur = cur
95+
self._is_begin = False
96+
self._unique_id = None
97+
self._isolation = isolation_level(readonly, deferrable)
98+
99+
@property
100+
def is_begin(self):
101+
return self._is_begin
102+
103+
@asyncio.coroutine
104+
def begin(self):
105+
if self._is_begin:
106+
raise psycopg2.ProgrammingError(
107+
'You are trying to open a new transaction, use the save point')
108+
self._is_begin = True
109+
yield from self._cur.execute(self._isolation.begin())
110+
return self
111+
112+
@asyncio.coroutine
113+
def commit(self):
114+
self._check_commit_rollback()
115+
yield from self._cur.execute(self._isolation.commit())
116+
self._is_begin = False
117+
118+
@asyncio.coroutine
119+
def rollback(self):
120+
self._check_commit_rollback()
121+
yield from self._cur.execute(self._isolation.rollback())
122+
self._is_begin = False
123+
124+
@asyncio.coroutine
125+
def rollback_savepoint(self):
126+
self._check_release_rollback()
127+
yield from self._cur.execute(
128+
self._isolation.rollback_savepoint(self._unique_id))
129+
self._unique_id = None
130+
131+
@asyncio.coroutine
132+
def release_savepoint(self):
133+
self._check_release_rollback()
134+
yield from self._cur.execute(
135+
self._isolation.release_savepoint(self._unique_id))
136+
self._unique_id = None
137+
138+
@asyncio.coroutine
139+
def savepoint(self):
140+
self._check_commit_rollback()
141+
if self._unique_id is not None:
142+
raise psycopg2.ProgrammingError('You do not shut down savepoint')
143+
144+
self._unique_id = 's{}'.format(uuid.uuid1().hex)
145+
yield from self._cur.execute(
146+
self._isolation.savepoint(self._unique_id))
147+
148+
return self
149+
150+
def point(self):
151+
return _TransactionPointContextManager(self.savepoint())
152+
153+
def _check_commit_rollback(self):
154+
if not self._is_begin:
155+
raise psycopg2.ProgrammingError('You are trying to commit '
156+
'the transaction does not open')
157+
158+
def _check_release_rollback(self):
159+
self._check_commit_rollback()
160+
if self._unique_id is None:
161+
raise psycopg2.ProgrammingError('You do not start savepoint')
162+
163+
def __repr__(self):
164+
return "<{} transaction={} id={:#x}>".format(
165+
self.__class__.__name__,
166+
self._isolation,
167+
id(self)
168+
)
169+
170+
def __del__(self):
171+
if self._is_begin:
172+
warnings.warn(
173+
"You have not closed transaction {!r}".format(self),
174+
ResourceWarning)
175+
176+
if self._unique_id is not None:
177+
warnings.warn(
178+
"You have not closed savepoint {!r}".format(self),
179+
ResourceWarning)
180+
181+
if PY_35:
182+
@asyncio.coroutine
183+
def __aenter__(self):
184+
return (yield from self.begin())
185+
186+
@asyncio.coroutine
187+
def __aexit__(self, exc_type, exc, tb):
188+
if exc_type is not None:
189+
yield from self.rollback()
190+
else:
191+
yield from self.commit()

aiopg/utils.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
import asyncio
22
import sys
33

4-
54
PY_35 = sys.version_info >= (3, 5)
65
PY_352 = sys.version_info >= (3, 5, 2)
76

87
if PY_35:
98
from collections.abc import Coroutine
9+
1010
base = Coroutine
1111
else:
1212
base = object
1313

14-
1514
try:
1615
ensure_future = asyncio.ensure_future
1716
except AttributeError:
@@ -26,7 +25,6 @@ def create_future(loop):
2625

2726

2827
class _ContextManager(base):
29-
3028
__slots__ = ('_coro', '_obj')
3129

3230
def __init__(self, coro):
@@ -84,7 +82,6 @@ def __aexit__(self, exc_type, exc, tb):
8482

8583

8684
class _SAConnectionContextManager(_ContextManager):
87-
8885
if PY_35: # pragma: no branch
8986
if PY_352:
9087
def __aiter__(self):
@@ -97,7 +94,6 @@ def __aiter__(self):
9794

9895

9996
class _PoolContextManager(_ContextManager):
100-
10197
if PY_35:
10298
@asyncio.coroutine
10399
def __aexit__(self, exc_type, exc, tb):
@@ -106,8 +102,33 @@ def __aexit__(self, exc_type, exc, tb):
106102
self._obj = None
107103

108104

109-
class _TransactionContextManager(_ContextManager):
105+
class _TransactionPointContextManager(_ContextManager):
106+
if PY_35:
107+
108+
@asyncio.coroutine
109+
def __aexit__(self, exc_type, exc_val, exc_tb):
110+
if exc_type is not None:
111+
yield from self._obj.rollback_savepoint()
112+
else:
113+
yield from self._obj.release_savepoint()
114+
115+
self._obj = None
110116

117+
118+
class _TransactionBeginContextManager(_ContextManager):
119+
if PY_35:
120+
121+
@asyncio.coroutine
122+
def __aexit__(self, exc_type, exc_val, exc_tb):
123+
if exc_type is not None:
124+
yield from self._obj.rollback()
125+
else:
126+
yield from self._obj.commit()
127+
128+
self._obj = None
129+
130+
131+
class _TransactionContextManager(_ContextManager):
111132
if PY_35:
112133

113134
@asyncio.coroutine
@@ -121,7 +142,6 @@ def __aexit__(self, exc_type, exc, tb):
121142

122143

123144
class _PoolAcquireContextManager(_ContextManager):
124-
125145
__slots__ = ('_coro', '_conn', '_pool')
126146

127147
def __init__(self, coro, pool):
@@ -228,6 +248,7 @@ def __exit__(self, *args):
228248
if not PY_35:
229249
try:
230250
from asyncio import coroutines
251+
231252
coroutines._COROUTINE_TYPES += (_ContextManager,)
232253
except:
233254
pass

0 commit comments

Comments
 (0)