Skip to content

Commit ba64920

Browse files
committed
address comments from copilot
1 parent dcc82fe commit ba64920

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

pulsar/asyncio.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import asyncio
2727
import functools
28+
from datetime import timedelta
2829
from typing import Any, Callable, List, Union
2930

3031
import _pulsar
@@ -98,7 +99,7 @@ async def send(self, content: Any,
9899
disable_replication: bool | None = None,
99100
event_timestamp: int | None = None,
100101
deliver_at: int | None = None,
101-
deliver_after: int | None = None) -> pulsar.MessageId:
102+
deliver_after: timedelta | None = None) -> pulsar.MessageId:
102103
"""
103104
Send a message asynchronously.
104105
@@ -108,7 +109,7 @@ async def send(self, content: Any,
108109
The message payload, whose type should respect the schema defined in
109110
`Client.create_producer`.
110111
properties: dict | None
111-
A dict of application0-defined string properties.
112+
A dict of application-defined string properties.
112113
partition_key: str | None
113114
Sets the partition key for the message routing. A hash of this key is
114115
used to determine the message's topic partition.
@@ -127,7 +128,7 @@ async def send(self, content: Any,
127128
Timestamp in millis of the timestamp of event creation
128129
deliver_at: int | None
129130
Specify the message should not be delivered earlier than the specified timestamp.
130-
deliver_after: int | None
131+
deliver_after: timedelta | None
131132
Specify a delay in timedelta for the delivery of the messages.
132133
133134
Returns
@@ -211,7 +212,12 @@ def producer_name(self):
211212

212213
def last_sequence_id(self):
213214
"""
214-
Return the last sequence id that was published by this producer.
215+
Return the last sequence id that was published and acknowledged by this producer.
216+
217+
The sequence id can be either automatically assigned or custom set on the message.
218+
After recreating a producer with the same name, this will return the sequence id
219+
of the last message that was published in the previous session, or -1 if no
220+
message was ever published.
215221
"""
216222
return self._producer.last_sequence_id()
217223

@@ -529,9 +535,9 @@ async def create_producer(self, topic: str,
529535
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
530536
conf.access_mode(access_mode)
531537
if message_router is not None:
532-
def default_router(msg: _pulsar.Message, num_partitions: int) -> int:
533-
return int(msg.partition_key()) % num_partitions
534-
conf.message_router(default_router)
538+
def underlying_router(msg: _pulsar.Message, num_partitions: int) -> int:
539+
return message_router(pulsar.Message._wrap(msg), num_partitions)
540+
conf.message_router(underlying_router)
535541

536542
self._client.create_producer_async(
537543
topic, conf, functools.partial(_set_future, future)

0 commit comments

Comments
 (0)