Skip to content

Commit 01643cd

Browse files
committed
another approach
1 parent cbc0352 commit 01643cd

File tree

5 files changed

+182
-111
lines changed

5 files changed

+182
-111
lines changed

tests/topics/test_topic_reader.py

Lines changed: 82 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import ydb
66

7+
from ydb._topic_reader.events import OnPartitionGetStartOffsetResponse
8+
79

810
@pytest.mark.asyncio
911
class TestTopicReaderAsyncIO:
@@ -253,6 +255,13 @@ async def wait(fut):
253255
await reader1.close()
254256

255257

258+
@pytest.fixture()
259+
def topic_selector(topic_with_messages):
260+
return ydb.TopicReaderSelector(
261+
path=topic_with_messages,
262+
partitions=[0]
263+
)
264+
256265
@pytest.mark.asyncio
257266
class TestTopicNoConsumerReaderAsyncIO:
258267
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
@@ -262,57 +271,63 @@ async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_mess
262271
consumer=None,
263272
)
264273

265-
async def test_reader_with_default_lambda(self, driver, topic_with_messages):
266-
reader = driver.topic_client.reader(
267-
topic_with_messages,
268-
consumer=None,
269-
partition_ids=[0],
270-
)
274+
async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector):
275+
topic_selector.partitions = None
276+
277+
with pytest.raises(ydb.Error):
278+
driver.topic_client.reader(
279+
topic_selector,
280+
consumer=None,
281+
)
282+
283+
async def test_reader_with_default_lambda(self, driver, topic_selector):
284+
reader = driver.topic_client.reader(topic_selector, consumer=None)
271285
msg = await reader.receive_message()
272286

273287
assert msg.seqno == 1
274288

275289
await reader.close()
276290

277-
async def test_reader_with_sync_lambda(self, driver, topic_with_messages):
278-
def sync_lambda(partition_id: int):
279-
assert partition_id == 0
280-
return 1
291+
async def test_reader_with_sync_lambda(self, driver, topic_selector):
292+
class CustomEventHandler(ydb.TopicReaderEventHandler):
293+
def on_partition_get_start_offset(self, event):
294+
assert event.partition_id == 0
295+
return OnPartitionGetStartOffsetResponse(1)
281296

282297
reader = driver.topic_client.reader(
283-
topic_with_messages,
298+
topic_selector,
284299
consumer=None,
285-
partition_ids=[0],
286-
get_start_offset_lambda=sync_lambda,
300+
event_handler=CustomEventHandler(),
287301
)
302+
288303
msg = await reader.receive_message()
289304

290305
assert msg.seqno == 2
291306

292307
await reader.close()
293308

294-
async def test_reader_with_async_lambda(self, driver, topic_with_messages):
295-
async def async_lambda(partition_id: int) -> int:
296-
assert partition_id == 0
297-
return 1
309+
async def test_reader_with_async_lambda(self, driver, topic_selector):
310+
class CustomEventHandler(ydb.TopicReaderEventHandler):
311+
async def on_partition_get_start_offset(self, event):
312+
assert event.partition_id == 0
313+
return OnPartitionGetStartOffsetResponse(1)
298314

299315
reader = driver.topic_client.reader(
300-
topic_with_messages,
316+
topic_selector,
301317
consumer=None,
302-
partition_ids=[0],
303-
get_start_offset_lambda=async_lambda,
318+
event_handler=CustomEventHandler(),
304319
)
320+
305321
msg = await reader.receive_message()
306322

307323
assert msg.seqno == 2
308324

309325
await reader.close()
310326

311-
async def test_commit_not_allowed(self, driver, topic_with_messages):
327+
async def test_commit_not_allowed(self, driver, topic_selector):
312328
reader = driver.topic_client.reader(
313-
topic_with_messages,
329+
topic_selector,
314330
consumer=None,
315-
partition_ids=[0],
316331
)
317332
batch = await reader.receive_batch()
318333

@@ -324,18 +339,18 @@ async def test_commit_not_allowed(self, driver, topic_with_messages):
324339

325340
await reader.close()
326341

327-
async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
342+
async def test_offsets_updated_after_reconnect(self, driver, topic_selector):
328343
current_offset = 0
329344

