Skip to content

Commit c881807

Browse files
committed
Fix: sqlite3.OperationalError: database is locked
Multiprocessing causes locking error when write and commits from multiple threads are interleaved. Make sure that commits are performed under the same Lock() session.
1 parent 803d759 commit c881807

File tree

2 files changed

+53
-46
lines changed

2 files changed

+53
-46
lines changed

opcua/common/sqlite3_backend.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,45 @@ def execute_read(self, dbCmd = None, params = (), CB = None):
3939
c = self._get_conn().cursor()
4040
for row in c.execute(dbCmd, params):
4141
CB(row)
42+
c.close()
4243

43-
def execute_write(self, dbCmd = None, params = ()):
44+
def execute_write(self, dbCmd = None, params = (), commit=True):
4445
with self._lock:
45-
c = self._get_conn().cursor()
46-
c.execute(dbCmd, params)
47-
48-
def commit(self):
46+
conn = self._get_conn()
47+
if dbCmd is not None:
48+
c = conn.cursor()
49+
c.execute(dbCmd, params)
50+
c.close()
51+
if bool(commit) is True:
52+
conn.commit()
53+
self._wal_throttled()
54+
55+
def wal_throttled_threadsafe(self):
4956
with self._lock:
50-
self._get_conn().commit()
5157
self._wal_throttled()
5258

53-
def wal_checkpoint(self):
59+
def wal_checkpoint_threadsafe(self):
5460
"""
5561
Store checkpoint: forces database modifications to be persistent.
5662
Automatically done when sqlite cache runs over the 1000 pages threshold.
5763
IMPORTANT: slow operation, manual syncs are only useful for sporadic
5864
transactions that you really want to survive a power loss.
5965
"""
60-
self._lastCheckP = time.time()
61-
c = self._get_conn().cursor()
62-
c.execute('PRAGMA wal_checkpoint')
63-
66+
with self._lock:
67+
self._wal_checkpoint()
68+
6469
# PRIVATE METHODS
6570
def _wal_throttled(self):
6671
# commits still require a wal_checkpoint to become persistent.
6772
if abs(time.time() - self._lastCheckP) < self.CHECKP_INTERVAL:
6873
return
69-
self.wal_checkpoint()
74+
self._wal_checkpoint()
75+
76+
def _wal_checkpoint(self):
77+
self._lastCheckP = time.time()
78+
c = self._get_conn().cursor()
79+
c.execute('PRAGMA wal_checkpoint')
80+
c.close()
7081

7182
def _db_connect(self):
7283
CID = SQLite3Backend._getCID()
@@ -101,9 +112,7 @@ def _db_connect_py2(self, CID):
101112
def _db_disconnect(self):
102113
# Commit, checkpoint.
103114
if self.readonly is False:
104-
with self._lock:
105-
self._get_conn().commit()
106-
self.wal_checkpoint()
115+
self.wal_checkpoint_threadsafe()
107116
# Close all connections to database.
108117
for CID in self._conn:
109118
self._conn[CID].close()

opcua/server/address_space_sqlite.py

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def __init__(self, aspace, ndata):
5757

5858
def __setitem__(self, attrId, attr):
5959
def onchange_cb():
60-
self.aspace._insert_attribute_threadsafe(self.nodeid, attrId, self[attrId])
60+
self.aspace._insert_attribute_threadsafe(self.nodeid, attrId, self[attrId], commit=True)
6161
mAttr = MonitoredAttribute(attr, onchange_cb)
6262
dict.__setitem__(self, attrId, mAttr)
6363
mAttr.onchange_cb()
@@ -74,7 +74,7 @@ def __init__(self, aspace, ndata):
7474

7575
def append(self, ref):
7676
list.append(self, ref)
77-
self._aspace._insert_reference_threadsafe(self.nodeid, ref)
77+
self._aspace._insert_reference_threadsafe(self.nodeid, ref, commit=True)
7878

