Skip to content

Commit 3545a78

Browse files
authored
Merge pull request #201 Deny split transactions by default (v3)
2 parents 8e00a22 + 7bcee51 commit 3545a78

File tree

6 files changed

+233
-7
lines changed

6 files changed

+233
-7
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* BROKEN CHANGE: deny any action in transaction after commit/rollback
2+
13
## 3.0.1b6 ##
24
* BROKEN CHANGES: remove writer.write(mess1, mess2) variant, use list instead: writer.write([mess1, mess2])
35
* BROKEN CHANGES: change names of public method in topic client

tests/aio/test_tx.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ async def test_tx_snapshot_ro(driver, database):
8585

8686
await ro_tx.commit()
8787

88+
ro_tx = session.transaction(tx_mode=ydb.SnapshotReadOnly())
8889
with pytest.raises(ydb.issues.GenericError) as exc_info:
8990
await ro_tx.execute("UPDATE `test` SET value = value + 1")
9091
assert "read only transaction" in exc_info.value.message
@@ -94,3 +95,64 @@ async def test_tx_snapshot_ro(driver, database):
9495
commit_tx=True,
9596
)
9697
assert data[0].rows == [{"value": 2}]
98+
99+
100+
@pytest.mark.asyncio
101+
async def test_split_transactions_deny_split(driver, table_name):
102+
async with ydb.aio.SessionPool(driver, 1) as pool:
103+
104+
async def check_transaction(s: ydb.aio.table.Session):
105+
async with s.transaction(deny_split_transactions=True) as tx:
106+
await tx.execute("INSERT INTO %s (id) VALUES (1)" % table_name)
107+
await tx.commit()
108+
109+
with pytest.raises(RuntimeError):
110+
await tx.execute("INSERT INTO %s (id) VALUES (2)" % table_name)
111+
112+
await tx.commit()
113+
114+
async with s.transaction() as tx:
115+
rs = await tx.execute("SELECT COUNT(*) as cnt FROM %s" % table_name)
116+
assert rs[0].rows[0].cnt == 1
117+
118+
await pool.retry_operation(check_transaction)
119+
120+
121+
@pytest.mark.asyncio
122+
async def test_split_transactions_allow_split(driver, table_name):
123+
async with ydb.aio.SessionPool(driver, 1) as pool:
124+
125+
async def check_transaction(s: ydb.aio.table.Session):
126+
async with s.transaction(deny_split_transactions=False) as tx:
127+
await tx.execute("INSERT INTO %s (id) VALUES (1)" % table_name)
128+
await tx.commit()
129+
130+
await tx.execute("INSERT INTO %s (id) VALUES (2)" % table_name)
131+
await tx.commit()
132+
133+
async with s.transaction() as tx:
134+
rs = await tx.execute("SELECT COUNT(*) as cnt FROM %s" % table_name)
135+
assert rs[0].rows[0].cnt == 2
136+
137+
await pool.retry_operation(check_transaction)
138+
139+
140+
@pytest.mark.asyncio
141+
async def test_split_transactions_default(driver, table_name):
142+
async with ydb.aio.SessionPool(driver, 1) as pool:
143+
144+
async def check_transaction(s: ydb.aio.table.Session):
145+
async with s.transaction() as tx:
146+
await tx.execute("INSERT INTO %s (id) VALUES (1)" % table_name)
147+
await tx.commit()
148+
149+
with pytest.raises(RuntimeError):
150+
await tx.execute("INSERT INTO %s (id) VALUES (2)" % table_name)
151+
152+
await tx.commit()
153+
154+
async with s.transaction() as tx:
155+
rs = await tx.execute("SELECT COUNT(*) as cnt FROM %s" % table_name)
156+
assert rs[0].rows[0].cnt == 1
157+
158+
await pool.retry_operation(check_transaction)

tests/conftest.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,33 @@ async def driver_sync(endpoint, database, event_loop):
105105
driver.stop(timeout=10)
106106

107107

108+
@pytest.fixture()
109+
def table_name(driver_sync, database):
110+
table_name = "table"
111+
112+
with ydb.SessionPool(driver_sync) as pool:
113+
114+
def create_table(s):
115+
try:
116+
s.drop_table(database + "/" + table_name)
117+
except ydb.SchemeError:
118+
pass
119+
120+
s.execute_scheme(
121+
"""
122+
CREATE TABLE %s (
123+
id Int64 NOT NULL,
124+
i64Val Int64,
125+
PRIMARY KEY(id)
126+
)
127+
"""
128+
% table_name
129+
)
130+
131+
pool.retry_operation_sync(create_table)
132+
return table_name
133+
134+
108135
@pytest.fixture()
109136
def topic_consumer():
110137
return "fixture-consumer"

tests/table/test_tx.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def test_tx_snapshot_ro(driver_sync, database):
8080

8181
ro_tx.commit()
8282

