Skip to content

Commit 3a6aa86

Browse files
committed
fix api docs
1 parent 84c29a4 commit 3a6aa86

File tree

3 files changed

+46
-44
lines changed

3 files changed

+46
-44
lines changed

pulsar/asyncio.py

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,9 @@ def __init__(self, service_url, **kwargs) -> None:
308308

309309
# pylint: disable=too-many-arguments,too-many-locals
310310
async def create_producer(self, topic: str,
311-
producer_name: str = None,
312-
schema: pulsar.schema.Schema = None,
313-
initial_sequence_id: int = None,
311+
producer_name: str | None = None,
312+
schema: pulsar.schema.Schema | None = None,
313+
initial_sequence_id: int | None = None,
314314
send_timeout_millis: int = 30000,
315315
compression_type: CompressionType = CompressionType.NONE,
316316
max_pending_messages: int = 1000,
@@ -324,12 +324,12 @@ async def create_producer(self, topic: str,
324324
message_routing_mode: PartitionsRoutingMode =
325325
PartitionsRoutingMode.RoundRobinDistribution,
326326
lazy_start_partitioned_producers: bool = False,
327-
properties: dict = None,
327+
properties: dict | None = None,
328328
batching_type: BatchingType = BatchingType.Default,
329-
encryption_key: str = None,
330-
crypto_key_reader: pulsar.CryptoKeyReader = None,
329+
encryption_key: str | None = None,
330+
crypto_key_reader: pulsar.CryptoKeyReader | None = None,
331331
access_mode: ProducerAccessMode = ProducerAccessMode.Shared,
332-
message_router: Callable[[pulsar.Message, int], int] = None,
332+
message_router: Callable[[pulsar.Message, int], int] | None = None,
333333
) -> Producer:
334334
"""
335335
Create a new producer on a given topic
@@ -338,15 +338,15 @@ async def create_producer(self, topic: str,
338338
----------
339339
topic: str
340340
The topic name
341-
producer_name: str, optional
341+
producer_name: str | None, default=None
342342
Specify a name for the producer. If not assigned, the system will
343343
generate a globally unique name which can be accessed with
344344
`Producer.producer_name()`. When specifying a name, it is app to
345345
the user to ensure that, for a given topic, the producer name is
346346
unique across all Pulsar's clusters.
347-
schema: pulsar.schema.Schema, optional
347+
schema: pulsar.schema.Schema | None, default=None
348348
Define the schema of the data that will be published by this producer.
349-
initial_sequence_id: int, optional
349+
initial_sequence_id: int | None, default=None
350350
Set the baseline for the sequence ids for messages published by
351351
the producer.
352352
send_timeout_millis: int, default=30000
@@ -364,7 +364,9 @@ async def create_producer(self, topic: str,
364364
Set whether send operations should block when the outgoing
365365
message queue is full.
366366
batching_enabled: bool, default=False
367-
Enable automatic message batching.
367+
Enable automatic message batching. Note that, unlike the synchronous
368+
producer API in ``pulsar.__init__``, batching is enabled by default
369+
for the asyncio producer.
368370
batching_max_messages: int, default=1000
369371
Maximum number of messages in a batch.
370372
batching_max_allowed_size_in_bytes: int, default=128*1024
@@ -378,17 +380,17 @@ async def create_producer(self, topic: str,
378380
Set the message routing mode for the partitioned producer.
379381
lazy_start_partitioned_producers: bool, default=False
380382
Start partitioned producers lazily on demand.
381-
properties: dict, optional
383+
properties: dict | None, default=None
382384
Sets the properties for the producer.
383385
batching_type: BatchingType, default=BatchingType.Default
384386
Sets the batching type for the producer.
385-
encryption_key: str, optional
387+
encryption_key: str | None, default=None
386388
The key used for symmetric encryption.
387-
crypto_key_reader: CryptoKeyReader, optional
389+
crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
388390
Symmetric encryption class implementation.
389391
access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
390392
Set the type of access mode that the producer requires on the topic.
391-
message_router: optional
393+
message_router: Callable[[pulsar.Message, int], int] | None, default=None
392394
A custom message router function that takes a Message and the
393395
number of partitions and returns the partition index.
394396
@@ -455,32 +457,31 @@ async def subscribe(self, topic: Union[str, List[str]],
455457
subscription_name: str,
456458
consumer_type: pulsar.ConsumerType =
457459
pulsar.ConsumerType.Exclusive,
458-
schema: pulsar.schema.Schema = None,
459-
message_listener = None,
460+
schema: pulsar.schema.Schema | None = None,
461+
message_listener: Callable[['Consumer', pulsar.Message], None] | None = None,
460462
receiver_queue_size: int = 1000,
461463
max_total_receiver_queue_size_across_partitions: int =
462464
50000,
463-
consumer_name: str = None,
464-
unacked_messages_timeout_ms: int = None,
465+
consumer_name: str | None = None,
466+
unacked_messages_timeout_ms: int | None = None,
465467
broker_consumer_stats_cache_time_ms: int = 30000,
466468
negative_ack_redelivery_delay_ms: int = 60000,
467469
is_read_compacted: bool = False,
468-
properties: dict = None,
469-
pattern_auto_discovery_period: int = 60, # pylint: disable=unused-argument
470+
properties: dict | None = None,
470471
initial_position: InitialPosition = InitialPosition.Latest,
471-
crypto_key_reader: pulsar.CryptoKeyReader = None,
472+
crypto_key_reader: pulsar.CryptoKeyReader | None = None,
472473
replicate_subscription_state_enabled: bool = False,
473474
max_pending_chunked_message: int = 10,
474475
auto_ack_oldest_chunked_message_on_queue_full: bool = False,
475476
start_message_id_inclusive: bool = False,
476-
batch_receive_policy: pulsar.ConsumerBatchReceivePolicy =
477+
batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None =
477478
None,
478-
key_shared_policy: pulsar.ConsumerKeySharedPolicy =
479+
key_shared_policy: pulsar.ConsumerKeySharedPolicy | None =
479480
None,
480481
batch_index_ack_enabled: bool = False,
481482
regex_subscription_mode: RegexSubscriptionMode =
482483
RegexSubscriptionMode.PersistentOnly,
483-
dead_letter_policy: pulsar.ConsumerDeadLetterPolicy =
484+
dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None =
484485
None,
485486
crypto_failure_action: ConsumerCryptoFailureAction =
486487
ConsumerCryptoFailureAction.FAIL,
@@ -497,17 +498,17 @@ async def subscribe(self, topic: Union[str, List[str]],
497498
The name of the subscription.
498499
consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive
499500
Select the subscription type to be used when subscribing to the topic.
500-
schema: pulsar.schema.Schema, optional
501+
schema: pulsar.schema.Schema | None, default=None
501502
Define the schema of the data that will be received by this consumer.
502-
message_listener: optional
503+
message_listener: Callable[[Consumer, pulsar.Message], None] | None, default=None
503504
Sets a message listener for the consumer.
504505
receiver_queue_size: int, default=1000
505506
Sets the size of the consumer receive queue.
506507
max_total_receiver_queue_size_across_partitions: int, default=50000
507508
Set the max total receiver queue size across partitions.
508-
consumer_name: str, optional
509+
consumer_name: str | None, default=None
509510
Sets the consumer name.
510-
unacked_messages_timeout_ms: int, optional
511+
unacked_messages_timeout_ms: int | None, default=None
511512
Sets the timeout in milliseconds for unacknowledged messages.
512513
broker_consumer_stats_cache_time_ms: int, default=30000
513514
Sets the time duration for which the broker-side consumer stats
@@ -517,13 +518,11 @@ async def subscribe(self, topic: Union[str, List[str]],
517518
processed.
518519
is_read_compacted: bool, default=False
519520
Selects whether to read the compacted version of the topic.
520-
properties: dict, optional
521+
properties: dict | None, default=None
521522
Sets the properties for the consumer.
522-
pattern_auto_discovery_period: int, default=60
523-
Periods of seconds for consumer to auto discover match topics.
524523
initial_position: InitialPosition, default=InitialPosition.Latest
525524
Set the initial position of a consumer when subscribing to the topic.
526-
crypto_key_reader: CryptoKeyReader, optional
525+
crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
527526
Symmetric encryption class implementation.
528527
replicate_subscription_state_enabled: bool, default=False
529528
Set whether the subscription status should be replicated.
@@ -535,9 +534,9 @@ async def subscribe(self, topic: Union[str, List[str]],
535534
start_message_id_inclusive: bool, default=False
536535
Set the consumer to include the given position of any reset
537536
operation.
538-
batch_receive_policy: ConsumerBatchReceivePolicy, optional
537+
batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, default=None
539538
Set the batch collection policy for batch receiving.
540-
key_shared_policy: ConsumerKeySharedPolicy, optional
539+
key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
541540
Set the key shared policy for use when the ConsumerType is
542541
KeyShared.
543542
batch_index_ack_enabled: bool, default=False
@@ -546,7 +545,7 @@ async def subscribe(self, topic: Union[str, List[str]],
546545
default=RegexSubscriptionMode.PersistentOnly
547546
Set the regex subscription mode for use when the topic is a regex
548547
pattern.
549-
dead_letter_policy: ConsumerDeadLetterPolicy, optional
548+
dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, default=None
550549
Set dead letter policy for consumer.
551550
crypto_failure_action: ConsumerCryptoFailureAction,
552551
default=ConsumerCryptoFailureAction.FAIL

src/consumer.cc

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,21 +90,22 @@ void Consumer_seek(Consumer& consumer, const MessageId& msgId) {
9090
waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); });
9191
}
9292

93-
MessageId Consumer_get_last_message_id(Consumer& consumer) {
94-
MessageId msgId;
95-
Result res;
96-
Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
97-
Py_END_ALLOW_THREADS CHECK_RESULT(res);
98-
return msgId;
99-
}
100-
10193
void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
10294
waitForAsyncResult(
10395
[timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); });
10496
}
10597

10698
bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); }
10799

100+
MessageId Consumer_get_last_message_id(Consumer& consumer) {
101+
MessageId msgId;
102+
Result res;
103+
Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
104+
Py_END_ALLOW_THREADS;
105+
CHECK_RESULT(res);
106+
return msgId;
107+
}
108+
108109
void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
109110
py::gil_scoped_release release;
110111
consumer.receiveAsync(callback);

tests/asyncio_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@ async def test_unsubscribe(self):
187187
sub = 'sub'
188188
consumer = await self._client.subscribe(topic, sub)
189189
await consumer.unsubscribe()
190+
# Verify the consumer can be created successfully with the same subscription name
190191
consumer = await self._client.subscribe(topic, sub)
192+
await consumer.close()
191193

192194
async def test_seek_message_id(self):
193195
topic = f'asyncio-test-seek-message-id-{time.time()}'

0 commit comments

Comments
 (0)