Skip to content

Commit a61c5e4

Browse files
committed
Support basic functionality of asyncio based consumer
1 parent 813e295 commit a61c5e4

File tree

4 files changed

+360
-14
lines changed

4 files changed

+360
-14
lines changed

pulsar/asyncio.py

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323

2424
import asyncio
2525
import functools
26-
from typing import Any
26+
from typing import Any, List, Union
2727

2828
import _pulsar
29+
from _pulsar import InitialPosition
2930
import pulsar
3031

3132
class PulsarException(BaseException):
@@ -116,6 +117,134 @@ async def close(self) -> None:
116117
self._producer.close_async(functools.partial(_set_future, future, value=None))
117118
await future
118119

120+
class Consumer:
121+
"""
122+
The Pulsar message consumer, used to subscribe to messages from a topic.
123+
"""
124+
125+
def __init__(self, consumer: _pulsar.Consumer) -> None:
126+
"""
127+
Create the consumer.
128+
Users should not call this constructor directly. Instead, create the
129+
consumer via `Client.subscribe`.
130+
131+
Parameters
132+
----------
133+
consumer: _pulsar.Consumer
134+
The underlying Consumer object from the C extension.
135+
"""
136+
self._consumer: _pulsar.Consumer = consumer
137+
138+
async def receive(self) -> pulsar.Message:
139+
"""
140+
Receive a single message asynchronously.
141+
142+
Returns
143+
-------
144+
pulsar.Message
145+
The message received.
146+
147+
Raises
148+
------
149+
PulsarException
150+
"""
151+
future = asyncio.get_running_loop().create_future()
152+
self._consumer.receive_async(functools.partial(_set_future, future))
153+
msg = await future
154+
m = pulsar.Message()
155+
m._message = msg
156+
m._schema = pulsar.schema.BytesSchema()
157+
return m
158+
159+
async def acknowledge(self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]) -> None:
160+
"""
161+
Acknowledge the reception of a single message asynchronously.
162+
163+
Parameters
164+
----------
165+
message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
166+
The received message or message id.
167+
168+
Raises
169+
------
170+
PulsarException
171+
"""
172+
future = asyncio.get_running_loop().create_future()
173+
if isinstance(message, pulsar.Message):
174+
msg = message._message
175+
elif isinstance(message, pulsar.MessageId):
176+
msg = message._msg_id
177+
else:
178+
msg = message
179+
self._consumer.acknowledge_async(msg, functools.partial(_set_future, future, value=None))
180+
await future
181+
182+
async def acknowledge_cumulative(self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]) -> None:
183+
"""
184+
Acknowledge the reception of all the messages in the stream up to (and
185+
including) the provided message asynchronously.
186+
187+
Parameters
188+
----------
189+
message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
190+
The received message or message id.
191+
192+
Raises
193+
------
194+
PulsarException
195+
"""
196+
future = asyncio.get_running_loop().create_future()
197+
if isinstance(message, pulsar.Message):
198+
msg = message._message
199+
elif isinstance(message, pulsar.MessageId):
200+
msg = message._msg_id
201+
else:
202+
msg = message
203+
self._consumer.acknowledge_cumulative_async(msg, functools.partial(_set_future, future, value=None))
204+
await future
205+
206+
async def unsubscribe(self) -> None:
207+
"""
208+
Unsubscribe the current consumer from the topic asynchronously.
209+
210+
Raises
211+
------
212+
PulsarException
213+
"""
214+
future = asyncio.get_running_loop().create_future()
215+
self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None))
216+
await future
217+
218+
async def close(self) -> None:
219+
"""
220+
Close the consumer asynchronously.
221+
222+
Raises
223+
------
224+
PulsarException
225+
"""
226+
future = asyncio.get_running_loop().create_future()
227+
self._consumer.close_async(functools.partial(_set_future, future, value=None))
228+
await future
229+
230+
def topic(self) -> str:
231+
"""
232+
Return the topic this consumer is subscribed to.
233+
"""
234+
return self._consumer.topic()
235+
236+
def subscription_name(self) -> str:
237+
"""
238+
Return the subscription name.
239+
"""
240+
return self._consumer.subscription_name()
241+
242+
def consumer_name(self) -> str:
243+
"""
244+
Return the consumer name.
245+
"""
246+
return self._consumer.consumer_name()
247+
119248
class Client:
120249
"""
121250
The asynchronous version of `pulsar.Client`.
@@ -151,6 +280,53 @@ async def create_producer(self, topic: str) -> Producer:
151280
self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
152281
return Producer(await future)
153282

283+
async def subscribe(self, topic: Union[str, List[str]], subscription_name: str,
284+
is_pattern_topic: bool = False,
285+
consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive,
286+
initial_position: InitialPosition = InitialPosition.Latest) -> Consumer:
287+
"""
288+
Subscribe to the given topic and subscription combination.
289+
290+
Parameters
291+
----------
292+
topic: str, List[str], or regex pattern
293+
The name of the topic, list of topics or regex pattern.
294+
subscription_name: str
295+
The name of the subscription.
296+
is_pattern_topic: bool, default=False
297+
Whether `topic` is a regex pattern. This option takes no effect when `topic` is a list of topics.
298+
consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive
299+
Select the subscription type to be used when subscribing to the topic.
300+
initial_position: InitialPosition, default=InitialPosition.Latest
301+
Set the initial position of a consumer when subscribing to the topic.
302+
It could be either: ``InitialPosition.Earliest`` or ``InitialPosition.Latest``.
303+
304+
Returns
305+
-------
306+
Consumer
307+
The consumer created
308+
309+
Raises
310+
------
311+
PulsarException
312+
"""
313+
future = asyncio.get_running_loop().create_future()
314+
conf = _pulsar.ConsumerConfiguration()
315+
conf.consumer_type(consumer_type)
316+
conf.subscription_initial_position(initial_position)
317+
318+
if isinstance(topic, str):
319+
if is_pattern_topic:
320+
self._client.subscribe_async_pattern(topic, subscription_name, conf, functools.partial(_set_future, future))
321+
else:
322+
self._client.subscribe_async(topic, subscription_name, conf, functools.partial(_set_future, future))
323+
elif isinstance(topic, list):
324+
self._client.subscribe_async_topics(topic, subscription_name, conf, functools.partial(_set_future, future))
325+
else:
326+
raise ValueError("Argument 'topic' is expected to be of a type between (str, list)")
327+
328+
return Consumer(await future)
329+
154330
async def close(self) -> None:
155331
"""
156332
Close the client and all the associated producers and consumers

