Skip to content

Commit 1104d9d

Browse files
committed
add custom encoders
1 parent 3902748 commit 1104d9d

File tree

4 files changed

+45
-3
lines changed

4 files changed

+45
-3
lines changed

ydb/_topic_writer/topic_writer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ class PublicWriterSettings:
3434
encoder_executor: Optional[
3535
concurrent.futures.Executor
3636
] = None # default shared client executor pool
37+
encoders: Optional[
38+
typing.Mapping[PublicCodec, typing.Callable[[bytes], bytes]]
39+
] = None
3740
# get_last_seqno: bool = False
38-
# encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
3941
# serializer: Union[Callable[[Any], bytes], None] = None
4042
# send_buffer_count: Optional[int] = 10000
4143
# send_buffer_bytes: Optional[int] = 100 * 1024 * 1024

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
198198
PublicCodec.RAW: lambda data: data,
199199
PublicCodec.GZIP: gzip.compress,
200200
}
201+
202+
if settings.encoders:
203+
for codec, encoder in settings.encoders.items():
204+
self._codec_functions[codec] = encoder
205+
201206
self._encode_executor = settings.encoder_executor
202207

203208
self._codec_selector_batch_num = 0

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
TopicWriterError,
2828
)
2929
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
30-
from .._topic_common.test_helpers import StreamMock
30+
from .._topic_common.test_helpers import StreamMock, wait_for_fast
3131

3232
from .topic_writer_asyncio import (
3333
WriterAsyncIOStream,
@@ -572,6 +572,35 @@ async def test_encode_data_inplace(
572572
assert mess.codec == codec
573573
assert mess.get_bytes() == expected_datas[index]
574574

575+
async def test_custom_encoder(
576+
self, default_driver, default_settings, get_stream_writer
577+
):
578+
codec = 10001
579+
580+
settings = copy.copy(default_settings)
581+
settings.encoders = {codec: lambda x: bytes(reversed(x))}
582+
settings.codec = codec
583+
reconnector = WriterAsyncIOReconnector(default_driver, settings)
584+
585+
now = datetime.datetime.now()
586+
seqno = self.init_last_seqno + 1
587+
588+
await reconnector.write_with_ack_future(
589+
[PublicMessage(data=b"123", seqno=seqno, created_at=now)]
590+
)
591+
592+
stream_writer = get_stream_writer()
593+
sent_messages = await wait_for_fast(stream_writer.from_client.get())
594+
595+
expected_mess = InternalMessage(
596+
PublicMessage(data=b"321", seqno=seqno, created_at=now)
597+
)
598+
expected_mess.codec = codec
599+
600+
assert sent_messages == [expected_mess]
601+
602+
await reconnector.close(flush=False)
603+
575604

576605
@pytest.mark.asyncio
577606
class TestWriterAsyncIO:

ydb/topic.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import concurrent.futures
44
import datetime
55
from dataclasses import dataclass
6-
from typing import List, Union, Mapping, Optional, Dict
6+
from typing import List, Union, Mapping, Optional, Dict, Callable
77

88
from . import aio, Credentials, _apis, issues
99

@@ -167,6 +167,9 @@ def writer(
167167
auto_seqno: bool = True,
168168
auto_created_at: bool = True,
169169
codec: Optional[TopicCodec] = None, # default mean auto-select
170+
encoders: Optional[
171+
Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]
172+
] = None,
170173
encoder_executor: Optional[
171174
concurrent.futures.Executor
172175
] = None, # default shared client executor pool
@@ -325,6 +328,9 @@ def writer(
325328
auto_seqno: bool = True,
326329
auto_created_at: bool = True,
327330
codec: Optional[TopicCodec] = None, # default mean auto-select
331+
encoders: Optional[
332+
Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]
333+
] = None,
328334
encoder_executor: Optional[
329335
concurrent.futures.Executor
330336
] = None, # default shared client executor pool

0 commit comments

Comments
 (0)