330-
def get_start_offset_lambda(partition_id: int) -> int:
331-
nonlocal current_offset
332-
return current_offset
345+
class CustomEventHandler(ydb.TopicReaderEventHandler):
346+
def on_partition_get_start_offset(self, event):
347+
nonlocal current_offset
348+
return OnPartitionGetStartOffsetResponse(current_offset)
333349

334350
reader = driver.topic_client.reader(
335-
topic_with_messages,
351+
topic_selector,
336352
consumer=None,
337-
partition_ids=[0],
338-
get_start_offset_lambda=get_start_offset_lambda,
353+
event_handler=CustomEventHandler(),
339354
)
340355
msg = await reader.receive_message()
341356

@@ -361,57 +376,63 @@ def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messa
361376
consumer=None,
362377
)
363378

364-
def test_reader_with_default_lambda(self, driver_sync, topic_with_messages):
365-
reader = driver_sync.topic_client.reader(
366-
topic_with_messages,
367-
consumer=None,
368-
partition_ids=[0],
369-
)
379+
def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector):
380+
topic_selector.partitions = None
381+
382+
with pytest.raises(ydb.Error):
383+
driver_sync.topic_client.reader(
384+
topic_selector,
385+
consumer=None,
386+
)
387+
388+
def test_reader_with_default_lambda(self, driver_sync, topic_selector):
389+
reader = driver_sync.topic_client.reader(topic_selector, consumer=None)
370390
msg = reader.receive_message()
371391

372392
assert msg.seqno == 1
373393

374394
reader.close()
375395

376-
def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages):
377-
def sync_lambda(partition_id: int):
378-
assert partition_id == 0
379-
return 1
396+
def test_reader_with_sync_lambda(self, driver_sync, topic_selector):
397+
class CustomEventHandler(ydb.TopicReaderEventHandler):
398+
def on_partition_get_start_offset(self, event):
399+
assert event.partition_id == 0
400+
return OnPartitionGetStartOffsetResponse(1)
380401

381402
reader = driver_sync.topic_client.reader(
382-
topic_with_messages,
403+
topic_selector,
383404
consumer=None,
384-
partition_ids=[0],
385-
get_start_offset_lambda=sync_lambda,
405+
event_handler=CustomEventHandler(),
386406
)
407+
387408
msg = reader.receive_message()
388409

389410
assert msg.seqno == 2
390411

391412
reader.close()
392413

393-
def test_reader_with_async_lambda(self, driver_sync, topic_with_messages):
394-
async def async_lambda(partition_id: int) -> int:
395-
assert partition_id == 0
396-
return 1
414+
def test_reader_with_async_lambda(self, driver_sync, topic_selector):
415+
class CustomEventHandler(ydb.TopicReaderEventHandler):
416+
async def on_partition_get_start_offset(self, event):
417+
assert event.partition_id == 0
418+
return OnPartitionGetStartOffsetResponse(1)
397419

398420
reader = driver_sync.topic_client.reader(
399-
topic_with_messages,
421+
topic_selector,
400422
consumer=None,
401-
partition_ids=[0],
402-
get_start_offset_lambda=async_lambda,
423+
event_handler=CustomEventHandler(),
403424
)
425+
404426
msg = reader.receive_message()
405427

406428
assert msg.seqno == 2
407429

408430
reader.close()
409431

410-
def test_commit_not_allowed(self, driver_sync, topic_with_messages):
432+
def test_commit_not_allowed(self, driver_sync, topic_selector):
411433
reader = driver_sync.topic_client.reader(
412-
topic_with_messages,
434+
topic_selector,
413435
consumer=None,
414-
partition_ids=[0],
415436
)
416437
batch = reader.receive_batch()
417438

@@ -421,30 +442,27 @@ def test_commit_not_allowed(self, driver_sync, topic_with_messages):
421442
with pytest.raises(ydb.Error):
422443
reader.commit_with_ack(batch)
423444

424-
with pytest.raises(ydb.Error):
425-
reader.async_commit_with_ack(batch)
426-
427445
reader.close()
428446

429-
def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages):
447+
def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector):
430448
current_offset = 0
431449

