Skip to content

Commit 2584878

Browse files
committed
fix schema tests
1 parent a4405b8 commit 2584878

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

pulsar/asyncio.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class Producer:
7272
The Pulsar message producer, used to publish messages on a topic.
7373
"""
7474

75-
def __init__(self, producer: _pulsar.Producer) -> None:
75+
def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) -> None:
7676
"""
7777
Create the producer.
7878
Users should not call this constructor directly. Instead, create the
@@ -82,8 +82,11 @@ def __init__(self, producer: _pulsar.Producer) -> None:
8282
----------
8383
producer: _pulsar.Producer
8484
The underlying Producer object from the C extension.
85+
schema: pulsar.schema.Schema
86+
The schema of the data that will be sent by this producer.
8587
"""
86-
self._producer: _pulsar.Producer = producer
88+
self._producer = producer
89+
self._schema = schema
8790

8891
async def send(self, content: Any) -> pulsar.MessageId:
8992
"""
@@ -105,7 +108,7 @@ async def send(self, content: Any) -> pulsar.MessageId:
105108
PulsarException
106109
"""
107110
builder = _pulsar.MessageBuilder()
108-
builder.content(content)
111+
builder.content(self._schema.encode(content))
109112
future = asyncio.get_running_loop().create_future()
110113
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
111114
msg_id = await future
@@ -454,7 +457,8 @@ def underlying_router(msg, num_partitions):
454457
self._client.create_producer_async(
455458
topic, conf, functools.partial(_set_future, future)
456459
)
457-
return Producer(await future)
460+
schema.attach_client(self._client)
461+
return Producer(await future, schema)
458462

459463
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
460464
async def subscribe(self, topic: Union[str, List[str]],
@@ -632,11 +636,9 @@ async def subscribe(self, topic: Union[str, List[str]],
632636
functools.partial(_set_future, future)
633637
)
634638
else:
635-
raise ValueError(
636-
"Argument 'topic' is expected to be of a type between "
637-
"(str, list)"
638-
)
639+
raise ValueError( "Argument 'topic' is expected to be of type 'str' or 'list'")
639640

641+
schema.attach_client(self._client)
640642
return Consumer(await future, schema)
641643

642644
async def close(self) -> None:

tests/asyncio_test.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939
Producer,
4040
PulsarException,
4141
)
42+
from pulsar.schema import ( # pylint: disable=import-error
43+
AvroSchema,
44+
Integer,
45+
Record,
46+
String,
47+
)
4248

4349
SERVICE_URL = 'pulsar://localhost:6650'
4450

@@ -253,16 +259,23 @@ async def test_seek_timestamp(self):
253259
self.assertEqual(msg.data(), b'msg-3')
254260

255261
async def test_schema(self):
262+
class ExampleRecord(Record): # pylint: disable=too-few-public-methods
263+
"""Example record schema for testing."""
264+
str_field = String()
265+
int_field = Integer()
266+
256267
topic = f'asyncio-test-schema-{time.time()}'
257268
producer = await self._client.create_producer(
258-
topic, schema=pulsar.schema.StringSchema()
269+
topic, schema=AvroSchema(ExampleRecord)
259270
)
260271
consumer = await self._client.subscribe(
261-
topic, 'sub', schema=pulsar.schema.StringSchema()
272+
topic, 'sub', schema=AvroSchema(ExampleRecord)
262273
)
263-
await producer.send('test-message')
274+
await producer.send(ExampleRecord(str_field='test', int_field=42))
264275
msg = await consumer.receive()
265-
self.assertEqual(msg.value(), 'test-message')
276+
self.assertIsInstance(msg.value(), ExampleRecord)
277+
self.assertEqual(msg.value().str_field, 'test')
278+
self.assertEqual(msg.value().int_field, 42)
266279

267280

268281
if __name__ == '__main__':

0 commit comments

Comments
 (0)