Skip to content

Commit cbfb86a

Browse files
committed
rename producer_and_message_group_id to producer_id and make it optional
1 parent 4fba443 commit cbfb86a

File tree

8 files changed

+95
-65
lines changed

8 files changed

+95
-65
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
* BROKEN CHANGES: change names of public method in topic client
2+
* BROKEN CHANGES: rename parameter producer_and_message_group_id to producer_id
3+
* producer_id is optional now
24

35
## 3.0.1b5 ##
46
* Remove six package from code and dependencies (remove support python2)

examples/topic/writer_async_example.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,23 @@
1010
async def create_writer(db: ydb.aio.Driver):
1111
async with ydb.TopicClientAsyncIO(db).writer(
1212
"/database/topic/path",
13-
producer_and_message_group_id="producer-id",
13+
producer_id="producer-id",
1414
) as writer:
1515
await writer.write(TopicWriterMessage("asd"))
1616

1717

1818
async def connect_and_wait(db: ydb.aio.Driver):
1919
async with ydb.TopicClientAsyncIO(db).writer(
2020
"/database/topic/path",
21-
producer_and_message_group_id="producer-id",
21+
producer_id="producer-id",
2222
) as writer:
2323
writer.wait_init()
2424

2525

2626
async def connect_without_context_manager(db: ydb.aio.Driver):
2727
writer = ydb.TopicClientAsyncIO(db).writer(
2828
"/database/topic/path",
29-
producer_and_message_group_id="producer-id",
29+
producer_id="producer-id",
3030
)
3131
try:
3232
pass # some code

examples/topic/writer_example.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,31 @@ async def connect():
1515
)
1616
writer = ydb.TopicClientAsyncIO(db).writer(
1717
"/local/topic",
18-
producer_and_message_group_id="producer-id",
18+
producer_id="producer-id",
1919
)
2020
await writer.write(TopicWriterMessage("asd"))
2121

2222

2323
def create_writer(db: ydb.Driver):
2424
with ydb.TopicClient(db).writer(
2525
"/database/topic/path",
26-
producer_and_message_group_id="producer-id",
26+
producer_id="producer-id",
2727
) as writer:
2828
writer.write(TopicWriterMessage("asd"))
2929

3030

3131
def connect_and_wait(db: ydb.Driver):
3232
with ydb.TopicClient(db).writer(
3333
"/database/topic/path",
34-
producer_and_message_group_id="producer-id",
34+
producer_id="producer-id",
3535
) as writer:
3636
writer.wait()
3737

3838

3939
def connect_without_context_manager(db: ydb.Driver):
4040
writer = ydb.TopicClient(db).writer(
4141
"/database/topic/path",
42-
producer_and_message_group_id="producer-id",
42+
producer_id="producer-id",
4343
)
4444
try:
4545
pass # some code

tests/conftest.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,24 @@ async def topic_path(driver, topic_consumer, database) -> str:
131131
@pytest.fixture()
132132
@pytest.mark.asyncio()
133133
async def topic_with_messages(driver, topic_path):
134-
writer = driver.topic_client.writer(
135-
topic_path, producer_and_message_group_id="fixture-producer-id"
136-
)
134+
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
137135
await writer.write_with_ack(
138136
ydb.TopicWriterMessage(data="123".encode()),
139137
ydb.TopicWriterMessage(data="456".encode()),
140138
)
141139
await writer.close()
140+
141+
142+
@pytest.fixture()
143+
@pytest.mark.asyncio()
144+
async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO:
145+
reader = driver.topic_client.reader(topic=topic_path, consumer=topic_consumer)
146+
yield reader
147+
await reader.close()
148+
149+
150+
@pytest.fixture()
151+
def topic_reader_sync(driver_sync, topic_consumer, topic_path) -> ydb.TopicReader:
152+
reader = driver_sync.topic_client.reader(topic=topic_path, consumer=topic_consumer)
153+
yield reader
154+
reader.close()

tests/topics/test_topic_writer.py

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@
66
@pytest.mark.asyncio
77
class TestTopicWriterAsyncIO:
88
async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
9-
writer = driver.topic_client.writer(
10-
topic_path, producer_and_message_group_id="test"
11-
)
9+
writer = driver.topic_client.writer(topic_path, producer_id="test")
1210
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
1311
await writer.close()
1412

