Skip to content

Commit e95c8d8

Browse files
author
David Erb
committed
adds cursor lock
1 parent 0a3fdab commit e95c8d8

File tree

1 file changed

+119
-104
lines changed

1 file changed

+119
-104
lines changed

src/dls_normsql/aiomysql.py

Lines changed: 119 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ def __init__(self, specification, database_definition_object):
7676

7777
self.__backup_restore_lock = asyncio.Lock()
7878

79+
# TODO: Remove cursor lock when adding connection pooling.
80+
self.__cursor_lock = asyncio.Lock()
81+
7982
# Last undo position.
8083
self.__last_restore = 0
8184

@@ -296,36 +299,43 @@ async def create_table(self, table):
296299
warnings.simplefilter("ignore")
297300
await self.execute("DROP TABLE IF EXISTS %s" % (table.name))
298301

299-
async with self.__connection.cursor() as cursor:
300-
301-
fields_sql = []
302-
indices_sql = []
303-
304-
for field_name in table.fields:
305-
field = table.fields[field_name]
306-
field_type = field["type"].upper()
307-
if field_type == "TEXT PRIMARY KEY":
308-
field_type = "VARCHAR(64) PRIMARY KEY"
309-
fields_sql.append("`%s` %s" % (field_name, field_type))
310-
if field.get("index", False):
311-
if field_type == "TEXT":
312-
index_length = "(128)"
313-
else:
314-
index_length = ""
315-
indices_sql.append(
316-
"CREATE INDEX `%s_%s` ON `%s`(`%s`%s)"
317-
% (table.name, field_name, table.name, field_name, index_length)
318-
)
302+
async with self.__cursor_lock:
303+
async with self.__connection.cursor() as cursor:
319304

320-
sql = "CREATE TABLE `%s`\n(%s)" % (table.name, ",\n ".join(fields_sql))
305+
fields_sql = []
306+
indices_sql = []
307+
308+
for field_name in table.fields:
309+
field = table.fields[field_name]
310+
field_type = field["type"].upper()
311+
if field_type == "TEXT PRIMARY KEY":
312+
field_type = "VARCHAR(64) PRIMARY KEY"
313+
fields_sql.append("`%s` %s" % (field_name, field_type))
314+
if field.get("index", False):
315+
if field_type == "TEXT":
316+
index_length = "(128)"
317+
else:
318+
index_length = ""
319+
indices_sql.append(
320+
"CREATE INDEX `%s_%s` ON `%s`(`%s`%s)"
321+
% (
322+
table.name,
323+
field_name,
324+
table.name,
325+
field_name,
326+
index_length,
327+
)
328+
)
321329

322-
logger.debug("\n%s\n%s" % (sql, "\n".join(indices_sql)))
330+
sql = "CREATE TABLE `%s`\n(%s)" % (table.name, ",\n ".join(fields_sql))
323331

324-
await cursor.execute(sql)
332+
logger.debug("\n%s\n%s" % (sql, "\n".join(indices_sql)))
325333

326-
for sql in indices_sql:
327334
await cursor.execute(sql)
328335

