Skip to content

Commit 5bbddb5

Browse files
committed
WIP: Proper asyncio implementation using the c++ side
1 parent c3c12c4 commit 5bbddb5

File tree

3 files changed

+220
-2
lines changed

3 files changed

+220
-2
lines changed

pulsar/asyncio.py

Lines changed: 172 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323

2424
import asyncio
2525
import functools
26-
from typing import Any
26+
from typing import Any, Iterable, Optional
2727

2828
import _pulsar
29+
2930
import pulsar
3031

32+
3133
class PulsarException(BaseException):
3234
"""
3335
The exception that wraps the Pulsar error code
@@ -56,6 +58,7 @@ def __str__(self):
5658
"""
5759
return f'{self._result.value} {self._result.name}'
5860

61+
5962
class Producer:
6063
"""
6164
The Pulsar message producer, used to publish messages on a topic.
@@ -116,6 +119,152 @@ async def close(self) -> None:
116119
self._producer.close_async(functools.partial(_set_future, future, value=None))
117120
await future
118121

122+
async def flush(self):
123+
"""
124+
Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
125+
126+
Raises
127+
------
128+
PulsarException
129+
"""
130+
future = asyncio.get_running_loop().create_future()
131+
self._producer.flush_async(functools.partial(_set_future, future, value=None))
132+
await future
133+
134+
@property
135+
def is_connected(self) -> bool:
136+
"""
137+
Check if the producer is connected or not.
138+
"""
139+
140+
return self._producer.is_connected()
141+
142+
@property
143+
def last_sequence_id(self) -> int:
144+
"""
145+
Get the last sequence id
146+
"""
147+
return self._producer.last_sequence_id()
148+
149+
@property
150+
def name(self) -> str:
151+
"""
152+
Get the name of the producer.
153+
"""
154+
return self._producer.producer_name()
155+
156+
@property
157+
def topic(self) -> str:
158+
"""
159+
Get the topic name of the producer.
160+
"""
161+
return self._producer.topic()
162+
163+
164+
class Consumer:
165+
def __init__(self, consumer: _pulsar.Consumer) -> None:
166+
self._consumer: _pulsar.Consumer = consumer
167+
168+
async def acknowledge(self, msg: pulsar.Message) -> None:
169+
"""
170+
Acknowledge the reception of a single message.
171+
"""
172+
future = asyncio.get_running_loop().create_future()
173+
self._consumer.acknowledge_async(msg, functools.partial(_set_future, future))
174+
await future
175+
176+
async def acknowledge_cumulative(self, msg: pulsar.Message) -> None:
177+
"""
178+
Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
179+
"""
180+
future = asyncio.get_running_loop().create_future()
181+
self._consumer.acknowledge_cumulative_async(msg, functools.partial(_set_future, future))
182+
await future
183+
184+
async def negative_acknowledge(self, msg: pulsar.Message) -> None:
185+
"""
186+
Acknowledge the failure to process a single message.
187+
"""
188+
future = asyncio.get_running_loop().create_future()
189+
self._consumer.negative_acknowledge_async(msg, functools.partial(_set_future, future))
190+
await future
191+
192+
async def batch_receive(self) -> Iterable[pulsar.Message]:
193+
"""
194+
Batch receiving messages.
195+
"""
196+
future = asyncio.get_running_loop().create_future()
197+
self._consumer.batch_receive_async(functools.partial(_set_future, future))
198+
return await future
199+
200+
async def receive(self) -> pulsar.Message:
201+
"""
202+
Receive a single message.
203+
"""
204+
future = asyncio.get_running_loop().create_future()
205+
206+
self._consumer.receive_async(functools.partial(_set_future, future))
207+
return await future
208+
209+
async def close(self):
210+
"""
211+
Close the consumer.
212+
"""
213+
future = asyncio.get_running_loop().create_future()
214+
self._consumer.close_async(functools.partial(_set_future, future, value=None))
215+
await future
216+
217+
async def seek(self, position: int):
218+
"""
219+
Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. ...
220+
"""
221+
future = asyncio.get_running_loop().create_future()
222+
self._consumer.seek_async(position, functools.partial(_set_future, future))
223+
await future
224+
225+
async def unsubscribe(self):
226+
"""
227+
Unsubscribe the current consumer from the topic.
228+
"""
229+
future = asyncio.get_running_loop().create_future()
230+
self._consumer.unsubscribe_async(functools.partial(_set_future, future))
231+
await future
232+
233+
def pause_message_listener(self):
234+
"""
235+
Pause receiving messages via the message_listener until resume_message_listener() is called.
236+
"""
237+
self._consumer.pause_message_listener()
238+
239+
def resume_message_listener(self):
240+
"""
241+
Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time pause_message_listener() was called.
242+
"""
243+
self._consumer.resume_message_listener()
244+
245+
def redeliver_unacknowledged_messages(self):
246+
"""
247+
Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers...
248+
"""
249+
self._consumer.redeliver_unacknowledged_messages()
250+
251+
@property
252+
def last_message_id(self) -> int:
253+
return self._consumer.last_message_id
254+
255+
@property
256+
def is_connected(self) -> bool:
257+
return self._consumer.is_connected()
258+
259+
@property
260+
def subscription_name(self) -> str:
261+
return self._consumer.subscription_name()
262+
263+
@property
264+
def topic(self) -> str:
265+
return self._consumer.topic()
266+
267+
119268
class Client:
120269
"""
121270
The asynchronous version of `pulsar.Client`.
@@ -125,7 +274,18 @@ def __init__(self, service_url, **kwargs) -> None:
125274
"""
126275
See `pulsar.Client.__init__`
127276
"""
128-
self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client
277+
assert service_url.startswith('pulsar://'), "The service url must start with 'pulsar://'"
278+
self._client = pulsar.Client(service_url, **kwargs)._client
279+
280+
async def subscribe(self, topics: str, subscription_name: str, consumer_type: _pulsar.ConsumerType,
281+
schema: Optional[_pulsar.SchemaInfo] = _pulsar.SchemaInfo(_pulsar.SchemaType.BYTES, "bytes", "")) -> Consumer:
282+
conf = _pulsar.ConsumerConfiguration()
283+
conf.consumer_type(consumer_type)
284+
conf.schema(schema)
285+
286+
future = asyncio.get_running_loop().create_future()
287+
self._client.subscribe_async(topics, subscription_name, conf, functools.partial(_set_future, future))
288+
return Consumer(await future)
129289