1513
async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
1614
async with driver.topic_client.writer(
1715
topic_path,
18-
producer_and_message_group_id="test",
16+
producer_id="test",
1917
auto_seqno=False,
2018
) as writer:
2119
await writer.write_with_ack(
@@ -24,16 +22,28 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
2422

2523
async with driver.topic_client.writer(
2624
topic_path,
27-
producer_and_message_group_id="test",
28-
get_last_seqno=True,
25+
producer_id="test",
2926
) as writer2:
3027
init_info = await writer2.wait_init()
3128
assert init_info.last_seqno == 5
3229

30+
async def test_random_producer_id(
31+
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
32+
):
33+
async with driver.topic_client.writer(topic_path) as writer:
34+
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
35+
async with driver.topic_client.writer(topic_path) as writer:
36+
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
37+
38+
batch1 = await topic_reader.receive_batch()
39+
batch2 = await topic_reader.receive_batch()
40+
41+
assert batch1.messages[0].producer_id != batch2.messages[0].producer_id
42+
3343
async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
3444
async with driver.topic_client.writer(
3545
topic_path,
36-
producer_and_message_group_id="test",
46+
producer_id="test",
3747
auto_seqno=False,
3848
) as writer:
3949
last_seqno = 0
@@ -45,41 +55,37 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
4555

4656
async with driver.topic_client.writer(
4757
topic_path,
48-
producer_and_message_group_id="test",
49-
get_last_seqno=True,
58+
producer_id="test",
5059
) as writer:
5160
init_info = await writer.wait_init()
5261
assert init_info.last_seqno == last_seqno
5362

5463

5564
class TestTopicWriterSync:
5665
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
57-
writer = driver_sync.topic_client.writer(
58-
topic_path, producer_and_message_group_id="test"
59-
)
66+
writer = driver_sync.topic_client.writer(topic_path, producer_id="test")
6067
writer.write(ydb.TopicWriterMessage(data="123".encode()))
6168
writer.close()
6269

6370
def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
6471
with driver_sync.topic_client.writer(
6572
topic_path,
66-
producer_and_message_group_id="test",
73+
producer_id="test",
6774
auto_seqno=False,
6875
) as writer:
6976
writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))
7077

7178
with driver_sync.topic_client.writer(
7279
topic_path,
73-
producer_and_message_group_id="test",
74-
get_last_seqno=True,
80+
producer_id="test",
7581
) as writer2:
7682
init_info = writer2.wait_init()
7783
assert init_info.last_seqno == 5
7884

7985
def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
8086
with driver_sync.topic_client.writer(
8187
topic_path,
82-
producer_and_message_group_id="test",
88+
producer_id="test",
8389
auto_seqno=False,
8490
) as writer:
8591
last_seqno = 0
@@ -89,8 +95,23 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
8995

9096
with driver_sync.topic_client.writer(
9197
topic_path,
92-
producer_and_message_group_id="test",
93-
get_last_seqno=True,
98+
producer_id="test",
9499
) as writer:
95100
init_info = writer.wait_init()
96101
assert init_info.last_seqno == last_seqno
102+
103+
def test_random_producer_id(
104+
self,
105+
driver_sync: ydb.aio.Driver,
106+
topic_path,
107+
topic_reader_sync: ydb.TopicReader,
108+
):
109+
with driver_sync.topic_client.writer(topic_path) as writer:
110+
writer.write(ydb.TopicWriterMessage(data="123".encode()))
111+
with driver_sync.topic_client.writer(topic_path) as writer:
112+
writer.write(ydb.TopicWriterMessage(data="123".encode()))
113+
114+
batch1 = topic_reader_sync.receive_batch()
115+
batch2 = topic_reader_sync.receive_batch()
116+
117+
assert batch1.messages[0].producer_id != batch2.messages[0].producer_id

ydb/_topic_writer/topic_writer.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import datetime
22
import enum
3+
import uuid
34
from dataclasses import dataclass
45
from enum import Enum
5-
from typing import List, Union, TextIO, BinaryIO, Optional, Callable, Mapping, Any, Dict
6+
from typing import List, Union, TextIO, BinaryIO, Optional, Any, Dict
67

78
import typing
89

@@ -16,21 +17,31 @@
1617

