Skip to content

Commit 0dc7284

Browse files
committed
Add for_each_and_listen
1 parent 705976f commit 0dc7284

File tree

3 files changed

+37
-0
lines changed

3 files changed

+37
-0
lines changed

pulsar/tableview.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,19 @@ def for_each(self, callback: Callable[[str, Any], None]) -> None:
6666
"""
6767
self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v)))
6868

69+
def for_each_and_listen(self, callback: Callable[[str, Any], None]) -> None:
70+
"""
71+
Iterate over all entries in the table view and call the callback function
72+
with the key and value for each entry, then listen for changes. The callback
73+
will be called when a new entry is added or an existing entry is updated.
74+
75+
Parameters
76+
----------
77+
callback: Callable[[str, Any], None]
78+
The callback function to call for each entry.
79+
"""
80+
self._table_view.for_each_and_listen(lambda k, v: callback(k, self._schema.decode(v)))
81+
6982
def close(self) -> None:
7083
"""
7184
Close the table view.

src/table_view.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ void export_table_view(py::module_& m) {
5252
})
5353
.def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
5454
.def("for_each", &TableView::forEach, py::call_guard<py::gil_scoped_release>())
55+
.def("for_each_and_listen", &TableView::forEachAndListen, py::call_guard<py::gil_scoped_release>())
5556
.def("close", [](TableView& view) {
5657
waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
5758
});

tests/table_view_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,29 @@ def test_for_each(self):
6666
'key-1': 'value-1'
6767
})
6868

69+
def listener(key: str, value: str):
70+
if len(value) == 0:
71+
d.pop(key)
72+
else:
73+
d[key] = value
74+
75+
d.clear()
76+
table_view.for_each_and_listen(listener)
77+
self.assertEqual(d, {
78+
'key-0': 'value-0',
79+
'key-1': 'value-1'
80+
})
81+
82+
producer.send('value-0-new'.encode(), partition_key='key-0')
83+
producer.send(''.encode(), partition_key='key-1')
84+
producer.send('value-2'.encode(), partition_key='key-2')
85+
def assert_latest_values():
86+
self.assertEqual(d, {
87+
'key-0': 'value-0-new',
88+
'key-2': 'value-2'
89+
})
90+
self._wait_for_assertion(assert_latest_values)
91+
6992
def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
7093
start_time = time.time()
7194
while time.time() - start_time < timeout:

0 commit comments

Comments
 (0)