130290
async def create_producer(self, topic: str) -> Producer:
131291
"""
@@ -163,10 +323,20 @@ async def close(self) -> None:
163323
self._client.close_async(functools.partial(_set_future, future, value=None))
164324
await future
165325

326+
async def get_topic_partitions(self, topic: str):
327+
future = asyncio.get_running_loop().create_future()
328+
self._client.get_partitions_for_topic_async(topic, functools.partial(_set_future, future))
329+
return await future
330+
331+
def shutdown(self) -> None:
332+
self._client.shutdown()
333+
334+
166335
def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
167336
def complete():
168337
if result == _pulsar.Result.Ok:
169338
future.set_result(value)
170339
else:
171340
future.set_exception(PulsarException(result))
341+
172342
future.get_loop().call_soon_threadsafe(complete)

src/client.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s
4141
[&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
4242
}
4343

44+
void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName,
45+
const ConsumerConfiguration& conf, SubscribeCallback callback) {
46+
py::gil_scoped_release release;
47+
client.subscribeAsync(topic, subscriptionName, conf, callback);
48+
}
49+
4450
Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
4551
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
4652
return waitForAsyncValue<Consumer>(
@@ -86,6 +92,7 @@ void export_client(py::module_& m) {
8692
.def("create_producer", &Client_createProducer)
8793
.def("create_producer_async", &Client_createProducerAsync)
8894
.def("subscribe", &Client_subscribe)
95+
.def("subscribe_async", &Client_subscribeAsync)
8996
.def("subscribe_topics", &Client_subscribe_topics)
9097
.def("subscribe_pattern", &Client_subscribe_pattern)
9198
.def("create_reader", &Client_createReader)

src/consumer.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "utils.h"
2020

2121
#include <pulsar/Consumer.h>
22+
#include <pybind11/functional.h>
2223
#include <pybind11/pybind11.h>
2324
#include <pybind11/stl.h>
2425

@@ -32,6 +33,11 @@ Message Consumer_receive(Consumer& consumer) {
3233
return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
3334
}
3435

36+
void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
37+
py::gil_scoped_acquire acquire;
38+
consumer.receiveAsync(callback);
39+
}
40+
3541
Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
3642
Message msg;
3743
Result res;
@@ -54,6 +60,33 @@ void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
5460
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msg, callback); });
5561
}
5662

63+
void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, py::object callback) {
64+
// Capture the callback by value to ensure it's safe to use in the async context
65+
auto py_callback = std::make_shared<py::object>(callback);
66+
67+
// Call the Pulsar asynchronous acknowledge method
68+
consumer.acknowledgeAsync(msg, [py_callback](pulsar::Result result) {
69+
// Acquire the Python GIL before calling any Python code
70+
py::gil_scoped_acquire acquire;
71+
72+
try {
73+
if (result == pulsar::ResultOk) {
74+
py::print("result ok");
75+
// Call the Python callback with None if the acknowledgment was successful
76+
(*py_callback)(pulsar::ResultOk, py::none());
77+
py::print("callback called");
78+
} else {
79+
py::print("error");
80+
// Raise a Python exception for failure
81+
PyErr_SetString(PyExc_Exception, "AcknowledgeAsync failed");
82+
(*py_callback)(pulsar::ResultOk, py::none()); // Or pass some error object to indicate failure
83+
}
84+
} catch (const py::error_already_set& e) {
85+
throw py::error_already_set();
86+
}
87+
});
88+
}
89+
5790
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
5891
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); });
5992
}
@@ -81,6 +114,11 @@ void Consumer_close(Consumer& consumer) {
81114
waitForAsyncResult([&consumer](ResultCallback callback) { consumer.closeAsync(callback); });
82115
}
83116

117+
void Consumer_closeAsync(Consumer& consumer, ResultCallback callback){
118+
py::gil_scoped_acquire acquire;
119+
consumer.closeAsync(callback);
120+
}
121+
84122
void Consumer_pauseMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.pauseMessageListener()); }
85123

86124
void Consumer_resumeMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.resumeMessageListener()); }
@@ -116,14 +154,17 @@ void export_consumer(py::module_& m) {
116154
.def("unsubscribe", &Consumer_unsubscribe)
117155
.def("receive", &Consumer_receive)
118156
.def("receive", &Consumer_receive_timeout)
157+
.def("receive_async", &Consumer_receiveAsync)
119158
.def("batch_receive", &Consumer_batch_receive)
120159
.def("acknowledge", &Consumer_acknowledge)
121160
.def("acknowledge", &Consumer_acknowledge_message_id)
161+
.def("acknowledge_async", &Consumer_acknowledgeAsync)
122162
.def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
123163
.def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id)
124164
.def("negative_acknowledge", &Consumer_negative_acknowledge)
125165
.def("negative_acknowledge", &Consumer_negative_acknowledge_message_id)
126166
.def("close", &Consumer_close)
167+
.def("close_async", &Consumer_closeAsync)
127168
.def("pause_message_listener", &Consumer_pauseMessageListener)
128169
.def("resume_message_listener", &Consumer_resumeMessageListener)
129170
.def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages)

0 commit comments

Comments
 (0)