1718
@dataclass
1819
class PublicWriterSettings:
20+
"""
21+
Settings for topic writer.
22+
23+
order of fields IS NOT stable, use keywords only
24+
"""
25+
1926
topic: str
20-
producer_and_message_group_id: str
27+
producer_id: Optional[str] = None
2128
session_metadata: Optional[Dict[str, str]] = None
22-
encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
23-
serializer: Union[Callable[[Any], bytes], None] = None
24-
send_buffer_count: Optional[int] = 10000
25-
send_buffer_bytes: Optional[int] = 100 * 1024 * 1024
2629
partition_id: Optional[int] = None
27-
codec: Optional[int] = None
28-
codec_autoselect: bool = True
2930
auto_seqno: bool = True
3031
auto_created_at: bool = True
31-
get_last_seqno: bool = False
32-
retry_policy: Optional["RetryPolicy"] = None
33-
update_token_interval: Union[int, float] = 3600
32+
# get_last_seqno: bool = False
33+
# encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
34+
# serializer: Union[Callable[[Any], bytes], None] = None
35+
# send_buffer_count: Optional[int] = 10000
36+
# send_buffer_bytes: Optional[int] = 100 * 1024 * 1024
37+
# codec: Optional[int] = None
38+
# codec_autoselect: bool = True
39+
# retry_policy: Optional["RetryPolicy"] = None
40+
# update_token_interval: Union[int, float] = 3600
41+
42+
def __post_init__(self):
43+
if self.producer_id is None:
44+
self.producer_id = uuid.uuid4().hex
3445

3546

3647
@dataclass
@@ -55,18 +66,16 @@ def __init__(self, settings: PublicWriterSettings):
5566
def create_init_request(self) -> StreamWriteMessage.InitRequest:
5667
return StreamWriteMessage.InitRequest(
5768
path=self.topic,
58-
producer_id=self.producer_and_message_group_id,
69+
producer_id=self.producer_id,
5970
write_session_meta=self.session_metadata,
6071
partitioning=self.get_partitioning(),
61-
get_last_seq_no=self.get_last_seqno,
72+
get_last_seq_no=True,
6273
)
6374

6475
def get_partitioning(self) -> StreamWriteMessage.PartitioningType:
6576
if self.partition_id is not None:
6677
return StreamWriteMessage.PartitioningPartitionID(self.partition_id)
67-
return StreamWriteMessage.PartitioningMessageGroupID(
68-
self.producer_and_message_group_id
69-
)
78+
return StreamWriteMessage.PartitioningMessageGroupID(self.producer_id)
7079

7180

7281
class SendMode(Enum):

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def default_settings(self) -> WriterSettings:
238238
return WriterSettings(
239239
PublicWriterSettings(
240240
topic="/local/topic",
241-
producer_and_message_group_id="test-producer",
241+
producer_id="test-producer",
242242
auto_seqno=False,
243243
auto_created_at=False,
244244
)
@@ -487,7 +487,7 @@ async def close(self):
487487
def default_settings(self) -> PublicWriterSettings:
488488
return PublicWriterSettings(
489489
topic="/local/topic",
490-
producer_and_message_group_id="producer-id",
490+
producer_id="producer-id",
491491
)
492492

493493
@pytest.fixture(autouse=True)

ydb/topic.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import datetime
2-
from typing import List, Callable, Union, Mapping, Any, Optional, Dict
2+
from typing import List, Union, Mapping, Optional, Dict
33

44
from . import aio, Credentials, _apis
55

@@ -143,19 +143,11 @@ def writer(
143143
self,
144144
topic,
145145
*,
146-
producer_and_message_group_id: str,
146+
producer_id: Optional[str] = None, # default - random
147147
session_metadata: Mapping[str, str] = None,
148-
encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
149-
serializer: Union[Callable[[Any], bytes], None] = None,
150-
send_buffer_count: Union[int, None] = 10000,
151-
send_buffer_bytes: Union[int, None] = 100 * 1024 * 1024,
152148
partition_id: Union[int, None] = None,
153-
codec: Union[int, None] = None,
154-
codec_autoselect: bool = True,
155149
auto_seqno: bool = True,
156150
auto_created_at: bool = True,
157-
get_last_seqno: bool = False,
158-
retry_policy: Union["TopicWriterRetryPolicy", None] = None,
159151
) -> TopicWriterAsyncIO:
160152
args = locals()
161153
del args["self"]
@@ -265,19 +257,12 @@ def reader(
265257
def writer(
266258
self,
267259
topic,
268-
producer_and_message_group_id: str,
260+
*,
261+
producer_id: Optional[str] = None, # default - random
269262
session_metadata: Mapping[str, str] = None,
270-
encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
271-
serializer: Union[Callable[[Any], bytes], None] = None,
272-
send_buffer_count: Union[int, None] = 10000,
273-
send_buffer_bytes: Union[int, None] = 100 * 1024 * 1024,
274263
partition_id: Union[int, None] = None,
275-
codec: Union[int, None] = None,
276-
codec_autoselect: bool = True,
277264
auto_seqno: bool = True,
278265
auto_created_at: bool = True,
279-
get_last_seqno: bool = False,
280-
retry_policy: Union["TopicWriterRetryPolicy", None] = None,
281266
) -> TopicWriter:
282267
args = locals()
283268
del args["self"]

0 commit comments

Comments
 (0)