Commit dcc57d4

[IMP] orm: add optional parallelism to iter_browse.__attr__()

In some cases, e.g. when it is known that calling a certain method on the model will only trigger inserts, or when the updates are known to be disjoint, such method calls can be done in parallel.

1 parent cdfe851 commit dcc57d4

File tree

1 file changed

+99 -13 lines changed

src/util/orm.py

Lines changed: 99 additions & 13 deletions
@@ -9,13 +9,22 @@
 on this module work along the ORM of *all* supported versions.
 """
 
+import collections
 import logging
+import multiprocessing
+import os
 import re
+import sys
 from contextlib import contextmanager
 from functools import wraps
-from itertools import chain
+from itertools import chain, repeat
 from textwrap import dedent
 
+try:
+    from concurrent.futures import ProcessPoolExecutor
+except ImportError:
+    ProcessPoolExecutor = None
+
 try:
     from unittest.mock import patch
 except ImportError:
@@ -27,9 +36,9 @@
     except ImportError:
         from odoo import SUPERUSER_ID
     from odoo import fields as ofields
-    from odoo import modules, release
+    from odoo import modules, release, sql_db
 except ImportError:
-    from openerp import SUPERUSER_ID, modules, release
+    from openerp import SUPERUSER_ID, modules, release, sql_db
 
     try:
         from openerp import fields as ofields
@@ -41,8 +50,8 @@
 from .const import BIG_TABLE_THRESHOLD
 from .exceptions import MigrationError
 from .helpers import table_of_model
-from .misc import chunks, log_progress, version_between, version_gte
-from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor, query_ids
+from .misc import chunks, log_progress, str2bool, version_between, version_gte
+from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor, query_ids
 
 # python3 shims
 try:
@@ -52,6 +61,10 @@
 
 _logger = logging.getLogger(__name__)
 
+UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
+# FIXME: for CI! Remove before merge
+UPG_PARALLEL_ITER_BROWSE = True
+
 
 def env(cr):
     """
@@ -329,6 +342,21 @@ def recompute_fields(cr, model, fields, ids=None, logger=_logger, chunk_size=256
         invalidate(records)
 
 
+def _mp_iter_browse_cb(ids_or_values, params):
+    me = _mp_iter_browse_cb
+    # init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
+    if not hasattr(me, "env"):
+        sql_db._Pool = None  # children cannot borrow from copies of the same pool, it will cause protocol error
+        me.env = env(sql_db.db_connect(params["dbname"]).cursor())
+        me.env.clear()
+    # process
+    if params["mode"] == "browse":
+        getattr(
+            me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
+        )(*params["args"], **params["kwargs"])
+    me.env.cr.commit()
+
+
 class iter_browse(object):
     """
     Iterate over recordsets.
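
Note: _mp_iter_browse_cb lazily opens one cursor per worker process on first call and caches it as an attribute on the function object itself, because ProcessPoolExecutor only gained an initializer argument in Python 3.7 and this code must still run on 3.6. A self-contained sketch of that pattern (the names are illustrative, not from the commit):

    import multiprocessing
    import os
    from concurrent.futures import ProcessPoolExecutor

    def _worker(item):
        me = _worker
        # first call in this process: create the per-process resource once;
        # later calls in the same worker reuse it
        if not hasattr(me, "resource"):
            me.resource = "resource-of-pid-%d" % os.getpid()
        return me.resource, item

    if __name__ == "__main__":
        # fork context, as in the commit (POSIX only)
        ctx = multiprocessing.get_context("fork")
        with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
            for resource, item in executor.map(_worker, range(6)):
                print(resource, item)

Resetting sql_db._Pool before connecting serves the same purpose as the fresh resource above: a forked child must not reuse connections inherited from the parent's pool.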
@@ -377,7 +405,19 @@ class iter_browse(object):
     See also :func:`~odoo.upgrade.util.orm.env`
     """
 
-    __slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_query", "_size", "_strategy")
+    __slots__ = (
+        "_chunk_size",
+        "_cr_uid",
+        "_ids",
+        "_it",
+        "_logger",
+        "_model",
+        "_patch",
+        "_query",
+        "_size",
+        "_strategy",
+        "_superchunk_size",
+    )
 
     def __init__(self, model, *args, **kw):
         assert len(args) in [1, 3]  # either (cr, uid, ids) or (ids,)
@@ -387,17 +427,38 @@ def __init__(self, model, *args, **kw):
         self._size = kw.pop("size", None)
         self._query = kw.pop("query", None)
         self._chunk_size = kw.pop("chunk_size", 200)  # keyword-only argument
+        self._superchunk_size = self._chunk_size
         self._logger = kw.pop("logger", _logger)
         self._strategy = kw.pop("strategy", "flush")
-        assert self._strategy in {"flush", "commit"}
+        assert self._strategy in {"flush", "commit", "multiprocessing"}
+        if self._strategy == "multiprocessing":
+            if not ProcessPoolExecutor:
+                raise ValueError("multiprocessing strategy can not be used in scripts run by python2")
+            if UPG_PARALLEL_ITER_BROWSE:
+                self._superchunk_size = min(get_max_workers() * 10 * self._chunk_size, 1000000)
+            else:
+                self._strategy = "commit"  # downgrade
+                if self._size > 100000:
+                    _logger.warning(
+                        "Browsing %d %s, which may take a long time. "
+                        "This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
+                        "If you do, be sure to examine the results carefully.",
+                        self._size,
+                        self._model._name,
+                    )
+                else:
+                    _logger.info(
+                        "Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. "
+                        "Downgrading strategy to commit.",
+                    )
         if kw:
             raise TypeError("Unknown arguments: %s" % ", ".join(kw))
 
         if not (self._ids is None) ^ (self._query is None):
             raise TypeError("Must be initialized using exactly one of `ids` or `query`")
 
         if self._query:
-            self._ids = query_ids(self._model.env.cr, self._query, itersize=self._chunk_size)
+            self._ids = query_ids(self._model.env.cr, self._query, itersize=self._superchunk_size)
 
         if not self._size:
             try:
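
Note: for callers, the new strategy is one keyword argument away. A hypothetical upgrade-script usage, assuming util is odoo.upgrade.util and that writing a constant to disjoint sets of rows is safe to parallelize (model, domain, and values are placeholders):

    from odoo.upgrade import util

    def migrate(cr, version):
        env = util.env(cr)
        ids = env["res.partner"].search([("active", "=", False)]).ids
        # each worker touches a disjoint set of rows, so updates cannot conflict
        util.iter_browse(
            env["res.partner"], ids, chunk_size=200, strategy="multiprocessing"
        ).write({"active": True})

Unlike the flush and commit strategies, the multiprocessing path returns None instead of the per-chunk results, as the last hunk below explains.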
@@ -430,7 +491,7 @@ def _browse(self, ids):
         return self._model.browse(*args)
 
     def _end(self):
-        if self._strategy == "commit":
+        if self._strategy in ["commit", "multiprocessing"]:
             self._model.env.cr.commit()
         else:
             flush(self._model)
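
Note: _end() is consumed via chain(it, self._end()) in the sequential path and via next(self._end(), None) in the parallel one; both call sites require _end() to be an iterable that yields nothing, i.e. a generator whose only effect is to commit or flush exactly once after the last chunk. A minimal sketch of that finalizer-as-empty-generator idiom:

    from itertools import chain

    def _end():
        print("commit")  # stand-in for cr.commit() / flush()
        return
        yield  # never reached; only makes _end a generator

    # the finalizer body runs only after 1, 2, 3 have been consumed
    for item in chain([1, 2, 3], _end()):
        print("processing", item)

    # next(..., None) drives the same generator when there is no loop to chain it into
    next(_end(), None)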
@@ -457,15 +518,40 @@ def __getattr__(self, attr):
         if not callable(getattr(self._model, attr)):
             raise TypeError("The attribute %r is not callable" % attr)
 
-        it = self._it
+        it = chunks(self._ids, self._superchunk_size, fmt=self._browse)
         if self._logger:
-            sz = (self._size + self._chunk_size - 1) // self._chunk_size
-            qualifier = "%s[:%d]" % (self._model._name, self._chunk_size)
+            sz = (self._size + self._superchunk_size - 1) // self._superchunk_size
+            qualifier = "%s[:%d]" % (self._model._name, self._superchunk_size)
             it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
 
         def caller(*args, **kwargs):
             args = self._cr_uid + args
-            return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
+            if self._strategy != "multiprocessing":
+                return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
+            params = {
+                "dbname": self._model.env.cr.dbname,
+                "model_name": self._model._name,
+                # convert to dict for pickle. Will still break if any value in the context is not pickleable
+                "context": dict(self._model.env.context),
+                "attr_name": attr,
+                "args": args,
+                "kwargs": kwargs,
+                "mode": "browse",
+            }
+            self._model.env.cr.commit()
+            extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
+            with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
+                for chunk in it:
+                    collections.deque(
+                        executor.map(
+                            _mp_iter_browse_cb, chunks(chunk._ids, self._chunk_size, fmt=tuple), repeat(params)
+                        ),
+                        maxlen=0,
+                    )
+            next(self._end(), None)
+            # do not return results in // mode, we expect it to be used for huge numbers of
+            # records and thus would risk MemoryError, also we cannot know if what attr returns is pickleable
+            return None
 
         self._it = None
         return caller
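
Note: two details of caller() are worth unpacking. The ids are cut twice: first into superchunks, which are browsed and logged in the parent, then into chunk_size batches that are fanned out to the pool; and collections.deque(..., maxlen=0) is the standard way to drain executor.map for its side effects without accumulating results in memory. A self-contained sketch under those assumptions (work and the sizes are placeholders):

    import collections
    from concurrent.futures import ProcessPoolExecutor

    def work(batch):
        # stand-in for _mp_iter_browse_cb: side effects only, return value dropped
        return sum(batch)

    if __name__ == "__main__":
        ids = list(range(100))
        chunk_size = 5        # one task per worker call
        superchunk_size = 20  # e.g. get_max_workers() * 10 * chunk_size, capped at 1,000,000

        with ProcessPoolExecutor(max_workers=4) as executor:
            for start in range(0, len(ids), superchunk_size):
                superchunk = ids[start : start + superchunk_size]
                batches = (tuple(superchunk[i : i + chunk_size]) for i in range(0, len(superchunk), chunk_size))
                # exhaust the map lazily; maxlen=0 keeps nothing in memory
                collections.deque(executor.map(work, batches), maxlen=0)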
