Skip to content

Commit 9abb7d6

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.__attr__()
In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts or it is clear that updates will be disjunct, such method calls can be done in parallel.
1 parent ad0d9e5 commit 9abb7d6

File tree

1 file changed

+87
-12
lines changed

1 file changed

+87
-12
lines changed

src/util/orm.py

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,22 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import multiprocessing
15+
import os
1316
import re
17+
import sys
1418
from contextlib import contextmanager
1519
from functools import wraps
16-
from itertools import chain
20+
from itertools import chain, repeat
1721
from textwrap import dedent
1822

23+
try:
24+
from concurrent.futures import ProcessPoolExecutor
25+
except ImportError:
26+
ProcessPoolExecutor = None
27+
1928
try:
2029
from unittest.mock import patch
2130
except ImportError:
@@ -27,9 +36,9 @@
2736
except ImportError:
2837
from odoo import SUPERUSER_ID
2938
from odoo import fields as ofields
30-
from odoo import modules, release
39+
from odoo import modules, release, sql_db
3140
except ImportError:
32-
from openerp import SUPERUSER_ID, modules, release
41+
from openerp import SUPERUSER_ID, modules, release, sql_db
3342

3443
try:
3544
from openerp import fields as ofields
@@ -41,8 +50,8 @@
4150
from .const import BIG_TABLE_THRESHOLD
4251
from .exceptions import MigrationError
4352
from .helpers import table_of_model
44-
from .misc import chunks, log_progress, version_between, version_gte
45-
from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor, query_ids
53+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
54+
from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor, query_ids
4655

4756
# python3 shims
4857
try:
@@ -52,6 +61,10 @@
5261

5362
_logger = logging.getLogger(__name__)
5463

64+
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
65+
# FIXME: for CI! Remove before merge
66+
UPG_PARALLEL_ITER_BROWSE = True
67+
5568

5669
def env(cr):
5770
"""
@@ -329,6 +342,21 @@ def recompute_fields(cr, model, fields, ids=None, logger=_logger, chunk_size=256
329342
invalidate(records)
330343

331344

345+
def _mp_iter_browse_cb(ids_or_values, params):
346+
me = _mp_iter_browse_cb
347+
# init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
348+
if not hasattr(me, "env"):
349+
sql_db._Pool = None # children cannot borrow from copies of the same pool, it will cause protocol error
350+
me.env = env(sql_db.db_connect(params["dbname"]).cursor())
351+
me.env.clear()
352+
# process
353+
if params["mode"] == "browse":
354+
getattr(
355+
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
356+
)(*params["args"], **params["kwargs"])
357+
me.env.cr.commit()
358+
359+
332360
class iter_browse(object):
333361
"""
334362
Iterate over recordsets.
@@ -391,6 +419,7 @@ class iter_browse(object):
391419
"_size",
392420
"_strategy",
393421
"_yield_chunks",
422+
"_superchunk_size",
394423
)
395424

396425
def __init__(self, model, *args, **kw):
@@ -402,17 +431,38 @@ def __init__(self, model, *args, **kw):
402431
self._query = kw.pop("query", None)
403432
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
404433
self._yield_chunks = kw.pop("yield_chunks", False)
434+
self._superchunk_size = self._chunk_size
405435
self._logger = kw.pop("logger", _logger)
406436
self._strategy = kw.pop("strategy", "flush")
407-
assert self._strategy in {"flush", "commit"}
437+
assert self._strategy in {"flush", "commit", "multiprocessing"}
438+
if self._strategy == "multiprocessing":
439+
if not ProcessPoolExecutor:
440+
raise ValueError("multiprocessing strategy can not be used in scripts run by python2")
441+
if UPG_PARALLEL_ITER_BROWSE:
442+
self._superchunk_size = min(get_max_workers() * 10 * self._chunk_size, 1000000)
443+
else:
444+
self._strategy = "commit" # downgrade
445+
if self._size > 100000:
446+
_logger.warning(
447+
"Browsing %d %s, which may take a long time. "
448+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
449+
"If you do, be sure to examine the results carefully.",
450+
self._size,
451+
self._model._name,
452+
)
453+
else:
454+
_logger.info(
455+
"Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. "
456+
"Downgrading strategy to commit.",
457+
)
408458
if kw:
409459
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
410460

411461
if not (self._ids is None) ^ (self._query is None):
412462
raise TypeError("Must be initialized using exactly one of `ids` or `query`")
413463

414464
if self._query:
415-
self._ids = query_ids(self._model.env.cr, self._query, itersize=self._chunk_size)
465+
self._ids = query_ids(self._model.env.cr, self._query, itersize=self._superchunk_size)
416466

417467
if not self._size:
418468
try:
@@ -445,7 +495,7 @@ def _browse(self, ids):
445495
return self._model.browse(*args)
446496

447497
def _end(self):
448-
if self._strategy == "commit":
498+
if self._strategy in ["commit", "multiprocessing"]:
449499
self._model.env.cr.commit()
450500
else:
451501
flush(self._model)
@@ -473,15 +523,40 @@ def __getattr__(self, attr):
473523
if not callable(getattr(self._model, attr)):
474524
raise TypeError("The attribute %r is not callable" % attr)
475525

476-
it = self._it
526+
it = chunks(self._ids, self._superchunk_size, fmt=self._browse)
477527
if self._logger:
478-
sz = (self._size + self._chunk_size - 1) // self._chunk_size
479-
qualifier = "%s[:%d]" % (self._model._name, self._chunk_size)
528+
sz = (self._size + self._superchunk_size - 1) // self._superchunk_size
529+
qualifier = "%s[:%d]" % (self._model._name, self._superchunk_size)
480530
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
481531

482532
def caller(*args, **kwargs):
483533
args = self._cr_uid + args
484-
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
534+
if self._strategy != "multiprocessing":
535+
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
536+
params = {
537+
"dbname": self._model.env.cr.dbname,
538+
"model_name": self._model._name,
539+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
540+
"context": dict(self._model.env.context),
541+
"attr_name": attr,
542+
"args": args,
543+
"kwargs": kwargs,
544+
"mode": "browse",
545+
}
546+
self._model.env.cr.commit()
547+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
548+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
549+
for chunk in it:
550+
collections.deque(
551+
executor.map(
552+
_mp_iter_browse_cb, chunks(chunk._ids, self._chunk_size, fmt=tuple), repeat(params)
553+
),
554+
maxlen=0,
555+
)
556+
next(self._end(), None)
557+
# do not return results in // mode, we expect it to be used for huge numbers of
558+
# records and thus would risk MemoryError, also we cannot know if what attr returns is pickleable
559+
return None
485560

486561
self._it = None
487562
return caller

0 commit comments

Comments
 (0)