83+
ro_tx = session.transaction(tx_mode=ydb.SnapshotReadOnly())
8384
with pytest.raises(ydb.issues.GenericError) as exc_info:
8485
ro_tx.execute("UPDATE `test` SET value = value + 1")
8586
assert "read only transaction" in exc_info.value.message
@@ -89,3 +90,61 @@ def test_tx_snapshot_ro(driver_sync, database):
8990
commit_tx=True,
9091
)
9192
assert data[0].rows == [{"value": 2}]
93+
94+
95+
def test_split_transactions_deny_split(driver_sync, table_name):
96+
with ydb.SessionPool(driver_sync, 1) as pool:
97+
98+
def check_transaction(s: ydb.table.Session):
99+
with s.transaction(deny_split_transactions=True) as tx:
100+
tx.execute("INSERT INTO %s (id) VALUES (1)" % table_name)
101+
tx.commit()
102+
103+
with pytest.raises(RuntimeError):
104+
tx.execute("INSERT INTO %s (id) VALUES (2)" % table_name)
105+
106+
tx.commit()
107+
108+
with s.transaction() as tx:
109+
rs = tx.execute("SELECT COUNT(*) as cnt FROM %s" % table_name)
110+
assert rs[0].rows[0].cnt == 1
111+
112+
pool.retry_operation_sync(check_transaction)
113+
114+
115+
def test_split_transactions_allow_split(driver_sync, table_name):
116+
with ydb.SessionPool(driver_sync, 1) as pool:
117+
118+
def check_transaction(s: ydb.table.Session):
119+
with s.transaction(deny_split_transactions=False) as tx:
120+
tx.execute("INSERT INTO %s (id) VALUES (1)" % table_name)
121+
tx.commit()
122+
123+
tx.execute("INSERT INTO %s (id) VALUES (2)" % table_name)
124+
tx.commit()
125+
126+
with s.transaction() as tx:
127+
rs = tx.execute("SELECT COUNT(*) as cnt FROM %s" % table_name)
128+
assert rs[0].rows[0].cnt == 2
129+
130+
pool.retry_operation_sync(check_transaction)
131+
132+
133+
def test_split_transactions_default(driver_sync, table_name):
134+
with ydb.SessionPool(driver_sync, 1) as pool:
135+
136+
def check_transaction(s: ydb.table.Session):
137+
with s.transaction() as tx:
138+
tx.execute("INSERT INTO %s (id) VALUES (1)" % table_name)
139+
tx.commit()
140+
141+
with pytest.raises(RuntimeError):
142+
tx.execute("INSERT INTO %s (id) VALUES (2)" % table_name)
143+
144+
tx.commit()
145+
146+
with s.transaction() as tx:
147+
rs = tx.execute("SELECT COUNT(*) as cnt FROM %s" % table_name)
148+
assert rs[0].rows[0].cnt == 1
149+
150+
pool.retry_operation_sync(check_transaction)

ydb/aio/table.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,14 @@ async def alter_table(
120120
set_read_replicas_settings,
121121
)
122122

123-
def transaction(self, tx_mode=None):
124-
return TxContext(self._driver, self._state, self, tx_mode)
123+
def transaction(self, tx_mode=None, *, deny_split_transactions=True):
124+
return TxContext(
125+
self._driver,
126+
self._state,
127+
self,
128+
tx_mode,
129+
deny_split_transactions=deny_split_transactions,
130+
)
125131

126132
async def describe_table(self, path, settings=None): # pylint: disable=W0236
127133
return await super().describe_table(path, settings)
@@ -184,6 +190,9 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
184190
async def execute(
185191
self, query, parameters=None, commit_tx=False, settings=None
186192
): # pylint: disable=W0236
193+
194+
self._check_split()
195+
187196
return await super().execute(query, parameters, commit_tx, settings)
188197

189198
async def commit(self, settings=None): # pylint: disable=W0236

ydb/table.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,7 +1173,7 @@ def execute_scheme(self, yql_text, settings=None):
11731173
pass
11741174

11751175
@abstractmethod
1176-
def transaction(self, tx_mode=None):
1176+
def transaction(self, tx_mode=None, deny_split_transactions=True):
11771177
pass
11781178

11791179
@abstractmethod
@@ -1677,8 +1677,14 @@ def execute_scheme(self, yql_text, settings=None):
16771677
self._state.endpoint,
16781678
)
16791679

1680-
def transaction(self, tx_mode=None):
1681-
return TxContext(self._driver, self._state, self, tx_mode)
1680+
def transaction(self, tx_mode=None, deny_split_transactions=True):
1681+
return TxContext(
1682+
self._driver,
1683+
self._state,
1684+
self,
1685+
tx_mode,
1686+
deny_split_transactions=deny_split_transactions,
1687+
)
16821688

16831689
def has_prepared(self, query):
16841690
return query in self._state
@@ -2189,9 +2195,27 @@ def begin(self, settings=None):
21892195

21902196

21912197
class BaseTxContext(ITxContext):
2192-
__slots__ = ("_tx_state", "_session_state", "_driver", "session")
2198+
__slots__ = (
2199+
"_tx_state",
2200+
"_session_state",
2201+
"_driver",
2202+
"session",
2203+
"_finished",
2204+
"_deny_split_transactions",
2205+
)
21932206

