Skip to content

Commit e1ea1f4

Browse files
committed
remove difficult variants of send message
fix examples add wait container timeout - for tests on m1
1 parent d42d7c1 commit e1ea1f4

File tree

7 files changed

+98
-37
lines changed

7 files changed

+98
-37
lines changed

examples/topic/writer_async_example.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ async def connect_without_context_manager(db: ydb.aio.Driver):
3737
async def send_messages(writer: ydb.TopicWriterAsyncIO):
3838
# simple str/bytes without additional metadata
3939
await writer.write("mess") # send text
40-
await writer.write(bytes([1, 2, 3])) # send bytes
41-
await writer.write("mess-1", "mess-2") # send two messages
40+
await writer.write(bytes([1, 2, 3])) # send single message with bytes 1,2,3
41+
await writer.write(["mess-1", "mess-2"]) # send two messages
4242

4343
# full forms
4444
await writer.write(ydb.TopicWriterMessage("mess")) # send text
4545
await writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]))) # send bytes
46-
await writer.write(
46+
await writer.write([
4747
ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")
48-
) # send few messages by one call
48+
]) # send few messages by one call
4949

5050
# with meta
5151
await writer.write(
@@ -71,12 +71,12 @@ async def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
7171

7272
async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
7373
# future wait
74-
await writer.write_with_result(
74+
await writer.write_with_result([
7575
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
76-
)
76+
])
7777

7878
# send with flush
79-
await writer.write("1", "2", "3")
79+
await writer.write(["1", "2", "3"])
8080
await writer.flush()
8181

8282

examples/topic/writer_example.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,21 @@ def connect_without_context_manager(db: ydb.Driver):
4444
try:
4545
pass # some code
4646
finally:
47-
await writer.close()
47+
writer.close()
4848

4949

5050
def send_messages(writer: ydb.TopicWriter):
5151
# simple str/bytes without additional metadata
5252
writer.write("mess") # send text
53-
writer.write(bytes([1, 2, 3])) # send bytes
54-
writer.write("mess-1", "mess-2") # send two messages
53+
writer.write(bytes([1, 2, 3])) # send single message with bytes 1,2,3
54+
writer.write(["mess-1", "mess-2"]) # send two messages
5555

5656
# full forms
5757
writer.write(ydb.TopicWriterMessage("mess")) # send text
5858
writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]))) # send bytes
59-
writer.write(
59+
writer.write([
6060
ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")
61-
) # send few messages by one call
61+
]) # send few messages by one call
6262

6363
# with meta
6464
writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()))
@@ -87,13 +87,13 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
8787
).result()
8888

8989
# implicit, by sync call
90-
writer.write_with_ack(
90+
writer.write_with_ack([
9191
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
92-
)
92+
])
9393
# write_with_ack
9494

9595
# send with flush
96-
writer.write("1", "2", "3")
96+
writer.write(["1", "2", "3"])
9797
writer.flush()
9898

9999

tests/conftest.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def docker_compose_file(pytestconfig):
1212

1313

1414
def wait_container_ready(driver):
15-
driver.wait(timeout=10)
15+
driver.wait(timeout=30)
1616

1717
with ydb.SessionPool(driver) as pool:
1818

@@ -133,12 +133,16 @@ async def topic_path(driver, topic_consumer, database) -> str:
133133
async def topic_with_messages(driver, topic_path):
134134
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
135135
await writer.write_with_ack(
136-
ydb.TopicWriterMessage(data="123".encode()),
137-
ydb.TopicWriterMessage(data="456".encode()),
136+
[
137+
ydb.TopicWriterMessage(data="123".encode()),
138+
ydb.TopicWriterMessage(data="456".encode()),
139+
]
138140
)
139141
await writer.write_with_ack(
140-
ydb.TopicWriterMessage(data="789".encode()),
141-
ydb.TopicWriterMessage(data="0".encode()),
142+
[
143+
ydb.TopicWriterMessage(data="789".encode()),
144+
ydb.TopicWriterMessage(data="0".encode()),
145+
]
142146
)
143147
await writer.close()
144148

tests/topics/test_topic_writer.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,30 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
6060
init_info = await writer.wait_init()
6161
assert init_info.last_seqno == last_seqno
6262

63+
async def test_write_multi_message_with_ack(
64+
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
65+
):
66+
async with driver.topic_client.writer(topic_path) as writer:
67+
await writer.write_with_ack(
68+
[
69+
ydb.TopicWriterMessage(data="123".encode()),
70+
ydb.TopicWriterMessage(data="456".encode()),
71+
]
72+
)
73+
74+
batch = await topic_reader.receive_batch()
75+
76+
assert batch.messages[0].offset == 0
77+
assert batch.messages[0].seqno == 1
78+
assert batch.messages[0].data == "123".encode()
79+
80+
# remove second recieve batch when implement batching
81+
# https://github.com/ydb-platform/ydb-python-sdk/issues/142
82+
batch = await topic_reader.receive_batch()
83+
assert batch.messages[0].offset == 1
84+
assert batch.messages[0].seqno == 2
85+
assert batch.messages[0].data == "456".encode()
86+
6387

