Skip to content

Commit a9edd55

Browse files
author
Shubham
committed
start implement sync client
1 parent 9b38d97 commit a9edd55

File tree

3 files changed

+315
-0
lines changed

3 files changed

+315
-0
lines changed

objectbox/c.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,123 @@ class OBX_query(ctypes.Structure):
251251

252252
OBX_query_p = ctypes.POINTER(OBX_query)
253253

254+
255+
# Sync types
256+
class OBX_sync(ctypes.Structure):
257+
pass
258+
259+
260+
OBX_sync_p = ctypes.POINTER(OBX_sync)
261+
262+
263+
class OBX_sync_server(ctypes.Structure):
264+
pass
265+
266+
267+
OBX_sync_server_p = ctypes.POINTER(OBX_sync_server)
268+
269+
270+
class OBXSyncCredentialsType(IntEnum):
271+
NONE = 1
272+
SHARED_SECRET = 2 # Deprecated, use SHARED_SECRET_SIPPED instead
273+
GOOGLE_AUTH = 3
274+
SHARED_SECRET_SIPPED = 4 # Uses shared secret to create a hashed credential
275+
OBX_ADMIN_USER = 5 # ObjectBox admin users (username/password)
276+
USER_PASSWORD = 6 # Generic credential type for admin users
277+
JWT_ID = 7 # JSON Web Token (JWT): ID token with user identity
278+
JWT_ACCESS = 8 # JSON Web Token (JWT): access token for resources
279+
JWT_REFRESH = 9 # JSON Web Token (JWT): refresh token
280+
JWT_CUSTOM = 10 # JSON Web Token (JWT): custom token type
281+
282+
283+
class OBXRequestUpdatesMode(IntEnum):
284+
MANUAL = 0 # No updates by default, must call obx_sync_updates_request() manually
285+
AUTO = 1 # Same as calling obx_sync_updates_request(sync, TRUE)
286+
AUTO_NO_PUSHES = 2 # Same as calling obx_sync_updates_request(sync, FALSE)
287+
288+
289+
class OBXSyncState(IntEnum):
290+
CREATED = 1
291+
STARTED = 2
292+
CONNECTED = 3
293+
LOGGED_IN = 4
294+
DISCONNECTED = 5
295+
STOPPED = 6
296+
DEAD = 7
297+
298+
299+
class OBXSyncCode(IntEnum):
300+
OK = 20
301+
REQ_REJECTED = 40
302+
CREDENTIALS_REJECTED = 43
303+
UNKNOWN = 50
304+
AUTH_UNREACHABLE = 53
305+
BAD_VERSION = 55
306+
CLIENT_ID_TAKEN = 61
307+
TX_VIOLATED_UNIQUE = 71
308+
309+
310+
class OBXSyncError(IntEnum):
311+
REJECT_TX_NO_PERMISSION = 1 # Sync client received rejection of transaction writes due to missing permissions
312+
313+
314+
class OBXSyncObjectType(IntEnum):
315+
FlatBuffers = 1
316+
String = 2
317+
Raw = 3
318+
319+
320+
class OBX_sync_change(ctypes.Structure):
321+
_fields_ = [
322+
('entity_id', obx_schema_id),
323+
('puts', ctypes.POINTER(OBX_id_array)),
324+
('removals', ctypes.POINTER(OBX_id_array)),
325+
]
326+
327+
328+
class OBX_sync_change_array(ctypes.Structure):
329+
_fields_ = [
330+
('list', ctypes.POINTER(OBX_sync_change)),
331+
('count', ctypes.c_size_t),
332+
]
333+
334+
335+
class OBX_sync_object(ctypes.Structure):
336+
_fields_ = [
337+
('type', ctypes.c_int), # OBXSyncObjectType
338+
('id', ctypes.c_uint64),
339+
('data', ctypes.c_void_p),
340+
('size', ctypes.c_size_t),
341+
]
342+
343+
344+
class OBX_sync_msg_objects(ctypes.Structure):
345+
_fields_ = [
346+
('topic', ctypes.c_void_p),
347+
('topic_size', ctypes.c_size_t),
348+
('objects', ctypes.POINTER(OBX_sync_object)),
349+
('count', ctypes.c_size_t),
350+
]
351+
352+
353+
class OBX_sync_msg_objects_builder(ctypes.Structure):
354+
pass
355+
356+
357+
OBX_sync_msg_objects_builder_p = ctypes.POINTER(OBX_sync_msg_objects_builder)
358+
359+
# Define callback types for sync listeners
360+
OBX_sync_listener_connect = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
361+
OBX_sync_listener_disconnect = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
362+
OBX_sync_listener_login = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
363+
OBX_sync_listener_login_failure = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int) # arg, OBXSyncCode
364+
OBX_sync_listener_complete = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
365+
OBX_sync_listener_error = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int) # arg, OBXSyncError
366+
OBX_sync_listener_change = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(OBX_sync_change_array))
367+
OBX_sync_listener_server_time = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int64)
368+
OBX_sync_listener_msg_objects = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(OBX_sync_msg_objects))
369+
370+
254371
# manually configure error methods, we can't use `fn()` defined below yet due to circular dependencies
255372
C.obx_last_error_message.restype = ctypes.c_char_p
256373
C.obx_last_error_code.restype = obx_err
@@ -310,6 +427,11 @@ def check_obx_err(code: obx_err, func, args) -> obx_err:
310427
raise create_db_error(code)
311428
return code
312429

