Skip to content

Commit dfe7620

Browse files
committed
read message from real servre
1 parent 89ebf8d commit dfe7620

File tree

6 files changed

+68
-6
lines changed

6 files changed

+68
-6
lines changed

tests/conftest.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,17 @@ async def driver(endpoint, database, event_loop):
100100

101101

102102
@pytest.fixture()
103-
def topic_path(endpoint) -> str:
103+
def topic_consumer():
104+
return "fixture-consumer"
105+
106+
107+
@pytest.fixture()
108+
def topic_path(endpoint, topic_consumer) -> str:
104109
subprocess.run(
105110
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic"""
106111
% endpoint,
107112
shell=True,
113+
capture_output=True,
108114
)
109115
res = subprocess.run(
110116
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic"""
@@ -114,4 +120,24 @@ def topic_path(endpoint) -> str:
114120
)
115121
assert res.returncode == 0, res.stderr + res.stdout
116122

123+
res = subprocess.run(
124+
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic consumer add --consumer %s /local/test-topic"""
125+
% (endpoint, topic_consumer),
126+
shell=True,
127+
capture_output=True,
128+
)
129+
assert res.returncode == 0, res.stderr + res.stdout
130+
117131
return "/local/test-topic"
132+
133+
134+
@pytest.fixture()
135+
@pytest.mark.asyncio()
136+
async def topic_with_messages(driver, topic_path):
137+
pass
138+
writer = driver.topic_client.topic_writer(topic_path, producer_and_message_group_id="fixture-producer-id")
139+
await writer.write_with_ack(
140+
ydb.TopicWriterMessage(data="123".encode()),
141+
ydb.TopicWriterMessage(data="456".encode()),
142+
)
143+
await writer.close()

tests/topics/test_topic_reader.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import pytest
2+
3+
from ydb._topic_reader.topic_reader_asyncio import PublicAsyncIOReader
4+
from ydb import TopicReaderSettings
5+
6+
7+
@pytest.mark.asyncio
8+
class TestTopicWriterAsyncIO:
9+
async def test_read_message(self, driver, topic_path, topic_with_messages, topic_consumer):
10+
reader = PublicAsyncIOReader(driver, TopicReaderSettings(
11+
consumer=topic_consumer,
12+
topic=topic_path,
13+
))
14+
await reader.wait_messages()
15+
16+
assert reader.receive_batch() is not None

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ def __init__(self, driver: Driver, settings: PublicReaderSettings):
3232
self._loop = asyncio.get_running_loop()
3333
self._reconnector = ReaderReconnector(driver, settings)
3434

35+
async def wait_messages(self):
36+
await self._reconnector.wait_message()
37+
38+
def receive_batch(self):
39+
return self._reconnector.receive_batch_nowait()
40+
3541

3642
class ReaderReconnector:
3743
_settings: PublicReaderSettings
@@ -58,6 +64,7 @@ async def wait_message(self):
5864
while True:
5965
if self._stream_reader is not None:
6066
await self._stream_reader.wait_messages()
67+
return
6168

6269
await self._state_changed.wait()
6370
self._state_changed.clear()

ydb/_topic_wrapper/common.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,14 @@ async def _start_sync_driver(self, driver: ydb.Driver, stub, method):
179179

180180
async def receive(self) -> typing.Any:
181181
# todo handle grpc exceptions and convert it to internal exceptions
182-
grpc_item = await self.from_server_grpc.__anext__()
183-
return self.convert_server_grpc_to_wrapper(grpc_item)
182+
grpc_message = await self.from_server_grpc.__anext__()
183+
# print("rekby, grpc, received", grpc_message)
184+
return self.convert_server_grpc_to_wrapper(grpc_message)
184185

185186
def write(self, wrap_message: IToProto):
186-
self.from_client_grpc.put_nowait(wrap_message.to_proto())
187+
grpc_message=wrap_message.to_proto()
188+
# print("rekby, grpc, send", grpc_message)
189+
self.from_client_grpc.put_nowait(grpc_message)
187190

188191

189192
@dataclass(init=False)

ydb/_topic_wrapper/reader.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,12 @@ def __init__(self, client_message: "ReaderMessagesFromClientToServer"):
227227

228228
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.FromClient:
229229
res = ydb_topic_pb2.StreamReadMessage.FromClient()
230-
if isinstance(self.client_message, StreamReadMessage.InitRequest):
230+
if isinstance(self.client_message, StreamReadMessage.ReadRequest):
231+
res.read_request.CopyFrom(self.client_message.to_proto())
232+
elif isinstance(self.client_message, StreamReadMessage.InitRequest):
231233
res.init_request.CopyFrom(self.client_message.to_proto())
234+
elif isinstance(self.client_message, StreamReadMessage.StartPartitionSessionResponse):
235+
res.start_partition_session_response.CopyFrom(self.client_message.to_proto())
232236
else:
233237
raise NotImplementedError()
234238
return res
@@ -242,12 +246,16 @@ def from_proto(msg: ydb_topic_pb2.StreamReadMessage.FromServer) -> "StreamReadMe
242246
mess_type = msg.WhichOneof("server_message")
243247
if mess_type == "read_response":
244248
return StreamReadMessage.FromServer(
245-
server_message=StreamReadMessage.ReadResponse.from_proto(msg.init_response)
249+
server_message=StreamReadMessage.ReadResponse.from_proto(msg.read_response)
246250
)
247251
elif mess_type == "init_response":
248252
return StreamReadMessage.FromServer(
249253
server_message=StreamReadMessage.InitResponse.from_proto(msg.init_response),
250254
)
255+
elif mess_type == "start_partition_session_request":
256+
return StreamReadMessage.FromServer(
257+
server_message=StreamReadMessage.StartPartitionSessionRequest.from_proto(msg.start_partition_session_request)
258+
)
251259

252260
# todo replace exception to log
253261
raise NotImplementedError()

ydb/topic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from . import aio, Credentials
44
from ._topic_reader.topic_reader import (
5+
PublicReaderSettings as TopicReaderSettings,
56
Reader as TopicReader,
67
ReaderAsyncIO as TopicReaderAsyncIO,
78
Selector as TopicSelector,
@@ -18,6 +19,7 @@
1819
RetryPolicy as TopicWriterRetryPolicy,
1920
)
2021

22+
2123
from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
2224

2325

0 commit comments

Comments
 (0)