Skip to content

Commit 0d24cb9

Browse files
authored
Merge pull request #197 remove difficult variants of send message
2 parents d42d7c1 + b9a9a4b commit 0d24cb9

File tree

10 files changed

+107
-42
lines changed

10 files changed

+107
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* BROKEN CHANGES: remove writer.write(mess1, mess2) variant, use list instead: writer.write([mess1, mess2])
12
* BROKEN CHANGES: change names of public method in topic client
23
* BROKEN CHANGES: rename parameter producer_and_message_group_id to producer_id
34
* producer_id is optional now

examples/topic/reader_async_example.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ def process_batch(batch):
123123
# no reason work with expired batch
124124
# go read next - good batch
125125
return
126-
await _process(message)
127-
await reader.commit(batch)
126+
_process(message)
127+
reader.commit(batch)
128128

129129
async for batch in reader.batches():
130130
process_batch(batch)

examples/topic/writer_async_example.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ async def connect_without_context_manager(db: ydb.aio.Driver):
3737
async def send_messages(writer: ydb.TopicWriterAsyncIO):
3838
# simple str/bytes without additional metadata
3939
await writer.write("mess") # send text
40-
await writer.write(bytes([1, 2, 3])) # send bytes
41-
await writer.write("mess-1", "mess-2") # send two messages
40+
await writer.write(bytes([1, 2, 3])) # send single message with bytes 1,2,3
41+
await writer.write(["mess-1", "mess-2"]) # send two messages
4242