src/client.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,26 @@ void Client_closeAsync(Client& client, ResultCallback callback) {
8080
client.closeAsync(callback);
8181
}
8282

83+
void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName,
84+
ConsumerConfiguration conf, SubscribeCallback callback) {
85+
py::gil_scoped_release release;
86+
client.subscribeAsync(topic, subscriptionName, conf, callback);
87+
}
88+
89+
void Client_subscribeAsync_topics(Client& client, const std::vector<std::string>& topics,
90+
const std::string& subscriptionName, ConsumerConfiguration conf,
91+
SubscribeCallback callback) {
92+
py::gil_scoped_release release;
93+
client.subscribeAsync(topics, subscriptionName, conf, callback);
94+
}
95+
96+
void Client_subscribeAsync_pattern(Client& client, const std::string& topic_pattern,
97+
const std::string& subscriptionName, ConsumerConfiguration conf,
98+
SubscribeCallback callback) {
99+
py::gil_scoped_release release;
100+
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
101+
}
102+
83103
void export_client(py::module_& m) {
84104
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
85105
.def(py::init<const std::string&, const ClientConfiguration&>())
@@ -99,5 +119,8 @@ void export_client(py::module_& m) {
99119
.def("get_schema_info", &Client_getSchemaInfo)
100120
.def("close", &Client_close)
101121
.def("close_async", &Client_closeAsync)
122+
.def("subscribe_async", &Client_subscribeAsync)
123+
.def("subscribe_async_topics", &Client_subscribeAsync_topics)
124+
.def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
102125
.def("shutdown", &Client::shutdown);
103126
}

src/consumer.cc

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "utils.h"
2020

2121
#include <pulsar/Consumer.h>
22+
#include <pybind11/functional.h>
2223
#include <pybind11/pybind11.h>
2324
#include <pybind11/stl.h>
2425

@@ -89,21 +90,56 @@ void Consumer_seek(Consumer& consumer, const MessageId& msgId) {
8990
waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); });
9091
}
9192

93+
MessageId Consumer_get_last_message_id(Consumer& consumer) {
94+
MessageId msgId;
95+
Result res;
96+
Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
97+
Py_END_ALLOW_THREADS CHECK_RESULT(res);
98+
return msgId;
99+
}
100+
92101
void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
93102
waitForAsyncResult(
94103
[timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); });
95104
}
96105

97106
bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); }
98107

99-
MessageId Consumer_get_last_message_id(Consumer& consumer) {
100-
MessageId msgId;
101-
Result res;
102-
Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
103-
Py_END_ALLOW_THREADS
108+
void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
109+
py::gil_scoped_release release;
110+
consumer.receiveAsync(callback);
111+
}
104112

105-
CHECK_RESULT(res);
106-
return msgId;
113+
void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
114+
py::gil_scoped_release release;
115+
consumer.acknowledgeAsync(msg, callback);
116+
}
117+
118+
void Consumer_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId,
119+
ResultCallback callback) {
120+
py::gil_scoped_release release;
121+
consumer.acknowledgeAsync(msgId, callback);
122+
}
123+
124+
void Consumer_acknowledgeCumulativeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
125+
py::gil_scoped_release release;
126+
consumer.acknowledgeCumulativeAsync(msg, callback);
127+
}
128+
129+
void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const MessageId& msgId,
130+
ResultCallback callback) {
131+
py::gil_scoped_release release;
132+
consumer.acknowledgeCumulativeAsync(msgId, callback);
133+
}
134+
135+
void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
136+
py::gil_scoped_release release;
137+
consumer.closeAsync(callback);
138+
}
139+
140+
void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
141+
py::gil_scoped_release release;
142+
consumer.unsubscribeAsync(callback);
107143
}
108144

109145
void export_consumer(py::module_& m) {
@@ -130,5 +166,12 @@ void export_consumer(py::module_& m) {
130166
.def("seek", &Consumer_seek)
131167
.def("seek", &Consumer_seek_timestamp)
132168
.def("is_connected", &Consumer_is_connected)
133-
.def("get_last_message_id", &Consumer_get_last_message_id);
169+
.def("get_last_message_id", &Consumer_get_last_message_id)
170+
.def("receive_async", &Consumer_receiveAsync)
171+
.def("acknowledge_async", &Consumer_acknowledgeAsync)
172+
.def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
173+
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
174+
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
175+
.def("close_async", &Consumer_closeAsync)
176+
.def("unsubscribe_async", &Consumer_unsubscribeAsync);
134177
}

0 commit comments

Comments
 (0)