Skip to content

Commit 5519ad3

Browse files
Pirolscawo-odoo
andcommitted
[IMP] util/pg: serialize queries upon DeadlockDetected
As recently noted in odoo/upgrade#5753, it is possible that `parallel_execute` generates concurrency issues. For instance, if a `DELETE` query is run in `//` on a table with a self-referencing foreign key, based on the actual records in the db, the following error may be encountered: ``` Traceback (most recent call last): File "/home/odoo/src/odoo/15.0/odoo/service/server.py", line 1260, in preload_registries registry = Registry.new(dbname, update_module=update_module) File "/home/odoo/src/odoo/15.0/odoo/modules/registry.py", line 87, in new odoo.modules.load_modules(registry, force_demo, status, update_module) File "/home/odoo/src/odoo/15.0/odoo/modules/loading.py", line 415, in load_modules loaded_modules, processed_modules = load_module_graph( File "/home/odoo/src/odoo/15.0/odoo/modules/loading.py", line 174, in load_module_graph migrations.migrate_module(package, 'pre') File "/home/odoo/src/odoo/15.0/odoo/modules/migration.py", line 186, in migrate_module migrate(self.cr, installed_version) File "/tmp/tmpjgh5os94/migrations/base/0.0.0/pre-clean-assets.py", line 7, in migrate util.parallel_execute( File "/tmp/tmpjgh5os94/migrations/util/pg.py", line 103, in parallel_execute return sum( File "/tmp/tmpjgh5os94/migrations/util/misc.py", line 215, in log_progress for i, e in enumerate(it, 1): File "/usr/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator yield fs.pop().result() File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result return self.__get_result() File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result raise self._exception File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run result = self.fn(*self.args, **self.kwargs) File "/tmp/tmpjgh5os94/migrations/util/pg.py", line 95, in execute cr.execute(query) File "<decorator-gen-5>", line 2, in execute File "/home/odoo/src/odoo/15.0/odoo/sql_db.py", line 90, in check return f(self, *args, **kwargs) File "/home/odoo/src/odoo/15.0/odoo/sql_db.py", line 311, in execute res = self._obj.execute(query, params) psycopg2.errors.DeadlockDetected: deadlock detected DETAIL: Process 1418247 waits for ShareLock on transaction 21800495; blocked by process 1418237. Process 1418237 waits for ShareLock on transaction 21800496; blocked by process 1418247. HINT: See server log for query details. CONTEXT: while updating tuple (450,6) in relation "ir_attachment" SQL statement "UPDATE ONLY "public"."ir_attachment" SET "doc_next_name_id" = NULL WHERE $1 OPERATOR(pg_catalog.=) "doc_next_name_id"" ``` In the case of similar DeadlockDetected issues, the solution we have adopted has always been to run the query (or the exploded query) sequentially. This commit implements a mechanism to automatically collect queries that fail due to DeadlockDetected issues and rerun them sequentially with no further failure control. Some remarks: * Potentially other errorcodes for other concurrent errors may be added in the future. * `-2` was chosen as the error code, because `-1` is already used up by `cr.rowcount` when the number of affected rows can't be fetched (e.g. `CREATED INDEX` queries) Part-of: #49 Co-authored-by: Carsten Wolff (cawo) <[email protected]>
1 parent d7d029b commit 5519ad3

File tree

1 file changed

+30
-12
lines changed

1 file changed

+30
-12
lines changed

src/util/pg.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from multiprocessing import cpu_count
1515

1616
try:
17-
from concurrent.futures import ThreadPoolExecutor
17+
from concurrent.futures import ThreadPoolExecutor # noqa: I001
18+
import concurrent
1819
except ImportError:
1920
ThreadPoolExecutor = None
2021

@@ -29,7 +30,7 @@
2930
pass
3031

3132
import psycopg2
32-
from psycopg2 import sql
33+
from psycopg2 import errorcodes, sql
3334

3435
try:
3536
from odoo.sql_db import db_connect
@@ -117,17 +118,34 @@ def execute(query):
117118

118119
cr.commit()
119120

121+
CONCURRENCY_ERRORCODES = {errorcodes.DEADLOCK_DETECTED}
122+
failed_queries = []
123+
tot_cnt = 0
120124
with ThreadPoolExecutor(max_workers=max_workers) as executor:
121-
return sum(
122-
log_progress(
123-
executor.map(execute, queries),
124-
logger,
125-
qualifier="queries",
126-
size=len(queries),
127-
estimate=False,
128-
log_hundred_percent=True,
129-
)
130-
)
125+
future_queries = {executor.submit(execute, q): q for q in queries}
126+
for future in log_progress(
127+
concurrent.futures.as_completed(future_queries),
128+
logger,
129+
qualifier="queries",
130+
size=len(queries),
131+
estimate=False,
132+
log_hundred_percent=True,
133+
):
134+
try:
135+
tot_cnt += future.result()
136+
except psycopg2.OperationalError as exc:
137+
if exc.pgcode not in CONCURRENCY_ERRORCODES:
138+
raise
139+
140+
# to be retried without concurrency
141+
failed_queries.append(future_queries[future])
142+
143+
if failed_queries:
144+
logger.warning("Serialize queries that failed due to concurrency issues")
145+
tot_cnt += _parallel_execute_serial(cr, failed_queries, logger=logger)
146+
cr.commit()
147+
148+
return tot_cnt
131149

132150
else:
133151
_parallel_execute_threaded = _parallel_execute_serial

0 commit comments

Comments
 (0)