Skip to content

Commit 5b17cc2

Browse files
committed
tx reader and writer based on callbacks
1 parent 3579d4a commit 5b17cc2

File tree

15 files changed

+537
-146
lines changed

15 files changed

+537
-146
lines changed

.github/workflows/tests.yaml

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,15 @@ jobs:
1818
fail-fast: false
1919
matrix:
2020
python-version: [3.8, 3.9]
21-
environment: [py-proto5, py-tls-proto5, py-proto4, py-tls-proto4, py-proto3, py-tls-proto3]
22-
folder: [ydb, tests --ignore=tests/topics, tests/topics]
21+
environment: [py, py-tls, py-proto4, py-tls-proto4, py-proto3, py-tls-proto3]
22+
folder: [ydb, tests]
2323
exclude:
24-
- environment: py-tls-proto5
24+
- environment: py-tls
2525
folder: ydb
2626
- environment: py-tls-proto4
2727
folder: ydb
2828
- environment: py-tls-proto3
2929
folder: ydb
30-
- environment: py-tls-proto5
31-
folder: tests/topics
32-
- environment: py-tls-proto4
33-
folder: tests/topics
34-
- environment: py-tls-proto3
35-
folder: tests/topics
3630

3731
steps:
3832
- uses: actions/checkout@v1
Lines changed: 126 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,69 @@
11
import asyncio
22
from asyncio import wait_for
33
import pytest
4+
from unittest import mock
45
import ydb
56

7+
DEFAULT_TIMEOUT = 0.1
8+
DEFAULT_RETRY_SETTINGS = ydb.RetrySettings(max_retries=1)
9+
610

7-
@pytest.mark.skip("Not implemented yet.")
811
@pytest.mark.asyncio
912
class TestTopicTransactionalReader:
1013
async def test_commit(self, driver: ydb.aio.Driver, topic_with_messages, topic_consumer):
11-
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
12-
async with ydb.aio.QuerySessionPool(driver) as pool:
14+
async with ydb.aio.QuerySessionPool(driver) as pool:
15+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
1316

1417
async def callee(tx: ydb.aio.QueryTxContext):
15-
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), 1)
16-
assert len(batch) == 1
17-
assert batch[0].data.decode() == "123"
18+
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), DEFAULT_TIMEOUT)
19+
assert len(batch.messages) == 1
20+
assert batch.messages[0].data.decode() == "123"
1821

19-
await pool.retry_tx_async(callee)
22+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
2023

21-
msg = await wait_for(reader.receive_message(), 1)
24+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
25+
msg = await wait_for(reader.receive_message(), DEFAULT_TIMEOUT)
2226
assert msg.data.decode() == "456"
2327

2428
async def test_rollback(self, driver: ydb.aio.Driver, topic_with_messages, topic_consumer):
2529
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
2630
async with ydb.aio.QuerySessionPool(driver) as pool:
2731

2832
async def callee(tx: ydb.aio.QueryTxContext):
29-
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), 1)
30-
assert len(batch) == 1
31-
assert batch[0].data.decode() == "123"
33+
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), DEFAULT_TIMEOUT)
34+
assert len(batch.messages) == 1
35+
assert batch.messages[0].data.decode() == "123"
3236

3337
await tx.rollback()
3438

35-
await pool.retry_tx_async(callee)
39+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
3640

37-
msg = await wait_for(reader.receive_message(), 1)
41+
msg = await wait_for(reader.receive_message(), DEFAULT_TIMEOUT)
3842
assert msg.data.decode() == "123"
3943

44+
async def test_tx_failed_if_update_offsets_call_failed(
45+
self, driver: ydb.aio.Driver, topic_with_messages, topic_consumer
46+
):
47+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
48+
async with ydb.aio.QuerySessionPool(driver) as pool:
49+
with mock.patch.object(
50+
reader._reconnector,
51+
"_do_commit_batches_with_tx_call",
52+
side_effect=ydb.Error("Update offsets in tx failed"),
53+
):
54+
55+
async def callee(tx: ydb.aio.QueryTxContext):
56+
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), DEFAULT_TIMEOUT)
57+
assert len(batch.messages) == 1
58+
assert batch.messages[0].data.decode() == "123"
59+
60+
with pytest.raises(ydb.Error):
61+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
62+
63+
msg = await wait_for(reader.receive_message(), DEFAULT_TIMEOUT)
64+
assert msg.data.decode() == "123"
65+
4066

