Skip to content

Commit 37bee34

Browse files
author
Shubham
committed
Add change listener to notify client on incoming changes
1 parent baf6d9c commit 37bee34

File tree

2 files changed

+63
-5
lines changed

2 files changed

+63
-5
lines changed

objectbox/c.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,10 @@ class OBX_sync_msg_objects_builder(ctypes.Structure):
12731273
# void obx_sync_listener_complete(OBX_sync* sync, OBX_sync_listener_complete* listener, void* listener_arg);
12741274
obx_sync_listener_error = c_fn('obx_sync_listener_error', None, [OBX_sync_p, OBX_sync_listener_error, ctypes.c_void_p])
12751275

1276+
# void obx_sync_listener_change(OBX_sync* sync, OBX_sync_listener_change* listener, void* listener_arg);
1277+
obx_sync_listener_change = c_fn('obx_sync_listener_change', None,
1278+
[OBX_sync_p, OBX_sync_listener_change, ctypes.c_void_p])
1279+
12761280
# Filter Variables
12771281

12781282
# obx_err obx_sync_filter_variables_put(OBX_sync* sync, const char* name, const char* value);

objectbox/sync.py

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
import objectbox.c as c
55
from objectbox import Store
6+
from objectbox.c import OBX_sync_change_array
7+
68

79
class SyncCredentials:
810
"""Credentials used to authenticate a sync client against a server."""
@@ -245,21 +247,17 @@ class SyncCode(IntEnum):
245247
class SyncChange:
246248
"""Sync incoming data event."""
247249

248-
def __init__(self, entity_id: int, entity: type, puts: list[int], removals: list[int]):
250+
def __init__(self, entity_id: int, puts: list[int], removals: list[int]):
249251
"""Creates a SyncChange event.
250252
251253
Args:
252254
entity_id: Entity ID this change relates to.
253-
entity: Entity type this change relates to.
254255
puts: List of "put" (inserted/updated) object IDs.
255256
removals: List of removed object IDs.
256257
"""
257258
self.entity_id = entity_id
258259
"""Entity ID this change relates to."""
259260

260-
self.entity = entity
261-
"""Entity type this change relates to."""
262-
263261
self.puts = puts
264262
"""List of "put" (inserted/updated) object IDs."""
265263

@@ -319,6 +317,17 @@ def on_error(self, sync_error_code: int):
319317
pass
320318

321319

320+
class SyncChangeListener:
321+
322+
def on_change(self, sync_changes: list[SyncChange]):
323+
"""Called when incoming data changes are received from the server.
324+
325+
Args:
326+
sync_changes: List of SyncChange events representing the changes.
327+
"""
328+
pass
329+
330+
322331
class SyncClient:
323332
"""Sync client is used to connect to an ObjectBox sync server.
324333
@@ -336,6 +345,7 @@ def __init__(self, store: Store, server_urls: list[str],
336345
server_urls: List of server URLs to connect to.
337346
filter_variables: Optional dictionary of filter variable names to values.
338347
"""
348+
self.__c_change_listener = None
339349
self.__c_login_listener = None
340350
self.__c_login_failure_listener = None
341351
self.__c_connect_listener = None
@@ -547,6 +557,12 @@ def close(self):
547557
It can no longer be used afterwards, make a new sync client instead.
548558
Does nothing if this sync client has already been closed.
549559
"""
560+
c.obx_sync_listener_error(self.__c_sync_client_ptr, None, None)
561+
c.obx_sync_listener_login(self.__c_sync_client_ptr, None, None)
562+
c.obx_sync_listener_login_failure(self.__c_sync_client_ptr, None, None)
563+
c.obx_sync_listener_connect(self.__c_sync_client_ptr, None, None)
564+
c.obx_sync_listener_disconnect(self.__c_sync_client_ptr, None, None)
565+
c.obx_sync_listener_change(self.__c_sync_client_ptr, None, None)
550566
c.obx_sync_close(self.__c_sync_client_ptr)
551567
self.__c_sync_client_ptr = None
552568

@@ -610,6 +626,44 @@ def set_error_listener(self, error_listener: SyncErrorListener):
610626
None
611627
)
612628

629+
def set_change_listener(self, change_listener: SyncChangeListener):
630+
"""Sets a listener to observe incoming data changes from the server.
631+
632+
Args:
633+
change_listener: The listener to receive change events.
634+
"""
635+
self.__check_sync_ptr_not_null()
636+
637+
def c_change_callback(arg, sync_change_array_ptr):
638+
sync_change_array = ctypes.cast(sync_change_array_ptr, ctypes.POINTER(OBX_sync_change_array)).contents
639+
changes: list[SyncChange] = []
640+
for i in range(sync_change_array.count):
641+
c_sync_change: c.OBX_sync_change = sync_change_array.list[i]
642+
puts = []
643+
if c_sync_change.puts:
644+
c_puts_id_array: c.OBX_id_array = ctypes.cast(c_sync_change.puts, c.OBX_id_array_p).contents
645+
puts = list(
646+
ctypes.cast(c_puts_id_array.ids, ctypes.POINTER(c.obx_id * c_puts_id_array.count)).contents)
647+
removals = []
648+
if c_sync_change.removals:
649+
c_removals_id_array: c.OBX_id_array = ctypes.cast(c_sync_change.removals, c.OBX_id_array_p).contents
650+
removals = list(
651+
ctypes.cast(c_removals_id_array.ids,
652+
ctypes.POINTER(c.obx_id * c_removals_id_array.count)).contents)
653+
changes.append(SyncChange(
654+
entity_id=c_sync_change.entity_id,
655+
puts=puts,
656+
removals=removals
657+
))
658+
change_listener.on_change(changes)
659+
660+
self.__c_change_listener = c.OBX_sync_listener_change(c_change_callback)
661+
c.obx_sync_listener_change(
662+
self.__c_sync_client_ptr,
663+
self.__c_change_listener,
664+
None
665+
)
666+
613667
def wait_for_logged_in_state(self, timeout_millis: int):
614668
"""Waits for the sync client to reach the logged-in state.
615669

0 commit comments

Comments
 (0)