7979
def remove(self, ref):
8080
raise NotImplementedError
@@ -172,7 +172,6 @@ def _setitem_backend(self, nodeid, ndata):
172172

173173
if ndata.attributes.aspace is self:
174174
self._write_nodedata(ndata)
175-
self.backend.commit()
176175

177176
@staticmethod
178177
def _nodeid_surjection(nodeid):
@@ -211,7 +210,6 @@ def dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX):
211210
"""
212211
with self._lock:
213212
self._dump(namespaceidx)
214-
self.backend.commit()
215213
print("Export to {:s} completed".format(str(self.backend)))
216214

217215
def _dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX):
@@ -224,16 +222,19 @@ def _dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX):
224222
assert(nodeid == ndata.nodeid)
225223
assert(isinstance(ndata, NodeData))
226224
if nodeid.NamespaceIndex == namespaceidx:
227-
self._write_nodedata(ndata)
225+
self._write_nodedata(ndata, commit=False)
228226
continue
229227
# inter-namespace references.
230228
for ref in ndata.references:
231229
if ref.NodeId.NamespaceIndex != namespaceidx:
232230
continue
233231
mapNodeId = AddressSpaceSQLite._nodeid_surjection(ndata.nodeid)
234-
self._insert_reference(mapNodeId, ref)
232+
self._insert_reference(mapNodeId, ref, commit=False)
235233

236-
# 3. Integrity checks.
234+
# 3. commit
235+
self.backend.execute_write(dbCmd=None, commit=True)
236+
237+
# 4. Integrity checks.
237238
for nodeid, ndata in self._cache.items():
238239
if nodeid.NamespaceIndex != namespaceidx:
239240
continue
@@ -242,22 +243,22 @@ def _dump(self, namespaceidx=AddressSpace.DEFAULT_USER_NAMESPACE_INDEX):
242243
AddressSpaceSQLite._cmp_nodedata(ndata, ndata2)
243244

244245
# Write NodeData to database
245-
def _write_nodedata(self, ndata):
246+
def _write_nodedata(self, ndata, commit=True):
246247
mapNodeId = AddressSpaceSQLite._nodeid_surjection(ndata.nodeid)
247-
self._write_attributes(mapNodeId, ndata)
248-
self._write_references(mapNodeId, ndata)
248+
self._write_attributes(mapNodeId, ndata, commit=commit)
249+
self._write_references(mapNodeId, ndata, commit=commit)
249250

250-
def _write_attributes(self, nodeid, ndata):
251+
def _write_attributes(self, nodeid, ndata, commit=True):
251252
assert(nodeid.NodeIdType == NodeIdType.Numeric)
252253
assert(isinstance(ndata.attributes, dict))
253254
for attrId, attr in ndata.attributes.items():
254-
AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr)
255+
AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr, commit=commit)
255256

256-
def _write_references(self, nodeid, ndata):
257+
def _write_references(self, nodeid, ndata, commit=True):
257258
assert(nodeid.NodeIdType == NodeIdType.Numeric)
258259
assert(isinstance(ndata.references, list))
259260
for ref in ndata.references:
260-
AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref)
261+
AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref, commit=commit)
261262

262263
# Read NodeData from database
263264
@staticmethod
@@ -309,22 +310,20 @@ def _create_attr_table(backend, table=ATTR_TABLE_NAME, drop=False):
309310
]
310311
if drop is True:
311312
dropCmd = 'DROP TABLE IF EXISTS "{tn}"'.format(tn=table)
312-
backend.execute_write(dropCmd)
313+
backend.execute_write(dropCmd, commit=True)
313314
cmd = 'CREATE TABLE IF NOT EXISTS "{tn}" ({c})'.format(tn=table, c=', '.join(ATTR_TABLE_COLS))
314-
backend.execute_write(cmd)
315+
backend.execute_write(cmd, commit=True)
315316

316-
def _insert_attribute_threadsafe(self, nodeid, attrId, attr, table=ATTR_TABLE_NAME):
317+
def _insert_attribute_threadsafe(self, nodeid, attrId, attr, table=ATTR_TABLE_NAME, commit=True):
317318
with self._lock:
318319
if nodeid == AddressSpaceSQLite.CUR_TIME_NODEID:
319-
pass # Prevents SD-card wear: don't write the time.
320+
# Prevent sd-card wear: don't write the time. Use as trigger for WAL checkpoints.
321+
self.backend.wal_throttled_threadsafe()
320322
else:
321-
AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr, table)
322-
# CurrentTime-node updates result in commits at COMMIT_INTERVAL sec.
323-
# Commits without previous actual transactions don't touch the file.
324-
self.backend.commit()
323+
AddressSpaceSQLite._insert_attribute(self.backend, nodeid, attrId, attr, table, commit=commit)
325324

326325
@staticmethod
327-
def _insert_attribute(backend, nodeid, attrId, attr, table=ATTR_TABLE_NAME):
326+
def _insert_attribute(backend, nodeid, attrId, attr, table=ATTR_TABLE_NAME, commit=True):
328327
assert(nodeid.NodeIdType == NodeIdType.Numeric)
329328
assert(isinstance(attrId, ua.AttributeIds))
330329
assert(isinstance(attr, AttributeValue))
@@ -363,7 +362,7 @@ def _insert_attribute(backend, nodeid, attrId, attr, table=ATTR_TABLE_NAME):
363362
sqlite3.Binary(ua.ua_binary.variant_to_binary(attr.value.Value)),
364363
str(nodeid)
365364
)
366-
backend.execute_write(cmd, params=params)
365+
backend.execute_write(cmd, params=params, commit=commit)
367366

368367
@staticmethod
369368
def _read_attribute_row(row):
@@ -402,17 +401,16 @@ def _create_refs_table(backend, table=REFS_TABLE_NAME, drop=False):
402401
]
403402
if drop is True:
404403
dropCmd = 'DROP TABLE IF EXISTS "{tn}"'.format(tn=table)
405-
backend.execute_write(dropCmd)
404+
backend.execute_write(dropCmd, commit=True)
406405
cmd = 'CREATE TABLE IF NOT EXISTS "{tn}" ({c})'.format(tn=table, c=', '.join(REFS_TABLE_COLS))
407-
backend.execute_write(cmd)
406+
backend.execute_write(cmd, commit=True)
408407

409-
def _insert_reference_threadsafe(self, nodeid, ref, table=REFS_TABLE_NAME):
408+
def _insert_reference_threadsafe(self, nodeid, ref, table=REFS_TABLE_NAME, commit=True):
410409
with self._lock:
411-
AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref, table)
412-
self.backend.commit()
410+
AddressSpaceSQLite._insert_reference(self.backend, nodeid, ref, table, commit=commit)
413411

414412
@staticmethod
415-
def _insert_reference(backend, nodeid, ref, table=REFS_TABLE_NAME):
413+
def _insert_reference(backend, nodeid, ref, table=REFS_TABLE_NAME, commit=True):
416414
# NumericNodeId is required for searching.
417415
assert(nodeid.NodeIdType == NodeIdType.Numeric)
418416
assert(isinstance(ref, ua.uaprotocol_auto.ReferenceDescription))
@@ -452,7 +450,7 @@ def _insert_reference(backend, nodeid, ref, table=REFS_TABLE_NAME):
452450
sqlite3.Binary(ua.ua_binary.nodeid_to_binary(ref.TypeDefinition)),
453451
str(nodeid)
454452
)
455-
backend.execute_write(cmd, params=params)
453+
backend.execute_write(cmd, params=params, commit=commit)
456454

457455
@staticmethod
458456
def _read_reference_row(row):

0 commit comments

Comments
 (0)