Skip to content

Commit 705976f

Browse files
committed
Support for_each
1 parent e3a152d commit 705976f

File tree

3 files changed

+30
-6
lines changed

3 files changed

+30
-6
lines changed

pulsar/tableview.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
The TableView implementation.
2222
"""
2323

24-
from typing import Any, Optional
24+
from typing import Any, Callable, Optional
2525
from pulsar.schema.schema import Schema
2626
import _pulsar
2727

@@ -53,10 +53,18 @@ def get(self, key: str) -> Optional[Any]:
5353
return self._schema.decode(pair[1])
5454
else:
5555
return None
56-
#value = self._table_view.get(key)
57-
#if value is None:
58-
# return None
59-
#return self._schema.decode(value)
56+
57+
def for_each(self, callback: Callable[[str, Any], None]) -> None:
58+
"""
59+
Iterate over all entries in the table view and call the callback function
60+
with the key and value for each entry.
61+
62+
Parameters
63+
----------
64+
callback: Callable[[str, Any], None]
65+
The callback function to call for each entry.
66+
"""
67+
self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v)))
6068

6169
def close(self) -> None:
6270
"""

src/table_view.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <pulsar/Schema.h>
2222
#include <pulsar/TableViewConfiguration.h>
2323
#include <pybind11/stl.h>
24+
#include <pybind11/functional.h>
2425
#include <utility>
2526
#include "utils.h"
2627

@@ -50,6 +51,7 @@ void export_table_view(py::module_& m) {
5051
}
5152
})
5253
.def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
54+
.def("for_each", &TableView::forEach, py::call_guard<py::gil_scoped_release>())
5355
.def("close", [](TableView& view) {
5456
waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
5557
});

tests/table_view_test.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class TableViewTest(TestCase):
2929
def setUp(self):
3030
self._client: Client = Client('pulsar://localhost:6650')
3131

32-
def tearDown(self) -> None:
32+
def tearDown(self):
3333
self._client.close()
3434

3535
def test_get(self):
@@ -51,6 +51,20 @@ def test_get(self):
5151
producer.close()
5252
table_view.close()
5353

54+
def test_for_each(self):
55+
topic = f'table_view_test_for_each-{time.time()}'
56+
table_view = self._client.create_table_view(topic)
57+
producer = self._client.create_producer(topic)
58+
producer.send('value-0'.encode(), partition_key='key-0')
59+
producer.send('value-1'.encode(), partition_key='key-1')
60+
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
61+
62+
d = dict()
63+
table_view.for_each(lambda key, value: d.__setitem__(key, value))
64+
self.assertEqual(d, {
65+
'key-0': 'value-0',
66+
'key-1': 'value-1'
67+
})
5468

5569
def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
5670
start_time = time.time()

0 commit comments

Comments
 (0)