Skip to content

Commit 0d71887

Browse files
authored
Merge pull request #154 Reader topic commit
2 parents 3c131d0 + 2334f35 commit 0d71887

File tree

12 files changed

+917
-171
lines changed

12 files changed

+917
-171
lines changed

tests/conftest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ async def topic_with_messages(driver, topic_path):
136136
ydb.TopicWriterMessage(data="123".encode()),
137137
ydb.TopicWriterMessage(data="456".encode()),
138138
)
139+
await writer.write_with_ack(
140+
ydb.TopicWriterMessage(data="789".encode()),
141+
ydb.TopicWriterMessage(data="0".encode()),
142+
)
139143
await writer.close()
140144

141145

tests/topics/test_topic_reader.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ async def test_read_message(
1111
assert await reader.receive_batch() is not None
1212
await reader.close()
1313

14+
async def test_read_and_commit_message(
15+
self, driver, topic_path, topic_with_messages, topic_consumer
16+
):
17+
18+
reader = driver.topic_client.reader(topic_consumer, topic_path)
19+
batch = await reader.receive_batch()
20+
await reader.commit_with_ack(batch)
21+
22+
reader = driver.topic_client.reader(topic_consumer, topic_path)
23+
batch2 = await reader.receive_batch()
24+
assert batch.messages[0] != batch2.messages[0]
25+
1426

1527
class TestTopicReaderSync:
1628
def test_read_message(
@@ -20,3 +32,14 @@ def test_read_message(
2032

2133
assert reader.receive_batch() is not None
2234
reader.close()
35+
36+
def test_read_and_commit_message(
37+
self, driver_sync, topic_path, topic_with_messages, topic_consumer
38+
):
39+
reader = driver_sync.topic_client.reader(topic_consumer, topic_path)
40+
batch = reader.receive_batch()
41+
reader.commit_with_ack(batch)
42+
43+
reader = driver_sync.topic_client.reader(topic_consumer, topic_path)
44+
batch2 = reader.receive_batch()
45+
assert batch.messages[0] != batch2.messages[0]

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 98 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,23 @@ def to_public(self) -> List[ydb_topic_public_types.PublicCodec]:
6767
return list(map(Codec.to_public, self.codecs))
6868

6969

70-
@dataclass
71-
class OffsetsRange(IFromProto):
72-
start: int
73-
end: int
70+
@dataclass(order=True)
71+
class OffsetsRange(IFromProto, IToProto):
72+
"""
73+
half-opened interval, include [start, end) offsets
74+
"""
75+
76+
__slots__ = ("start", "end")
77+
78+
start: int # first offset
79+
end: int # offset after last, included to range
80+
81+
def __post_init__(self):
82+
if self.end < self.start:
83+
raise ValueError(
84+
"offset end must be not less then start. Got start=%s end=%s"
85+
% (self.start, self.end)
86+
)
7487

7588
@staticmethod
7689
def from_proto(msg: ydb_topic_pb2.OffsetsRange) -> "OffsetsRange":
@@ -79,6 +92,20 @@ def from_proto(msg: ydb_topic_pb2.OffsetsRange) -> "OffsetsRange":
7992
end=msg.end,
8093
)
8194

95+
def to_proto(self) -> ydb_topic_pb2.OffsetsRange:
96+
return ydb_topic_pb2.OffsetsRange(
97+
start=self.start,
98+
end=self.end,
99+
)
100+
101+
def is_intersected_with(self, other: "OffsetsRange") -> bool:
102+
return (
103+
self.start <= other.start < self.end
104+
or self.start < other.end <= self.end
105+
or other.start <= self.start < other.end
106+
or other.start < self.end <= other.end
107+
)
108+
82109

83110
@dataclass
84111
class UpdateTokenRequest(IToProto):
@@ -527,23 +554,67 @@ def from_proto(
527554
)
528555

529556
@dataclass
530-
class CommitOffsetRequest:
557+
class CommitOffsetRequest(IToProto):
531558
commit_offsets: List["PartitionCommitOffset"]
532559

560+
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.CommitOffsetRequest:
561+
res = ydb_topic_pb2.StreamReadMessage.CommitOffsetRequest(
562+
commit_offsets=list(
563+
map(
564+
StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.to_proto,
565+
self.commit_offsets,
566+
)
567+
),
568+
)
569+
return res
570+
533571
@dataclass
534-
class PartitionCommitOffset:
572+
class PartitionCommitOffset(IToProto):
535573
partition_session_id: int
536574
offsets: List["OffsetsRange"]
537575

576+
def to_proto(
577+
self,
578+
) -> ydb_topic_pb2.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset:
579+
res = ydb_topic_pb2.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset(
580+
partition_session_id=self.partition_session_id,
581+
offsets=list(map(OffsetsRange.to_proto, self.offsets)),
582+
)
583+
return res
584+
538585
@dataclass
539-
class CommitOffsetResponse:
540-
partitions_committed_offsets: List["PartitionCommittedOffset"]
586+
class CommitOffsetResponse(IFromProto):
587+
partitions_committed_offsets: List[
588+
"StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset"
589+
]
590+
591+
@staticmethod
592+
def from_proto(
593+
msg: ydb_topic_pb2.StreamReadMessage.CommitOffsetResponse,
594+
) -> "StreamReadMessage.CommitOffsetResponse":
595+
return StreamReadMessage.CommitOffsetResponse(
596+
partitions_committed_offsets=list(
597+
map(
598+
StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset.from_proto,
599+
msg.partitions_committed_offsets,
600+
)
601+
)
602+
)
541603

542604
@dataclass
543-
class PartitionCommittedOffset:
605+
class PartitionCommittedOffset(IFromProto):
544606
partition_session_id: int
545607
committed_offset: int
546608

609+
@staticmethod
610+
def from_proto(
611+
msg: ydb_topic_pb2.StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset,
612+
) -> "StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset":
613+
return StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset(
614+
partition_session_id=msg.partition_session_id,
615+
committed_offset=msg.committed_offset,
616+
)
617+
547618
@dataclass
548619
class PartitionSessionStatusRequest:
549620
partition_session_id: int
@@ -576,16 +647,18 @@ def from_proto(
576647
@dataclass
577648
class StartPartitionSessionResponse(IToProto):
578649
partition_session_id: int
579-
read_offset: int
580-
commit_offset: int
650+
read_offset: Optional[int]
651+
commit_offset: Optional[int]
581652

582653
def to_proto(
583654
self,
584655
) -> ydb_topic_pb2.StreamReadMessage.StartPartitionSessionResponse:
585656
res = ydb_topic_pb2.StreamReadMessage.StartPartitionSessionResponse()
586657
res.partition_session_id = self.partition_session_id
587-
res.read_offset = self.read_offset
588-
res.commit_offset = self.commit_offset
658+
if self.read_offset is not None:
659+
res.read_offset = self.read_offset
660+
if self.commit_offset is not None:
661+
res.commit_offset = self.commit_offset
589662
return res
590663

591664
@dataclass
@@ -609,6 +682,8 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.FromClient:
609682
res = ydb_topic_pb2.StreamReadMessage.FromClient()
610683
if isinstance(self.client_message, StreamReadMessage.ReadRequest):
611684
res.read_request.CopyFrom(self.client_message.to_proto())
685+
elif isinstance(self.client_message, StreamReadMessage.CommitOffsetRequest):
686+
res.commit_offset_request.CopyFrom(self.client_message.to_proto())
612687
elif isinstance(self.client_message, StreamReadMessage.InitRequest):
613688
res.init_request.CopyFrom(self.client_message.to_proto())
614689
elif isinstance(
@@ -618,7 +693,9 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.FromClient:
618693
self.client_message.to_proto()
619694
)
620695
else:
621-
raise NotImplementedError()
696+
raise NotImplementedError(
697+
"Unknown message type: %s" % type(self.client_message)
698+
)
622699
return res
623700

624701
@dataclass
@@ -639,6 +716,13 @@ def from_proto(
639716
msg.read_response
640717
),
641718
)
719+
elif mess_type == "commit_offset_response":
720+
return StreamReadMessage.FromServer(
721+
server_status=server_status,
722+
server_message=StreamReadMessage.CommitOffsetResponse.from_proto(
723+
msg.commit_offset_response
724+
),
725+
)
642726
elif mess_type == "init_response":
643727
return StreamReadMessage.FromServer(
644728
server_status=server_status,
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange
2+
3+
4+
def test_offsets_range_intersected():
5+
# not intersected
6+
for test in [(0, 1, 1, 2), (1, 2, 3, 5)]:
7+
assert not OffsetsRange(test[0], test[1]).is_intersected_with(
8+
OffsetsRange(test[2], test[3])
9+
)
10+
assert not OffsetsRange(test[2], test[3]).is_intersected_with(
11+
OffsetsRange(test[0], test[1])
12+
)
13+
14+
# intersected
15+
for test in [
16+
(1, 2, 1, 2),
17+
(1, 10, 1, 2),
18+
(1, 10, 2, 3),
19+
(1, 10, 5, 15),
20+
(10, 20, 5, 15),
21+
]:
22+
assert OffsetsRange(test[0], test[1]).is_intersected_with(
23+
OffsetsRange(test[2], test[3])
24+
)
25+
assert OffsetsRange(test[2], test[3]).is_intersected_with(
26+
OffsetsRange(test[0], test[1])
27+
)

ydb/_topic_common/test_helpers.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,38 @@ def close(self):
3939
self.from_server.put_nowait(None)
4040

4141

42-
async def wait_condition(f: typing.Callable[[], bool], timeout=1):
42+
class WaitConditionError(Exception):
43+
pass
44+
45+
46+
async def wait_condition(
47+
f: typing.Callable[[], bool],
48+
timeout: typing.Optional[typing.Union[float, int]] = None,
49+
):
50+
"""
51+
timeout default is 1 second
52+
if timeout is 0 - only counter work. It userful if test need fast timeout for condition (without wait full timeout)
53+
"""
54+
if timeout is None:
55+
timeout = 1
56+
57+
minimal_loop_count_for_wait = 1000
58+
4359
start = time.monotonic()
4460
counter = 0
45-
while (time.monotonic() - start < timeout) or counter < 1000:
61+
while (time.monotonic() - start < timeout) or counter < minimal_loop_count_for_wait:
4662
counter += 1
4763
if f():
4864
return
4965
await asyncio.sleep(0)
5066

51-
raise Exception("Bad condition in test")
67+
raise WaitConditionError("Bad condition in test")
5268

5369

54-
async def wait_for_fast(fut):
55-
return await asyncio.wait_for(fut, 1)
70+
async def wait_for_fast(
71+
awaitable: typing.Awaitable,
72+
timeout: typing.Optional[typing.Union[float, int]] = None,
73+
):
74+
fut = asyncio.ensure_future(awaitable)
75+
await wait_condition(lambda: fut.done(), timeout)
76+
return fut.result()

0 commit comments

Comments
 (0)