Skip to content

Commit 5d32e96

Browse files
committed
Non stream CommitOffset feature
1 parent d980334 commit 5d32e96

File tree

4 files changed

+75
-0
lines changed

4 files changed

+75
-0
lines changed

tests/topics/test_topic_reader.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,32 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_
183183

184184
assert message != batch.messages[0]
185185

186+
def test_reader_fine_with_no_stream_commits(self, driver_sync, topic_with_messages, topic_consumer):
187+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
188+
for out in ["123", "456", "789", "0"]:
189+
message = reader.receive_message()
190+
assert message.data.decode() == out
191+
192+
driver_sync.topic_client.commit_offsets(
193+
topic_with_messages,
194+
topic_consumer,
195+
0,
196+
message._commit_end_offset
197+
)
198+
199+
def test_no_stream_commits_works(self, driver_sync, topic_with_messages, topic_consumer):
200+
for out in ["123", "456", "789", "0"]:
201+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
202+
message = reader.receive_message()
203+
assert message.data.decode() == out
204+
205+
driver_sync.topic_client.commit_offsets(
206+
topic_with_messages,
207+
topic_consumer,
208+
0,
209+
message._commit_end_offset
210+
)
211+
186212
def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer):
187213
with driver_sync.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
188214
writer.write("123")

ydb/_apis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TopicService(object):
117117
StreamRead = "StreamRead"
118118
StreamWrite = "StreamWrite"
119119
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
120+
CommitOffset = "CommitOffset"
120121

121122

122123
class QueryService(object):

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,22 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any:
137137
return UpdateTokenResponse()
138138

139139

140+
@dataclass
141+
class CommitOffsetRequest(IToProto):
142+
path: str
143+
consumer: str
144+
partition_id: int
145+
offset: int
146+
147+
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148+
return ydb_topic_pb2.CommitOffsetRequest(
149+
path=self.path,
150+
consumer=self.consumer,
151+
partition_id=self.partition_id,
152+
offset=self.offset,
153+
)
154+
155+
140156
########################################################################################################################
141157
# StreamWrite
142158
########################################################################################################################

ydb/topic.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,23 @@ def tx_writer(
317317

318318
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
319319

320+
async def commit_offsets(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
321+
req = _ydb_topic.CommitOffsetRequest(
322+
path=path,
323+
consumer=consumer,
324+
partition_id=partition_id,
325+
offset=offset,
326+
)
327+
328+
await self._driver(
329+
req.to_proto(),
330+
_apis.TopicService.Stub,
331+
_apis.TopicService.CommitOffset,
332+
_wrap_operation,
333+
)
334+
335+
336+
320337
def close(self):
321338
if self._closed:
322339
return
@@ -563,6 +580,21 @@ def tx_writer(
563580

564581
return TopicTxWriter(tx, self._driver, settings, _parent=self)
565582

583+
def commit_offsets(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
584+
req = _ydb_topic.CommitOffsetRequest(
585+
path=path,
586+
consumer=consumer,
587+
partition_id=partition_id,
588+
offset=offset,
589+
)
590+
591+
self._driver(
592+
req.to_proto(),
593+
_apis.TopicService.Stub,
594+
_apis.TopicService.CommitOffset,
595+
_wrap_operation,
596+
)
597+
566598
def close(self):
567599
if self._closed:
568600
return

0 commit comments

Comments
 (0)