41-
# @pytest.mark.skip("Not implemented yet.")
4267
class TestTopicTransactionalWriter:
4368
async def test_commit(self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO):
4469
async with ydb.aio.QuerySessionPool(driver) as pool:
@@ -47,7 +72,7 @@ async def callee(tx: ydb.aio.QueryTxContext):
4772
tx_writer = driver.topic_client.tx_writer(tx, topic_path)
4873
await tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
4974

50-
await pool.retry_tx_async(callee)
75+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
5176

5277
msg = await wait_for(topic_reader.receive_message(), 0.1)
5378
assert msg.data.decode() == "123"
@@ -61,7 +86,92 @@ async def callee(tx: ydb.aio.QueryTxContext):
6186

6287
await tx.rollback()
6388

64-
await pool.retry_tx_async(callee)
89+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
90+
91+
with pytest.raises(asyncio.TimeoutError):
92+
await wait_for(topic_reader.receive_message(), 0.1)
93+
94+
async def test_no_msg_written_in_error_case(
95+
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
96+
):
97+
async with ydb.aio.QuerySessionPool(driver) as pool:
98+
99+
async def callee(tx: ydb.aio.QueryTxContext):
100+
tx_writer = driver.topic_client.tx_writer(tx, topic_path)
101+
await tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
102+
103+
raise BaseException("error")
104+
105+
with pytest.raises(BaseException):
106+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
65107