2194-
def __init__(self, driver, session_state, session, tx_mode=None):
2207+
_COMMIT = "commit"
2208+
_ROLLBACK = "rollback"
2209+
2210+
def __init__(
2211+
self,
2212+
driver,
2213+
session_state,
2214+
session,
2215+
tx_mode=None,
2216+
*,
2217+
deny_split_transactions=True
2218+
):
21952219
"""
21962220
An object that provides a simple transaction context manager that allows statements execution
21972221
in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
@@ -2214,6 +2238,8 @@ def __init__(self, driver, session_state, session, tx_mode=None):
22142238
self._tx_state = _tx_ctx_impl.TxState(tx_mode)
22152239
self._session_state = session_state
22162240
self.session = session
2241+
self._finished = ""
2242+
self._deny_split_transactions = deny_split_transactions
22172243

22182244
def __enter__(self):
22192245
"""
@@ -2271,6 +2297,9 @@ def execute(self, query, parameters=None, commit_tx=False, settings=None):
22712297
22722298
:return: A result sets or exception in case of execution errors
22732299
"""
2300+
2301+
self._check_split()
2302+
22742303
return self._driver(
22752304
_tx_ctx_impl.execute_request_factory(
22762305
self._session_state,
@@ -2297,8 +2326,12 @@ def commit(self, settings=None):
22972326
22982327
:return: A committed transaction or exception if commit is failed
22992328
"""
2329+
2330+
self._set_finish(self._COMMIT)
2331+
23002332
if self._tx_state.tx_id is None and not self._tx_state.dead:
23012333
return self
2334+
23022335
return self._driver(
23032336
_tx_ctx_impl.commit_request_factory(self._session_state, self._tx_state),
23042337
_apis.TableService.Stub,
@@ -2318,8 +2351,12 @@ def rollback(self, settings=None):
23182351
23192352
:return: A rolled back transaction or exception if rollback is failed
23202353
"""
2354+
2355+
self._set_finish(self._ROLLBACK)
2356+
23212357
if self._tx_state.tx_id is None and not self._tx_state.dead:
23222358
return self
2359+
23232360
return self._driver(
23242361
_tx_ctx_impl.rollback_request_factory(self._session_state, self._tx_state),
23252362
_apis.TableService.Stub,
@@ -2340,6 +2377,9 @@ def begin(self, settings=None):
23402377
"""
23412378
if self._tx_state.tx_id is not None:
23422379
return self
2380+
2381+
self._check_split()
2382+
23432383
return self._driver(
23442384
_tx_ctx_impl.begin_request_factory(self._session_state, self._tx_state),
23452385
_apis.TableService.Stub,
@@ -2350,6 +2390,21 @@ def begin(self, settings=None):
23502390
self._session_state.endpoint,
23512391
)
23522392

2393+
def _set_finish(self, val):
2394+
self._check_split(val)
2395+
self._finished = val
2396+
2397+
def _check_split(self, allow=""):
2398+
"""
2399+
Deny all operaions with transaction after commit/rollback.
2400+
Exception: double commit and double rollbacks, because it is safe
2401+
"""
2402+
if not self._deny_split_transactions:
2403+
return
2404+
2405+
if self._finished != "" and self._finished != allow:
2406+
raise RuntimeError("Any operation with finished transaction is denied")
2407+
23532408

23542409
class TxContext(BaseTxContext):
23552410
@_utilities.wrap_async_call_exceptions
@@ -2365,6 +2420,9 @@ def async_execute(self, query, parameters=None, commit_tx=False, settings=None):
23652420
23662421
:return: A future of query execution
23672422
"""
2423+
2424+
self._check_split()
2425+
23682426
return self._driver.future(
23692427
_tx_ctx_impl.execute_request_factory(
23702428
self._session_state,
@@ -2396,8 +2454,11 @@ def async_commit(self, settings=None):
23962454
23972455
:return: A future of commit call
23982456
"""
2457+
self._set_finish(self._COMMIT)
2458+
23992459
if self._tx_state.tx_id is None and not self._tx_state.dead:
24002460
return _utilities.wrap_result_in_future(self)
2461+
24012462
return self._driver.future(
24022463
_tx_ctx_impl.commit_request_factory(self._session_state, self._tx_state),
24032464
_apis.TableService.Stub,
@@ -2418,8 +2479,11 @@ def async_rollback(self, settings=None):
24182479
24192480
:return: A future of rollback call
24202481
"""
2482+
self._set_finish(self._ROLLBACK)
2483+
24212484
if self._tx_state.tx_id is None and not self._tx_state.dead:
24222485
return _utilities.wrap_result_in_future(self)
2486+
24232487
return self._driver.future(
24242488
_tx_ctx_impl.rollback_request_factory(self._session_state, self._tx_state),
24252489
_apis.TableService.Stub,
@@ -2441,6 +2505,9 @@ def async_begin(self, settings=None):
24412505
"""
24422506
if self._tx_state.tx_id is not None:
24432507
return _utilities.wrap_result_in_future(self)
2508+
2509+
self._check_split()
2510+
24442511
return self._driver.future(
24452512
_tx_ctx_impl.begin_request_factory(self._session_state, self._tx_state),
24462513
_apis.TableService.Stub,

0 commit comments

Comments
 (0)