432-
def get_start_offset_lambda(partition_id: int) -> int:
433-
nonlocal current_offset
434-
return current_offset
450+
class CustomEventHandler(ydb.TopicReaderEventHandler):
451+
def on_partition_get_start_offset(self, event):
452+
nonlocal current_offset
453+
return OnPartitionGetStartOffsetResponse(current_offset)
435454

436455
reader = driver_sync.topic_client.reader(
437-
topic_with_messages,
456+
topic_selector,
438457
consumer=None,
439-
partition_ids=[0],
440-
get_start_offset_lambda=get_start_offset_lambda,
458+
event_handler=CustomEventHandler(),
441459
)
442460
msg = reader.receive_message()
443461

444462
assert msg.seqno == current_offset + 1
445463

446464
current_offset += 2
447-
reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
465+
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
448466

449467
msg = reader.receive_message()
450468

ydb/_topic_reader/events.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Awaitable, Union
4+
5+
from ..issues import ClientInternalError
6+
7+
8+
class ReaderEvent:
9+
pass
10+
11+
12+
@dataclass
13+
class OnCommit(ReaderEvent):
14+
topic: str
15+
offset: int
16+
17+
18+
@dataclass
19+
class OnPartitionGetStartOffsetRequest(ReaderEvent):
20+
topic: str
21+
partition_id: int
22+
23+
24+
@dataclass
25+
class OnPartitionGetStartOffsetResponse:
26+
start_offset: int
27+
28+
29+
class OnInitPartition(ReaderEvent):
30+
pass
31+
32+
33+
class OnShutdownPartition:
34+
pass
35+
36+
37+
TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]
38+
39+
40+
class ReaderEventHandler:
41+
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
42+
pass
43+
44+
def on_partition_get_start_offset(
45+
self,
46+
event: OnPartitionGetStartOffsetRequest,
47+
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
48+
pass
49+
50+
def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
51+
pass
52+
53+
def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
54+
pass
55+
56+
async def _dispatch(self, event: ReaderEvent) -> Awaitable[TopicEventDispatchType]:
57+
f = None
58+
if isinstance(event, OnCommit):
59+
f = self.on_commit
60+
elif isinstance(event, OnPartitionGetStartOffsetRequest):
61+
f = self.on_partition_get_start_offset
62+
elif isinstance(event, OnInitPartition):
63+
f = self.on_init_partition
64+
elif isinstance(event, OnShutdownPartition):
65+
f = self.on_shutdown_partition
66+
else:
67+
raise ClientInternalError("Unsupported topic reader event")
68+
69+
if asyncio.iscoroutinefunction(f):
70+
return await f(event)
71+
72+
return f(event)

ydb/_topic_reader/topic_reader.py

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
Callable,
1212
)
1313

14+
from .events import ReaderEventHandler
1415
from ..retries import RetrySettings
1516
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1617

@@ -21,6 +22,7 @@ class PublicTopicSelector:
2122
partitions: Optional[Union[int, List[int]]] = None
2223
read_from: Optional[datetime.datetime] = None
2324
max_lag: Optional[datetime.timedelta] = None
25+
read_offset: Optional[int] = None
2426

2527
def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
2628
partitions = self.partitions
@@ -54,9 +56,7 @@ class PublicReaderSettings:
5456
# decoder_executor, must be set for handle non raw messages
5557
decoder_executor: Optional[concurrent.futures.Executor] = None
5658
update_token_interval: Union[int, float] = 3600
57-
58-
partition_ids: Optional[List[int]] = None
59-
get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None
59+
event_handler: Optional[ReaderEventHandler] = None
6060

6161
def __post_init__(self):
6262
# check possible create init message
@@ -89,25 +89,6 @@ def _retry_settings(self) -> RetrySettings:
8989
return RetrySettings(idempotent=True)
9090

9191

92-
class Events:
93-
class OnCommit:
94-
topic: str
95-
offset: int
96-
97-
class OnPartitionGetStartOffsetRequest:
98-
topic: str
99-
partition_id: int
100-
101-
class OnPartitionGetStartOffsetResponse:
102-
start_offset: int
103-
104-
class OnInitPartition:
105-
pass
106-
107-
class OnShutdownPatition:
108-
pass
109-
110-
11192
class RetryPolicy:
11293
connection_timeout_sec: float
11394
overload_timeout_sec: float

0 commit comments

Comments
 (0)