Skip to content

Commit cfd5c67

Browse files
committed
Add simple TableView API
1 parent 9ce5e8b commit cfd5c67

File tree

5 files changed

+180
-3
lines changed

5 files changed

+180
-3
lines changed

pulsar/__init__.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,42 @@ def my_listener(reader, message):
11261126
self._consumers.append(c)
11271127
return c
11281128

1129+
def create_table_view(self, topic: str,
1130+
subscription_name: Optional[str] = None,
1131+
schema: schema.Schema = schema.BytesSchema()) -> TableView:
1132+
"""
1133+
Create a table view on a particular topic
1134+
1135+
Parameters
1136+
----------
1137+
1138+
topic: str
1139+
The name of the topic.
1140+
subscription_name: str, optional
1141+
The name of the subscription. If it's not specified, a random subscription name
1142+
will be used.
1143+
schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
1144+
Define the schema of this table view. If the schema is incompatible with the topic's
1145+
schema, this method will throw an exception. This schema is also used to deserialize
1146+
the value of messages in the table view.
1147+
1148+
Returns
1149+
-------
1150+
TableView
1151+
A table view instance.
1152+
"""
1153+
_check_type(str, topic, 'topic')
1154+
_check_type_or_none(str, subscription_name, 'subscription_name')
1155+
_check_type(_schema.Schema, schema, 'schema')
1156+
1157+
tv_conf = _pulsar.TableViewConfiguration()
1158+
if subscription_name is not None:
1159+
tv_conf.subscription_name(subscription_name)
1160+
tv_conf.schema(schema.schema_info())
1161+
tv = self._client.create_table_view(topic, tv_conf)
1162+
self._table_view = TableView(tv, topic, subscription_name, schema)
1163+
return self._table_view
1164+
11291165
def get_topic_partitions(self, topic):
11301166
"""
11311167
Get the list of partitions for a given topic.

pulsar/tableview.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,60 @@
2121
The TableView implementation.
2222
"""
2323

24+
from typing import Any, Optional
25+
from pulsar.schema.schema import Schema
26+
import _pulsar
27+
2428
class TableView():
2529

26-
def __init__(self) -> None:
27-
pass
30+
def __init__(self, table_view: _pulsar.TableView, topic: str,
31+
subscription: Optional[str], schema: Schema) -> None:
32+
self._table_view = table_view
33+
self._topic = topic
34+
self._subscription = subscription
35+
self._schema = schema
36+
37+
def get(self, key: str) -> Optional[Any]:
38+
"""
39+
Return the value associated with the given key in the table view.
40+
41+
Parameters
42+
----------
43+
key: str
44+
The message key
45+
46+
Returns
47+
-------
48+
Optional[Any]
49+
The value associated with the key, or None if the key does not exist.
50+
"""
51+
pair = self._table_view.get(key)
52+
if pair[0]:
53+
return self._schema.decode(pair[1])
54+
else:
55+
return None
56+
#value = self._table_view.get(key)
57+
#if value is None:
58+
# return None
59+
#return self._schema.decode(value)
60+
61+
def close(self) -> None:
62+
"""
63+
Close the table view.
64+
"""
65+
self._table_view.close()
66+
67+
def __len__(self) -> int:
68+
"""
69+
Return the number of entries in the table view.
70+
"""
71+
return self._table_view.size()
72+
73+
def __str__(self) -> str:
74+
if self._subscription is None:
75+
return f"TableView(topic={self._topic})"
76+
else:
77+
return f"TableView(topic={self._topic}, subscription={self._subscription})"
78+
79+
def __repr__(self) -> str:
80+
return self.__str__()

src/table_view.cc

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
#include <pulsar/TableView.h>
2121
#include <pulsar/Schema.h>
2222
#include <pulsar/TableViewConfiguration.h>
23+
#include <pybind11/stl.h>
24+
#include <utility>
25+
#include "utils.h"
2326

2427
namespace py = pybind11;
2528
using namespace pulsar;
@@ -32,5 +35,22 @@ void export_table_view(py::module_& m) {
3235
.def("schema",
3336
[](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; });
3437

35-
py::class_<TableView>(m, "TableView").def(py::init<>());
38+
py::class_<TableView>(m, "TableView")
39+
.def(py::init<>())
40+
.def("get",
41+
[](const TableView& view, const std::string& key) -> std::pair<bool, py::bytes> {
42+
py::gil_scoped_release release;
43+
std::string value;
44+
bool available = view.getValue(key, value);
45+
py::gil_scoped_acquire acquire;
46+
if (available) {
47+
return std::make_pair(true, py::bytes(std::move(value)));
48+
} else {
49+
return std::make_pair(false, py::bytes());
50+
}
51+
})
52+
.def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
53+
.def("close", [](TableView& view) {
54+
waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
55+
});
3656
}

tests/run-unit-tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,6 @@ python3 debug_logger_test.py
2828
python3 interrupted_test.py
2929
python3 pulsar_test.py
3030
python3 schema_test.py
31+
python3 table_view_test.py
3132
python3 reader_test.py
3233
python3 asyncio_test.py

tests/table_view_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
from typing import Callable
22+
from unittest import TestCase, main
23+
import time
24+
25+
from pulsar import Client
26+
27+
class TableViewTest(TestCase):
28+
29+
def setUp(self):
30+
self._client: Client = Client('pulsar://localhost:6650')
31+
32+
def tearDown(self) -> None:
33+
self._client.close()
34+
35+
def test_get(self):
36+
topic = f'table_view_test_get-{time.time()}'
37+
table_view = self._client.create_table_view(topic)
38+
self.assertEqual(len(table_view), 0)
39+
40+
producer = self._client.create_producer(topic)
41+
producer.send('value-0'.encode(), partition_key='key-0')
42+
producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes
43+
44+
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
45+
self.assertEqual(table_view.get('key-0'), b'value-0')
46+
self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0')
47+
48+
producer.send('value-1'.encode(), partition_key='key-0')
49+
# TODO: Upgrade to C++ client 3.7.1 to include https://github.com/apache/pulsar-client-cpp/pull/487
50+
#self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1'))
51+
52+
producer.close()
53+
table_view.close()
54+
55+
56+
def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
57+
start_time = time.time()
58+
while time.time() - start_time < timeout:
59+
try:
60+
assertion()
61+
return
62+
except AssertionError:
63+
time.sleep(0.1)
64+
assertion()
65+
66+
if __name__ == "__main__":
67+
main()

0 commit comments

Comments
 (0)