66108
with pytest.raises(asyncio.TimeoutError):
67109
await wait_for(topic_reader.receive_message(), 0.1)
110+
111+
async def test_msg_written_exactly_once_with_retries(
112+
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
113+
):
114+
error_raised = False
115+
async with ydb.aio.QuerySessionPool(driver) as pool:
116+
117+
async def callee(tx: ydb.aio.QueryTxContext):
118+
nonlocal error_raised
119+
tx_writer = driver.topic_client.tx_writer(tx, topic_path)
120+
await tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
121+
122+
if not error_raised:
123+
error_raised = True
124+
raise ydb.issues.Unavailable("some retriable error")
125+
126+
await pool.retry_tx_async(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
127+
128+
msg = await wait_for(topic_reader.receive_message(), 0.1)
129+
assert msg.data.decode() == "123"
130+
131+
with pytest.raises(asyncio.TimeoutError):
132+
await wait_for(topic_reader.receive_message(), 0.1)
133+
134+
135+
class TestTopicTransactionalWriterSync:
136+
def test_commit(self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReader):
137+
with ydb.QuerySessionPool(driver_sync) as pool:
138+
139+
def callee(tx: ydb.QueryTxContext):
140+
tx_writer = driver_sync.topic_client.tx_writer(tx, topic_path)
141+
tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
142+
143+
pool.retry_tx_sync(callee, retry_settings=ydb.RetrySettings(max_retries=1))
144+
145+
msg = topic_reader_sync.receive_message(timeout=0.1)
146+
assert msg.data.decode() == "123"
147+
148+
def test_rollback(self, driver_sync: ydb.aio.Driver, topic_path, topic_reader_sync: ydb.TopicReader):
149+
with ydb.QuerySessionPool(driver_sync) as pool:
150+
151+
def callee(tx: ydb.QueryTxContext):
152+
tx_writer = driver_sync.topic_client.tx_writer(tx, topic_path)
153+
tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
154+
155+
tx.rollback()
156+
157+
pool.retry_tx_sync(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
158+
159+
with pytest.raises(TimeoutError):
160+
topic_reader_sync.receive_message(timeout=0.1)
161+
162+
def test_no_msg_written_in_error_case(
163+
self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReaderAsyncIO
164+
):
165+
with ydb.QuerySessionPool(driver_sync) as pool:
166+
167+
def callee(tx: ydb.QueryTxContext):
168+
tx_writer = driver_sync.topic_client.tx_writer(tx, topic_path)
169+
tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
170+
171+
raise BaseException("error")
172+
173+
with pytest.raises(BaseException):
174+
pool.retry_tx_sync(callee, retry_settings=DEFAULT_RETRY_SETTINGS)
175+
176+
with pytest.raises(TimeoutError):
177+
topic_reader_sync.receive_message(timeout=0.1)

tox.ini

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[tox]
2-
envlist = py-proto5,py-proto4,py-proto3,py-tls-proto5,py-tls-proto4,py-tls-proto3,style,pylint,black,protoc,py-cov-proto4
2+
envlist = py,py-proto4,py-proto3,py-tls,py-tls-proto4,py-tls-proto3,style,pylint,black,protoc,py-cov-proto4
33
minversion = 4.2.6
44
skipsdist = True
55
ignore_basepython_conflict = true
@@ -30,7 +30,7 @@ deps =
3030
-r{toxinidir}/test-requirements.txt
3131
protobuf<4.0.0
3232

33-
[testenv:py-proto5]
33+
[testenv:py]
3434
commands =
3535
pytest -v -m "not tls" --docker-compose-remove-volumes --docker-compose=docker-compose.yml {posargs}
3636
deps =
@@ -60,7 +60,7 @@ deps =
6060
-r{toxinidir}/test-requirements.txt
6161
protobuf<4.0.0
6262

63-
[testenv:py-tls-proto5]
63+
[testenv:py-tls]
6464
commands =
6565
pytest -v -m tls --docker-compose-remove-volumes --docker-compose=docker-compose-tls.yml {posargs}
6666
deps =

ydb/_apis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class TopicService(object):
116116
DropTopic = "DropTopic"
117117
StreamRead = "StreamRead"
118118
StreamWrite = "StreamWrite"
119+
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
119120

120121

121122
class QueryService(object):

ydb/_errors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
_errors_retriable_fast_backoff_types = [
77
issues.Unavailable,
8+
issues.ClientInternalError,
89
]
910
_errors_retriable_slow_backoff_types = [
1011
issues.Aborted,

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,52 @@ def to_public(self) -> ydb_topic_public_types.PublicMeteringMode:
12091209
return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED
12101210

12111211

1212+
@dataclass
1213+
class UpdateOffsetsInTransactionRequest(IToProto):
1214+
tx: TransactionIdentity
1215+
topics: List[UpdateOffsetsInTransactionRequest.TopicOffsets]
1216+
consumer: str
1217+
1218+
def to_proto(self):
1219+
return ydb_topic_pb2.UpdateOffsetsInTransactionRequest(
1220+
tx=self.tx.to_proto(),
1221+
consumer=self.consumer,
1222+
topics=list(
1223+
map(
1224+
UpdateOffsetsInTransactionRequest.TopicOffsets.to_proto,
1225+
self.topics,
1226+
)
1227+
),
1228+
)
1229+
1230+
@dataclass
1231+
class TopicOffsets(IToProto):
1232+
path: str
1233+
partitions: List[UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets]
1234+
1235+
def to_proto(self):
1236+
return ydb_topic_pb2.UpdateOffsetsInTransactionRequest.TopicOffsets(
1237+
path=self.path,
1238+
partitions=list(
1239+
map(
1240+
UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.to_proto,
1241+
self.partitions,
1242+
)
1243+
),
1244+
)
1245+
1246+
@dataclass
1247+
class PartitionOffsets(IToProto):
1248+
partition_id: int
1249+
partition_offsets: List[OffsetsRange]
1250+
1251+
def to_proto(self) -> ydb_topic_pb2.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets:
1252+
return ydb_topic_pb2.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets(
1253+
partition_id=self.partition_id,
1254+
partition_offsets=list(map(OffsetsRange.to_proto, self.partition_offsets)),
1255+
)
1256+
1257+
12121258
@dataclass
12131259
class CreateTopicRequest(IToProto, IFromPublic):
12141260
path: str

ydb/_topic_reader/datatypes.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ def ack_notify(self, offset: int):
108108
waiter = self._ack_waiters.popleft()
109109
waiter._finish_ok()
110110

111+
def _update_last_commited_offset_if_needed(self, offset: int):
112+
self.committed_offset = max(self.committed_offset, offset)
113+
111114
def close(self):
112115
if self.closed:
113116
return
@@ -211,3 +214,9 @@ def _pop_batch(self, message_count: int) -> PublicBatch:
211214
self._bytes_size = self._bytes_size - new_batch._bytes_size
212215

213216
return new_batch
217+
218+
def _update_partition_offsets(self, tx, exc=None):
219+
if exc is not None:
220+
return
221+
offsets = self._commit_get_offsets_range()
222+
self._partition_session._update_last_commited_offset_if_needed(offsets.end)

0 commit comments

Comments
 (0)