Skip to content

Commit 3e6defb

Browse files
committed
add tests
1 parent 859f24f commit 3e6defb

File tree

6 files changed

+80
-1
lines changed

6 files changed

+80
-1
lines changed

tests/conftest.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,31 @@ async def topic_with_messages(driver, topic_consumer, database):
263263
return topic_path
264264

265265

266+
@pytest.fixture()
267+
@pytest.mark.asyncio()
268+
async def topic_with_messages_with_metadata(driver, topic_consumer, database):
269+
topic_path = database + "/test-topic-with-messages-with-metadata"
270+
try:
271+
await driver.topic_client.drop_topic(topic_path)
272+
except issues.SchemeError:
273+
pass
274+
275+
await driver.topic_client.create_topic(
276+
path=topic_path,
277+
consumers=[topic_consumer],
278+
)
279+
280+
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW)
281+
await writer.write_with_ack(
282+
[
283+
ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}),
284+
ydb.TopicWriterMessage(data="456".encode(), metadata_items={"key": b"value"}),
285+
]
286+
)
287+
await writer.close()
288+
return topic_path
289+
290+
266291
@pytest.fixture()
267292
@pytest.mark.asyncio()
268293
async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO:

tests/topics/test_topic_reader.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ async def test_read_message(self, driver, topic_with_messages, topic_consumer):
3030

3131
await reader.close()
3232

33+
async def test_read_metadata(self, driver, topic_with_messages_with_metadata, topic_consumer):
34+
reader = driver.topic_client.reader(topic_with_messages_with_metadata, topic_consumer)
35+
36+
expected_metadata_items = {"key": b"value"}
37+
38+
for _ in range(2):
39+
await reader.wait_message()
40+
msg = await reader.receive_message()
41+
42+
assert msg is not None
43+
assert msg.metadata_items
44+
assert msg.metadata_items == expected_metadata_items
45+
46+
await reader.close()
47+
3348
async def test_read_and_commit_with_close_reader(self, driver, topic_with_messages, topic_consumer):
3449
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
3550
message = await reader.receive_message()
@@ -135,6 +150,20 @@ def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):
135150

136151
reader.close()
137152

153+
def test_read_metadata(self, driver_sync, topic_with_messages_with_metadata, topic_consumer):
154+
reader = driver_sync.topic_client.reader(topic_with_messages_with_metadata, topic_consumer)
155+
156+
expected_metadata_items = {"key": b"value"}
157+
158+
for _ in range(2):
159+
msg = reader.receive_message()
160+
161+
assert msg is not None
162+
assert msg.metadata_items
163+
assert msg.metadata_items == expected_metadata_items
164+
165+
reader.close()
166+
138167
def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_messages, topic_consumer):
139168
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
140169
message = reader.receive_message()

tests/topics/test_topic_writer.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
1515
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
1616
await writer.close()
1717