430+
def check_obx_success(code: obx_err) -> bool:
431+
if code == DbErrorCode.OBX_NO_SUCCESS:
432+
return False
433+
check_obx_err(code, None, None)
434+
return True
313435

314436
def check_obx_qb_cond(qb_cond: obx_qb_cond, func, args) -> obx_qb_cond:
315437
""" Raises an exception if obx_qb_cond is not successful. """
@@ -1068,3 +1190,30 @@ def c_array_pointer(py_list: Union[List[Any], np.ndarray], c_type):
10681190

10691191
OBXBackupRestoreFlags_None = 0
10701192
OBXBackupRestoreFlags_OverwriteExistingData = 1
1193+
1194+
obx_sync = c_fn("obx_sync", obx_err, [OBX_store_p, ctypes.c_char_p])
1195+
obx_sync_urls = c_fn("obx_sync_urls", obx_err, [OBX_store_p, ctypes.POINTER(ctypes.c_char_p), ctypes.c_size_t])
1196+
1197+
1198+
obx_sync_credentials = c_fn_rc('obx_sync_credentials',
1199+
[OBX_sync_p, OBXSyncCredentialsType, ctypes.c_void_p, ctypes.c_size_t])
1200+
obx_sync_credentials_user_password = c_fn_rc('obx_sync_credentials_user_password',
1201+
[OBX_sync_p, OBXSyncCredentialsType, ctypes.c_char_p,
1202+
ctypes.c_char_p])
1203+
obx_sync_credentials_add = c_fn_rc('obx_sync_credentials_add',
1204+
[OBX_sync_p, OBXSyncCredentialsType, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_bool])
1205+
obx_sync_credentials_add_user_password = c_fn_rc('obx_sync_credentials_add_user_password',
1206+
[OBX_sync_p, OBXSyncCredentialsType, ctypes.c_char_p, ctypes.c_char_p,
1207+
ctypes.c_bool])
1208+
1209+
obx_sync_request_updates_mode = c_fn_rc('obx_sync_request_updates_mode', [OBX_sync_p, OBXRequestUpdatesMode])
1210+
1211+
obx_sync_start = c_fn_rc('obx_sync_start', [OBX_sync_p])
1212+
obx_sync_stop = c_fn_rc('obx_sync_stop', [OBX_sync_p])
1213+
1214+
obx_sync_trigger_reconnect = c_fn_rc('obx_sync_trigger_reconnect', [OBX_sync_p])
1215+
1216+
obx_sync_protocol_version = c_fn('obx_sync_protocol_version', ctypes.c_uint32, [])
1217+
obx_sync_protocol_version_server = c_fn('obx_sync_protocol_version_server', ctypes.c_uint32, [OBX_sync_p])
1218+
1219+
obx_sync_close = c_fn_rc('obx_sync_close', [OBX_sync_p])