4343
# full forms
4444
await writer.write(ydb.TopicWriterMessage("mess")) # send text
4545
await writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]))) # send bytes
4646
await writer.write(
47-
ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")
47+
[ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")]
4848
) # send few messages by one call
4949

5050
# with meta
@@ -72,11 +72,14 @@ async def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
7272
async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
7373
# future wait
7474
await writer.write_with_result(
75-
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
75+
[
76+
ydb.TopicWriterMessage("mess", seqno=1),
77+
ydb.TopicWriterMessage("mess", seqno=2),
78+
]
7679
)
7780

7881
# send with flush
79-
await writer.write("1", "2", "3")
82+
await writer.write(["1", "2", "3"])
8083
await writer.flush()
8184

8285

examples/topic/writer_example.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,20 @@ def connect_without_context_manager(db: ydb.Driver):
4444
try:
4545
pass # some code
4646
finally:
47-
await writer.close()
47+
writer.close()
4848

4949

5050
def send_messages(writer: ydb.TopicWriter):
5151
# simple str/bytes without additional metadata
5252
writer.write("mess") # send text
53-
writer.write(bytes([1, 2, 3])) # send bytes
54-
writer.write("mess-1", "mess-2") # send two messages
53+
writer.write(bytes([1, 2, 3])) # send single message with bytes 1,2,3
54+
writer.write(["mess-1", "mess-2"]) # send two messages
5555

5656
# full forms
5757
writer.write(ydb.TopicWriterMessage("mess")) # send text
5858
writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]))) # send bytes
5959
writer.write(
60-
ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")
60+
[ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")]
6161
) # send few messages by one call
6262

6363
# with meta
@@ -88,12 +88,15 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
8888

8989
# implicit, by sync call
9090
writer.write_with_ack(
91-
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
91+
[
92+
ydb.TopicWriterMessage("mess", seqno=1),
93+
ydb.TopicWriterMessage("mess", seqno=2),
94+
]
9295
)
9396
# write_with_ack
9497

9598
# send with flush
96-
writer.write("1", "2", "3")
99+
writer.write(["1", "2", "3"])
97100
writer.flush()
98101

99102

tests/conftest.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def docker_compose_file(pytestconfig):
1212

1313

1414
def wait_container_ready(driver):
15-
driver.wait(timeout=10)
15+
driver.wait(timeout=30)
1616

1717
with ydb.SessionPool(driver) as pool:
1818

@@ -133,12 +133,16 @@ async def topic_path(driver, topic_consumer, database) -> str:
133133
async def topic_with_messages(driver, topic_path):
134134
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
135135
await writer.write_with_ack(
136-
ydb.TopicWriterMessage(data="123".encode()),
137-
ydb.TopicWriterMessage(data="456".encode()),
136+
[
137+
ydb.TopicWriterMessage(data="123".encode()),
138+
ydb.TopicWriterMessage(data="456".encode()),
139+
]
138140
)
139141
await writer.write_with_ack(
140-
ydb.TopicWriterMessage(data="789".encode()),
141-
ydb.TopicWriterMessage(data="0".encode()),
142+
[
143+
ydb.TopicWriterMessage(data="789".encode()),
144+
ydb.TopicWriterMessage(data="0".encode()),
145+
]
142146
)
143147
await writer.close()
144148

tests/topics/test_topic_writer.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,30 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
6060
init_info = await writer.wait_init()
6161
assert init_info.last_seqno == last_seqno
6262

63+
async def test_write_multi_message_with_ack(
64+
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
65+
):
66+
async with driver.topic_client.writer(topic_path) as writer:
67+
await writer.write_with_ack(
68+
[
69+
ydb.TopicWriterMessage(data="123".encode()),
70+
ydb.TopicWriterMessage(data="456".encode()),
71+
]
72+
)
73+
74+
batch = await topic_reader.receive_batch()
75+
76+
assert batch.messages[0].offset == 0
77+
assert batch.messages[0].seqno == 1
78+
assert batch.messages[0].data == "123".encode()
79+
80+
# remove second recieve batch when implement batching
81+
# https://github.com/ydb-platform/ydb-python-sdk/issues/142
82+
batch = await topic_reader.receive_batch()
83+
assert batch.messages[0].offset == 1
84+
assert batch.messages[0].seqno == 2
85+
assert batch.messages[0].data == "456".encode()
86+
6387

6488
class TestTopicWriterSync:
6589
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
@@ -115,3 +139,27 @@ def test_random_producer_id(
115139
batch2 = topic_reader_sync.receive_batch()
116140

117141
assert batch1.messages[0].producer_id != batch2.messages[0].producer_id
142+
143+
def test_write_multi_message_with_ack(
144+
self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReader
145+
):
146+
with driver_sync.topic_client.writer(topic_path) as writer:
147+
writer.write_with_ack(
148+
[
149+
ydb.TopicWriterMessage(data="123".encode()),
150+
ydb.TopicWriterMessage(data="456".encode()),
151+
]
152+
)
153+
154+
batch = topic_reader_sync.receive_batch()
155+
156+
assert batch.messages[0].offset == 0
157+
assert batch.messages[0].seqno == 1
158+
assert batch.messages[0].data == "123".encode()
159+
160+
# remove second recieve batch when implement batching
161+
# https://github.com/ydb-platform/ydb-python-sdk/issues/142
162+
batch = topic_reader_sync.receive_batch()
163+
assert batch.messages[0].offset == 1
164+
assert batch.messages[0].seqno == 2
165+
assert batch.messages[0].data == "456".encode()

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ async def receive_batch(
115115
*,
116116
max_messages: typing.Union[int, None] = None,
117117
max_bytes: typing.Union[int, None] = None,
118-
) -> typing.Union[topic_reader.PublicBatch, None]:
118+
) -> typing.Union[datatypes.PublicBatch, None]:
119119
"""
120120
Get one messages batch from reader.
121121
All messages in a batch from same partition.
@@ -243,7 +243,8 @@ def commit(
243243
return self._stream_reader.commit(batch)
244244

245245
async def close(self):
246-
await self._stream_reader.close()
246+
if self._stream_reader:
247+
await self._stream_reader.close()
247248
for task in self._background_tasks:
248249
task.cancel()
249250

ydb/_topic_writer/topic_writer.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import uuid
44
from dataclasses import dataclass
55
from enum import Enum
6-
from typing import List, Union, TextIO, BinaryIO, Optional, Any, Dict
6+
from typing import List, Union, Optional, Any, Dict
77

88
import typing
99

@@ -92,9 +92,9 @@ class PublicWriterInitInfo:
9292
class PublicMessage:
9393
seqno: Optional[int]
9494
created_at: Optional[datetime.datetime]
95-
data: Union[str, bytes, TextIO, BinaryIO]
95+
data: "PublicMessage.SimpleMessageSourceType"
9696

97-
SimpleMessageSourceType = Union[str, bytes, TextIO, BinaryIO]
97+
SimpleMessageSourceType = Union[str, bytes] # Will be extend
9898

9999
def __init__(
100100
self,
@@ -107,6 +107,14 @@ def __init__(
107107
self.created_at = created_at
108108
self.data = data
109109

110+
@staticmethod
111+
def _create_message(
112+
data: Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
113+
) -> "PublicMessage":
114+
if isinstance(data, PublicMessage):
115+
return data
116+
return PublicMessage(data=data)
117+
110118

111119
class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
112120
def __init__(self, mess: PublicMessage):

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import datetime
33
from collections import deque
4-
from typing import Deque, AsyncIterator, Union, List, Optional
4+
from typing import Deque, AsyncIterator, Union, List
55

66
import ydb
77
from .topic_writer import (
@@ -76,7 +76,6 @@ async def close(self, *, flush: bool = True):
7676
async def write_with_ack(
7777
self,
7878
messages: Union[MessageType, List[MessageType]],
79-
*args: Optional[MessageType],
8079
) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]:
8180
"""
8281
IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES.
@@ -86,7 +85,7 @@ async def write_with_ack(
8685
8786
For wait with timeout use asyncio.wait_for.
8887
"""
89-
futures = await self.write_with_ack_future(messages, *args)
88+
futures = await self.write_with_ack_future(messages)
9089
if not isinstance(futures, list):
9190
futures = [futures]
9291

@@ -98,7 +97,6 @@ async def write_with_ack(
9897
async def write_with_ack_future(
9998
self,
10099
messages: Union[MessageType, List[MessageType]],
101-
*args: Optional[MessageType],
102100
) -> Union[asyncio.Future, List[asyncio.Future]]:
103101
"""
104102
send one or number of messages to server.
@@ -108,20 +106,22 @@ async def write_with_ack_future(
108106
109107
For wait with timeout use asyncio.wait_for.
110108
"""
109+
input_single_message = not isinstance(messages, list)
111110
if isinstance(messages, PublicMessage):
112-
futures = await self._reconnector.write_with_ack_future([messages])
113-
return futures[0]
111+
messages = [PublicMessage._create_message(messages)]
114112
if isinstance(messages, list):
115-
for m in messages:
116-
if not isinstance(m, PublicMessage):
117-
raise NotImplementedError()
118-
return await self._reconnector.write_with_ack_future(messages)
119-
raise NotImplementedError()
113+
for index, m in enumerate(messages):
114+
messages[index] = PublicMessage._create_message(m)
115+
116+
futures = await self._reconnector.write_with_ack_future(messages)
117+
if input_single_message:
118+
return futures[0]
119+
else:
120+
return futures
120121

121122
async def write(
122123
self,
123124
messages: Union[MessageType, List[MessageType]],
124-
*args: Optional[MessageType],
125125
):
126126
"""
127127
send one or number of messages to server.

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,22 @@ def wait_init(self, timeout: Optional[TimeoutType] = None) -> PublicWriterInitIn
9292

9393
def write(
9494
self,
95-
message: Union[PublicMessage, List[PublicMessage]],
96-
*args: Optional[PublicMessage],
95+
messages: Union[PublicMessage, List[PublicMessage]],
9796
timeout: Union[float, None] = None,
9897
):
99-
self._call_sync(self._async_writer.write(message, *args), timeout=timeout)
98+
self._call_sync(self._async_writer.write(messages), timeout=timeout)
10099

101100
def async_write_with_ack(
102101
self,
103102
messages: Union[MessageType, List[MessageType]],
104-
*args: Optional[MessageType],
105103
) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]:
106-
return self._call(self._async_writer.write_with_ack(messages, *args))
104+
return self._call(self._async_writer.write_with_ack(messages))
107105

108106
def write_with_ack(
109107
self,
110108
messages: Union[MessageType, List[MessageType]],
111-
*args: Optional[MessageType],
112109
timeout: Union[float, None] = None,
113110
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
114111
return self._call_sync(
115-
self._async_writer.write_with_ack(messages, *args), timeout=timeout
112+
self._async_writer.write_with_ack(messages), timeout=timeout
116113
)

0 commit comments

Comments
 (0)