Skip to content

Commit a4405b8

Browse files
committed
fix incorrect schema
1 parent e0b1de7 commit a4405b8

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

pulsar/asyncio.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,15 @@ def __init__(self, producer: _pulsar.Producer) -> None:
8585
"""
8686
self._producer: _pulsar.Producer = producer
8787

88-
async def send(self, content: bytes) -> pulsar.MessageId:
88+
async def send(self, content: Any) -> pulsar.MessageId:
8989
"""
9090
Send a message asynchronously.
9191
9292
parameters
9393
----------
94-
content: bytes
95-
The message payload
94+
content: Any
95+
The message payload, whose type should respect the schema defined in
96+
`Client.create_producer`.
9697
9798
Returns
9899
-------
@@ -132,7 +133,7 @@ class Consumer:
132133
The Pulsar message consumer, used to subscribe to messages from a topic.
133134
"""
134135

135-
def __init__(self, consumer: _pulsar.Consumer) -> None:
136+
def __init__(self, consumer: _pulsar.Consumer, schema: pulsar.schema.Schema) -> None:
136137
"""
137138
Create the consumer.
138139
Users should not call this constructor directly. Instead, create the
@@ -142,8 +143,11 @@ def __init__(self, consumer: _pulsar.Consumer) -> None:
142143
----------
143144
consumer: _pulsar.Consumer
144145
The underlying Consumer object from the C extension.
146+
schema: pulsar.schema.Schema
147+
The schema of the data that will be received by this consumer.
145148
"""
146-
self._consumer: _pulsar.Consumer = consumer
149+
self._consumer = consumer
150+
self._schema = schema
147151

148152
async def receive(self) -> pulsar.Message:
149153
"""
@@ -163,7 +167,7 @@ async def receive(self) -> pulsar.Message:
163167
msg = await future
164168
m = pulsar.Message()
165169
m._message = msg
166-
m._schema = pulsar.schema.BytesSchema()
170+
m._schema = self._schema
167171
return m
168172

169173
async def acknowledge(
@@ -633,7 +637,7 @@ async def subscribe(self, topic: Union[str, List[str]],
633637
"(str, list)"
634638
)
635639

636-
return Consumer(await future)
640+
return Consumer(await future, schema)
637641

638642
async def close(self) -> None:
639643
"""

tests/asyncio_test.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,18 @@ async def test_seek_timestamp(self):
252252
msg = await consumer.receive()
253253
self.assertEqual(msg.data(), b'msg-3')
254254

255+
async def test_schema(self):
256+
topic = f'asyncio-test-schema-{time.time()}'
257+
producer = await self._client.create_producer(
258+
topic, schema=pulsar.schema.StringSchema()
259+
)
260+
consumer = await self._client.subscribe(
261+
topic, 'sub', schema=pulsar.schema.StringSchema()
262+
)
263+
await producer.send('test-message')
264+
msg = await consumer.receive()
265+
self.assertEqual(msg.value(), 'test-message')
266+
255267

256268
if __name__ == '__main__':
257269
main()

0 commit comments

Comments
 (0)