objectbox/store.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,3 +285,6 @@ def remove_db_files(db_dir: str):
285285
Path to DB directory.
286286
"""
287287
c.obx_remove_db_files(c.c_str(db_dir))
288+
289+
def c_store(self):
290+
return self._c_store

objectbox/sync.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import ctypes
2+
import c as c
3+
from objectbox import Store
4+
from objectbox.c import c_array_pointer
5+
6+
7+
class SyncCredentials:
8+
9+
def __init__(self, credential_type: c.OBXSyncCredentialsType):
10+
self.type = credential_type
11+
12+
@staticmethod
13+
def none() -> 'SyncCredentials':
14+
return SyncCredentialsNone()
15+
16+
@staticmethod
17+
def shared_secret_string(secret: str) -> 'SyncCredentials':
18+
return SyncCredentialsSecret(c.OBXSyncCredentialsType.SHARED_SECRET_SIPPED, secret.encode('utf-8'))
19+
20+
@staticmethod
21+
def google_auth(secret: str) -> 'SyncCredentials':
22+
return SyncCredentialsSecret(c.OBXSyncCredentialsType.GOOGLE_AUTH, secret.encode('utf-8'))
23+
24+
@staticmethod
25+
def user_and_password(username: str, password: str) -> 'SyncCredentials':
26+
return SyncCredentialsUserPassword(c.OBXSyncCredentialsType.USER_PASSWORD, username, password)
27+
28+
@staticmethod
29+
def jwt_id_token(jwt_id_token: str) -> 'SyncCredentials':
30+
return SyncCredentialsSecret(c.OBXSyncCredentialsType.JWT_ID, jwt_id_token.encode('utf-8'))
31+
32+
@staticmethod
33+
def jwt_access_token(jwt_access_token: str) -> 'SyncCredentials':
34+
return SyncCredentialsSecret(c.OBXSyncCredentialsType.JWT_ACCESS, jwt_access_token.encode('utf-8'))
35+
36+
@staticmethod
37+
def jwt_refresh_token(jwt_refresh_token: str) -> 'SyncCredentials':
38+
return SyncCredentialsSecret(c.OBXSyncCredentialsType.JWT_REFRESH, jwt_refresh_token.encode('utf-8'))
39+
40+
@staticmethod
41+
def jwt_custom_token(jwt_custom_token: str) -> 'SyncCredentials':
42+
return SyncCredentialsSecret(c.OBXSyncCredentialsType.JWT_CUSTOM, jwt_custom_token.encode('utf-8'))
43+
44+
45+
class SyncCredentialsNone(SyncCredentials):
46+
def __init__(self):
47+
super().__init__(c.OBXSyncCredentialsType.NONE)
48+
49+
50+
class SyncCredentialsSecret(SyncCredentials):
51+
def __init__(self, credential_type: c.OBXSyncCredentialsType, secret: bytes):
52+
super().__init__(credential_type)
53+
self.secret = secret
54+
55+
56+
class SyncCredentialsUserPassword(SyncCredentials):
57+
def __init__(self, credential_type: c.OBXSyncCredentialsType, username: str, password: str):
58+
super().__init__(credential_type)
59+
self.username = username
60+
self.password = password
61+
62+
63+
class SyncState:
64+
UNKNOWN = 'unknown'
65+
CREATED = 'created'
66+
STARTED = 'started'
67+
CONNECTED = 'connected'
68+
LOGGED_IN = 'logged_in'
69+
DISCONNECTED = 'disconnected'
70+
STOPPED = 'stopped'
71+
DEAD = 'dead'
72+
73+
74+
class SyncRequestUpdatesMode:
75+
MANUAL = 'manual'
76+
AUTO = 'auto'
77+
AUTO_NO_PUSHES = 'auto_no_pushes'
78+
79+
80+
class SyncConnectionEvent:
81+
CONNECTED = 'connected'
82+
DISCONNECTED = 'disconnected'
83+
84+
85+
class SyncLoginEvent:
86+
LOGGED_IN = 'logged_in'
87+
CREDENTIALS_REJECTED = 'credentials_rejected'
88+
UNKNOWN_ERROR = 'unknown_error'
89+
90+
91+
class SyncChange:
92+
def __init__(self, entity_id: int, entity: type, puts: list[int], removals: list[int]):
93+
self.entity_id = entity_id
94+
self.entity = entity
95+
self.puts = puts
96+
self.removals = removals
97+
98+
99+
class SyncClient:
100+
101+
def __init__(self, store: Store, server_urls: list[str], credentials: list[SyncCredentials],
102+
filter_variables: dict[str, str] | None = None):
103+
if not server_urls:
104+
raise ValueError("Provide at least one server URL")
105+
106+
if not Sync.is_available():
107+
raise RuntimeError(
108+
'Sync is not available in the loaded ObjectBox runtime library. '
109+
'Please visit https://objectbox.io/sync/ for options.')
110+
111+
self.__store = store
112+
self.__server_urls = server_urls
113+
self.__credentials = credentials
114+
115+
self.__c_sync_client_ptr = c.obx_sync_urls(store.c_store(), c_array_pointer(server_urls, ctypes.c_char_p),
116+
len(server_urls))
117+
118+
def set_credentials(self, credentials: SyncCredentials):
119+
if isinstance(credentials, SyncCredentialsNone):
120+
c.obx_sync_credentials(self.__c_sync_client_ptr, credentials.type, None, 0)
121+
elif isinstance(credentials, SyncCredentialsUserPassword):
122+
c.obx_sync_credentials_user_password(self.__c_sync_client_ptr,
123+
credentials.type,
124+
credentials.username.encode('utf-8'),
125+
credentials.password.encode('utf-8'))
126+
elif isinstance(credentials, SyncCredentialsSecret):
127+
c.obx_sync_credentials(self.__c_sync_client_ptr, credentials.type,
128+
credentials.secret,
129+
len(credentials.secret))
130+
131+
def set_request_updates_mode(self, mode: SyncRequestUpdatesMode):
132+
if mode == SyncRequestUpdatesMode.MANUAL:
133+
c_mode = c.OBXRequestUpdatesMode.MANUAL
134+
elif mode == SyncRequestUpdatesMode.AUTO:
135+
c_mode = c.OBXRequestUpdatesMode.AUTO
136+
elif mode == SyncRequestUpdatesMode.AUTO_NO_PUSHES:
137+
c_mode = c.OBXRequestUpdatesMode.AUTO_NO_PUSHES
138+
else:
139+
raise ValueError(f"Invalid mode: {mode}")
140+
c.obx_sync_request_updates_mode(self.__c_sync_client_ptr, c_mode)
141+
142+
def start(self):
143+
c.obx_sync_start(self.__c_sync_client_ptr)
144+
145+
def stop(self):
146+
c.obx_sync_stop(self.__c_sync_client_ptr)
147+
148+
def trigger_reconnect(self) -> bool:
149+
return c.check_obx_success(c.obx_sync_trigger_reconnect(self.__c_sync_client_ptr))
150+
151+
@staticmethod
152+
def protocol_version() -> int:
153+
return c.obx_sync_protocol_version()
154+
155+
def protocol_server_version(self) -> int:
156+
return c.obx_sync_protocol_version_server(self.__c_sync_client_ptr)
157+
158+
def close(self):
159+
c.obx_sync_close(self.__c_sync_client_ptr)
160+
self.__c_sync_client_ptr = None
161+
162+
def is_closed(self) -> bool:
163+
return self.__c_sync_client_ptr is None

0 commit comments

Comments
 (0)