6488
class TestTopicWriterSync:
6589
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
@@ -115,3 +139,27 @@ def test_random_producer_id(
115139
batch2 = topic_reader_sync.receive_batch()
116140

117141
assert batch1.messages[0].producer_id != batch2.messages[0].producer_id
142+
143+
def test_write_multi_message_with_ack(
144+
self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReader
145+
):
146+
with driver_sync.topic_client.writer(topic_path) as writer:
147+
writer.write_with_ack(
148+
[
149+
ydb.TopicWriterMessage(data="123".encode()),
150+
ydb.TopicWriterMessage(data="456".encode()),
151+
]
152+
)
153+
154+
batch = topic_reader_sync.receive_batch()
155+
156+
assert batch.messages[0].offset == 0
157+
assert batch.messages[0].seqno == 1
158+
assert batch.messages[0].data == "123".encode()
159+
160+
# remove second recieve batch when implement batching
161+
# https://github.com/ydb-platform/ydb-python-sdk/issues/142
162+
batch = topic_reader_sync.receive_batch()
163+
assert batch.messages[0].offset == 1
164+
assert batch.messages[0].seqno == 2
165+
assert batch.messages[0].data == "456".encode()

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ async def receive_batch(
115115
*,
116116
max_messages: typing.Union[int, None] = None,
117117
max_bytes: typing.Union[int, None] = None,
118-
) -> typing.Union[topic_reader.PublicBatch, None]:
118+
) -> typing.Union[datatypes.PublicBatch, None]:
119119
"""
120120
Get one messages batch from reader.
121121
All messages in a batch from same partition.
@@ -243,7 +243,8 @@ def commit(
243243
return self._stream_reader.commit(batch)
244244

245245
async def close(self):
246-
await self._stream_reader.close()
246+
if self._stream_reader:
247+
await self._stream_reader.close()
247248
for task in self._background_tasks:
248249
task.cancel()
249250

ydb/_topic_writer/topic_writer.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import uuid
44
from dataclasses import dataclass
55
from enum import Enum
6-
from typing import List, Union, TextIO, BinaryIO, Optional, Any, Dict
6+
from typing import List, Union, Optional, Any, Dict
77

88
import typing
99

@@ -92,9 +92,9 @@ class PublicWriterInitInfo:
9292
class PublicMessage:
9393
seqno: Optional[int]
9494
created_at: Optional[datetime.datetime]
95-
data: Union[str, bytes, TextIO, BinaryIO]
95+
data: "PublicMessage.SimpleMessageSourceType"
9696

97-
SimpleMessageSourceType = Union[str, bytes, TextIO, BinaryIO]
97+
SimpleMessageSourceType = Union[str, bytes] # Will be extend
9898

9999
def __init__(
100100
self,
@@ -107,6 +107,14 @@ def __init__(
107107
self.created_at = created_at
108108
self.data = data
109109

110+
@staticmethod
111+
def _create_message(
112+
data: Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
113+
) -> "PublicMessage":
114+
if isinstance(data, PublicMessage):
115+
return data
116+
return PublicMessage(data=data)
117+
110118

111119
class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
112120
def __init__(self, mess: PublicMessage):

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import datetime
33
from collections import deque
4-
from typing import Deque, AsyncIterator, Union, List, Optional
4+
from typing import Deque, AsyncIterator, Union, List
55

66
import ydb
77
from .topic_writer import (
@@ -76,7 +76,6 @@ async def close(self, *, flush: bool = True):
7676
async def write_with_ack(
7777
self,
7878
messages: Union[MessageType, List[MessageType]],
79-
*args: Optional[MessageType],
8079
) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]:
8180
"""
8281
IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES.
@@ -86,7 +85,7 @@ async def write_with_ack(
8685
8786
For wait with timeout use asyncio.wait_for.
8887
"""
89-
futures = await self.write_with_ack_future(messages, *args)
88+
futures = await self.write_with_ack_future(messages)
9089
if not isinstance(futures, list):
9190
futures = [futures]
9291

@@ -98,7 +97,6 @@ async def write_with_ack(
9897
async def write_with_ack_future(
9998
self,
10099
messages: Union[MessageType, List[MessageType]],
101-
*args: Optional[MessageType],
102100
) -> Union[asyncio.Future, List[asyncio.Future]]:
103101
"""
104102
send one or number of messages to server.
@@ -108,20 +106,22 @@ async def write_with_ack_future(
108106
109107
For wait with timeout use asyncio.wait_for.
110108
"""
109+
input_single_message = not isinstance(messages, list)
111110
if isinstance(messages, PublicMessage):
112-
futures = await self._reconnector.write_with_ack_future([messages])
113-
return futures[0]
111+
messages = [PublicMessage._create_message(messages)]
114112
if isinstance(messages, list):
115-
for m in messages:
116-
if not isinstance(m, PublicMessage):
117-
raise NotImplementedError()
118-
return await self._reconnector.write_with_ack_future(messages)
119-
raise NotImplementedError()
113+
for index, m in enumerate(messages):
114+
messages[index] = PublicMessage._create_message(m)
115+
116+
futures = await self._reconnector.write_with_ack_future(messages)
117+
if input_single_message:
118+
return futures[0]
119+
else:
120+
return futures
120121

121122
async def write(
122123
self,
123124
messages: Union[MessageType, List[MessageType]],
124-
*args: Optional[MessageType],
125125
):
126126
"""
127127
send one or number of messages to server.

0 commit comments

Comments
 (0)