18+
async def test_send_message_with_metadata(self, driver: ydb.aio.Driver, topic_path):
19+
writer = driver.topic_client.writer(topic_path, producer_id="test")
20+
await writer.write(ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}))
21+
await writer.close()
22+
1823
async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
1924
async with driver.topic_client.writer(
2025
topic_path,
@@ -136,6 +141,11 @@ def test_send_message(self, driver_sync: ydb.Driver, topic_path):
136141
writer.write(ydb.TopicWriterMessage(data="123".encode()))
137142
writer.close()
138143

144+
def test_send_message_with_metadata(self, driver_sync: ydb.Driver, topic_path):
145+
writer = driver_sync.topic_client.writer(topic_path, producer_id="test")
146+
writer.write(ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}))
147+
writer.close()
148+
139149
def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
140150
with driver_sync.topic_client.writer(
141151
topic_path,

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def create_message(
208208
written_at=datetime.datetime(2023, 2, 3, 14, 16),
209209
producer_id="test-producer-id",
210210
data=bytes(),
211+
metadata_items={},
211212
_partition_session=partition_session,
212213
_commit_start_offset=partition_session._next_message_start_commit_offset + offset_delta - 1,
213214
_commit_end_offset=partition_session._next_message_start_commit_offset + offset_delta,
@@ -251,6 +252,7 @@ def batch_size():
251252
seq_no=message.seqno,
252253
created_at=message.created_at,
253254
data=message.data,
255+
metadata_items={},
254256
uncompresed_size=len(message.data),
255257
message_group_id=message.message_group_id,
256258
)
@@ -773,6 +775,7 @@ async def test_free_buffer_after_partition_stop(self, stream, stream_reader, par
773775
seq_no=123,
774776
created_at=t,
775777
data=bytes(),
778+
metadata_items={},
776779
uncompresed_size=message_size,
777780
message_group_id="test-message-group",
778781
)
@@ -853,6 +856,7 @@ def reader_batch_count():
853856
created_at=created_at,
854857
data=data,
855858
uncompresed_size=len(data),
859+
metadata_items={},
856860
message_group_id=message_group_id,
857861
)
858862
],
@@ -884,6 +888,7 @@ def reader_batch_count():
884888
written_at=written_at,
885889
producer_id=producer_id,
886890
data=data,
891+
metadata_items={},
887892
_partition_session=partition_session,
888893
_commit_start_offset=expected_message_offset,
889894
_commit_end_offset=expected_message_offset + 1,
@@ -930,6 +935,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
930935
seq_no=3,
931936
created_at=created_at,
932937
data=data,
938+
metadata_items={},
933939
uncompresed_size=len(data),
934940
message_group_id=message_group_id,
935941
)
@@ -951,6 +957,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
951957
seq_no=2,
952958
created_at=created_at2,
953959
data=data,
960+
metadata_items={},
954961
uncompresed_size=len(data),
955962
message_group_id=message_group_id,
956963
)
@@ -967,6 +974,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
967974
seq_no=3,
968975
created_at=created_at3,
969976
data=data2,
977+
metadata_items={},
970978
uncompresed_size=len(data2),
971979
message_group_id=message_group_id,
972980
),
@@ -975,6 +983,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
975983
seq_no=5,
976984
created_at=created_at4,
977985
data=data,
986+
metadata_items={},
978987
uncompresed_size=len(data),
979988
message_group_id=message_group_id2,
980989
),
@@ -1005,6 +1014,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10051014
written_at=written_at,
10061015
producer_id=producer_id,
10071016
data=data,
1017+
metadata_items={},
10081018
_partition_session=partition_session,
10091019
_commit_start_offset=partition1_mess1_expected_offset,
10101020
_commit_end_offset=partition1_mess1_expected_offset + 1,
@@ -1025,6 +1035,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10251035
written_at=written_at2,
10261036
producer_id=producer_id,
10271037
data=data,
1038+
metadata_items={},
10281039
_partition_session=second_partition_session,
10291040
_commit_start_offset=partition2_mess1_expected_offset,
10301041
_commit_end_offset=partition2_mess1_expected_offset + 1,
@@ -1045,6 +1056,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10451056
written_at=written_at2,
10461057
producer_id=producer_id2,
10471058
data=data2,
1059+
metadata_items={},
10481060
_partition_session=second_partition_session,
10491061
_commit_start_offset=partition2_mess2_expected_offset,
10501062
_commit_end_offset=partition2_mess2_expected_offset + 1,
@@ -1058,6 +1070,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10581070
written_at=written_at2,
10591071
producer_id=producer_id,
10601072
data=data,
1073+
metadata_items={},
10611074
_partition_session=second_partition_session,
10621075
_commit_start_offset=partition2_mess3_expected_offset,
10631076
_commit_end_offset=partition2_mess3_expected_offset + 1,

ydb/_topic_writer/topic_writer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,12 @@ class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
120120
codec: PublicCodec
121121

122122
def __init__(self, mess: PublicMessage):
123+
metadata_items = mess.metadata_items or {}
123124
super().__init__(
124125
seq_no=mess.seqno,
125126
created_at=mess.created_at,
126127
data=mess.data,
127-
metadata_items=mess.metadata_items,
128+
metadata_items=metadata_items,
128129
uncompressed_size=len(mess.data),
129130
partitioning=None,
130131
)

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ async def test_write_a_message(self, writer_and_stream: WriterWithMockedStream):
153153
seq_no=1,
154154
created_at=now,
155155
data=data,
156+
metadata_items={},
156157
uncompressed_size=len(data),
157158
partitioning=None,
158159
)

0 commit comments

Comments
 (0)