336+
for sql in indices_sql:
337+
await cursor.execute(sql)
338+
329339
# ----------------------------------------------------------------------------------------
330340
async def insert(
331341
self,
@@ -388,9 +398,10 @@ async def insert(
388398
message = "%s:\n%s\n%s" % (why, sql, values_rows)
389399

390400
try:
391-
async with self.__connection.cursor() as cursor:
392-
await cursor.executemany(sql, values_rows)
393-
logger.debug(message)
401+
async with self.__cursor_lock:
402+
async with self.__connection.cursor() as cursor:
403+
await cursor.executemany(sql, values_rows)
404+
logger.debug(message)
394405

395406
except (TypeError, aiomysql.OperationalError) as exception:
396407
raise RuntimeError(f"{exception} doing {message}")
@@ -437,26 +448,27 @@ async def update(
437448

438449
sql = self.__parameterize(sql, subs)
439450

440-
async with self.__connection.cursor() as cursor:
441-
try:
442-
await cursor.execute(sql, values_row)
443-
rowcount = cursor.rowcount
451+
async with self.__cursor_lock:
452+
async with self.__connection.cursor() as cursor:
453+
try:
454+
await cursor.execute(sql, values_row)
455+
rowcount = cursor.rowcount
444456

445-
if why is None:
446-
logger.debug(
447-
"%d rows from:\n%s\nvalues %s" % (rowcount, sql, values_row)
448-
)
449-
else:
450-
logger.debug(
451-
"%d rows from %s:\n%s\nvalues %s"
452-
% (rowcount, why, sql, values_row)
453-
)
454-
455-
except (TypeError, aiomysql.OperationalError):
456-
if why is None:
457-
raise RuntimeError(f"failed to execute {sql}")
458-
else:
459-
raise RuntimeError(f"failed to execute {why}: {sql}")
457+
if why is None:
458+
logger.debug(
459+
"%d rows from:\n%s\nvalues %s" % (rowcount, sql, values_row)
460+
)
461+
else:
462+
logger.debug(
463+
"%d rows from %s:\n%s\nvalues %s"
464+
% (rowcount, why, sql, values_row)
465+
)
466+
467+
except (TypeError, aiomysql.OperationalError):
468+
if why is None:
469+
raise RuntimeError(f"failed to execute {sql}")
470+
else:
471+
raise RuntimeError(f"failed to execute {why}: {sql}")
460472

461473
return rowcount
462474

@@ -472,40 +484,41 @@ async def execute(
472484
If subs is a list of lists, then these are presumed the values for executemany.
473485
"""
474486

475-
async with self.__connection.cursor() as cursor:
476-
try:
477-
sql = self.__parameterize(sql, subs)
478-
479-
# Subs is a list of lists?
480-
if (
481-
isinstance(subs, list)
482-
and len(subs) > 0
483-
and isinstance(subs[0], list)
484-
):
485-
logger.debug(f"inserting {len(subs)} of {len(subs[0])}")
486-
await cursor.executemany(sql, subs)
487-
else:
488-
await cursor.execute(sql, subs)
489-
490-
if why is None:
491-
if cursor.rowcount > 0:
492-
logger.debug(
493-
f"{cursor.rowcount} records affected by\n{sql}\nvalues {subs}"
494-
)
487+
async with self.__cursor_lock:
488+
async with self.__connection.cursor() as cursor:
489+
try:
490+
sql = self.__parameterize(sql, subs)
491+
492+
# Subs is a list of lists?
493+
if (
494+
isinstance(subs, list)
495+
and len(subs) > 0
496+
and isinstance(subs[0], list)
497+
):
498+
logger.debug(f"inserting {len(subs)} of {len(subs[0])}")
499+
await cursor.executemany(sql, subs)
495500
else:
496-
logger.debug(f"{sql}\nvalues {subs}")
497-
else:
498-
if cursor.rowcount > 0:
499-
logger.debug(
500-
f"{cursor.rowcount} records affected by {why}:\n{sql} values {subs}"
501-
)
501+
await cursor.execute(sql, subs)
502+
503+
if why is None:
504+
if cursor.rowcount > 0:
505+
logger.debug(
506+
f"{cursor.rowcount} records affected by\n{sql}\nvalues {subs}"
507+
)
508+
else:
509+
logger.debug(f"{sql}\nvalues {subs}")
502510
else:
503-
logger.debug(f"{why}: {sql}\nvalues {subs}")
504-
except (TypeError, aiomysql.OperationalError):
505-
if why is None:
506-
raise RuntimeError(f"failed to execute {sql}")
507-
else:
508-
raise RuntimeError(f"failed to execute {why}: {sql}")
511+
if cursor.rowcount > 0:
512+
logger.debug(
513+
f"{cursor.rowcount} records affected by {why}:\n{sql} values {subs}"
514+
)
515+
else:
516+
logger.debug(f"{why}: {sql}\nvalues {subs}")
517+
except (TypeError, aiomysql.OperationalError):
518+
if why is None:
519+
raise RuntimeError(f"failed to execute {sql}")
520+
else:
521+
raise RuntimeError(f"failed to execute {why}: {sql}")
509522

510523
# ----------------------------------------------------------------------------------------
511524
async def query(self, sql, subs=None, why=None):
@@ -515,32 +528,34 @@ async def query(self, sql, subs=None, why=None):
515528

516529
sql = self.__parameterize(sql, subs)
517530

518-
cursor = None
519-
async with self.__connection.cursor() as cursor:
520-
try:
521-
cursor = await self.__connection.cursor()
522-
await cursor.execute(sql, subs)
523-
rows = await cursor.fetchall()
524-
cols = []
525-
for col in cursor.description:
526-
cols.append(col[0])
527-
528-
if why is None:
529-
logger.debug("%d records from: %s" % (len(rows), sql))
530-
else:
531-
logger.debug("%d records from %s: %s" % (len(rows), why, sql))
532-
records = []
533-
for row in rows:
534-
record = OrderedDict()
535-
for index, col in enumerate(cols):
536-
record[col] = row[index]
537-
records.append(record)
538-
return records
539-
except (TypeError, aiomysql.OperationalError) as exception:
540-
if why is None:
541-
raise RuntimeError(explain(exception, f"executing {sql}"))
542-
else:
543-
raise RuntimeError(explain(exception, f"executing {why}: {sql}"))
531+
async with self.__cursor_lock:
532+
async with self.__connection.cursor() as cursor:
533+
try:
534+
cursor = await self.__connection.cursor()
535+
await cursor.execute(sql, subs)
536+
rows = await cursor.fetchall()
537+
cols = []
538+
for col in cursor.description:
539+
cols.append(col[0])
540+
541+
if why is None:
542+
logger.debug("%d records from: %s" % (len(rows), sql))
543+
else:
544+
logger.debug("%d records from %s: %s" % (len(rows), why, sql))
545+
records = []
546+
for row in rows:
547+
record = OrderedDict()
548+
for index, col in enumerate(cols):
549+
record[col] = row[index]
550+
records.append(record)
551+
return records
552+
except (TypeError, aiomysql.OperationalError) as exception:
553+
if why is None:
554+
raise RuntimeError(explain(exception, f"executing {sql}"))
555+
else:
556+
raise RuntimeError(
557+
explain(exception, f"executing {why}: {sql}")
558+
)
544559

545560
# ----------------------------------------------------------------------------------------
546561
async def backup(self):

0 commit comments

Comments
 (0)