From 9c9f200c421f751d20c295a6cdba268b65f985ef Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Fri, 12 Sep 2025 07:23:19 +0000 Subject: [PATCH 1/4] [IMP] orm: iter_browse accept generator or query as ids This allows the caller to be memory efficient on huge numbers of ids, allowing for even more millions of records to be browsed. --- src/util/orm.py | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/src/util/orm.py b/src/util/orm.py index f43213a3d..bf1c77d26 100644 --- a/src/util/orm.py +++ b/src/util/orm.py @@ -11,6 +11,7 @@ import logging import re +import uuid from contextlib import contextmanager from functools import wraps from itertools import chain @@ -374,7 +375,8 @@ class iter_browse(object): :param model: the model to iterate :type model: :class:`odoo.model.Model` - :param list(int) ids: list of IDs of the records to iterate + :param iterable(int) or SQLStr ids: iterable of IDs of the records to iterate, or a SQL query + that can produce the IDs :param int chunk_size: number of records to load in each iteration chunk, `200` by default :param logger: logger used to report the progress, by default @@ -387,14 +389,14 @@ class iter_browse(object): See also :func:`~odoo.upgrade.util.orm.env` """ - __slots__ = ("_chunk_size", "_cr_uid", "_it", "_logger", "_model", "_patch", "_size", "_strategy") + __slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_size", "_strategy") def __init__(self, model, *args, **kw): assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,) self._model = model self._cr_uid = args[:-1] - ids = args[-1] - self._size = len(ids) + self._ids = args[-1] + self._size = kw.pop("size", None) self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument self._logger = kw.pop("logger", _logger) self._strategy = kw.pop("strategy", "flush") @@ -402,8 +404,38 @@ def __init__(self, model, *args, **kw): if kw: raise TypeError("Unknown arguments: %s" % ", ".join(kw)) + if isinstance(self._ids, SQLStr): + self._ids_query() + + if not self._size: + try: + self._size = len(self._ids) + except TypeError: + raise ValueError("When passing ids as a generator, the size kwarg is mandatory") self._patch = None - self._it = chunks(ids, self._chunk_size, fmt=self._browse) + self._it = chunks(self._ids, self._chunk_size, fmt=self._browse) + + def _ids_query(self): + cr = self._model.env.cr + tmp_tbl = "_upgrade_ib_{}".format(uuid.uuid4().hex) + cr.execute( + format_query( + cr, "CREATE UNLOGGED TABLE {}(id) AS (WITH query AS ({}) SELECT * FROM query)", tmp_tbl, self._ids + ) + ) + self._size = cr.rowcount + cr.execute( + format_query(cr, "ALTER TABLE {} ADD CONSTRAINT {} PRIMARY KEY (id)", tmp_tbl, "pk_{}_id".format(tmp_tbl)) + ) + + def get_ids(): + with named_cursor(cr, itersize=self._chunk_size) as ncr: + ncr.execute(format_query(cr, "SELECT id FROM {} ORDER BY id", tmp_tbl)) + for (id_,) in ncr: + yield id_ + cr.execute(format_query(cr, "DROP TABLE IF EXISTS {}", tmp_tbl)) + + self._ids = get_ids() def _browse(self, ids): next(self._end(), None) From 4a56a321e171e508e73318d384a29f5ded9d3a38 Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Fri, 12 Sep 2025 07:31:23 +0000 Subject: [PATCH 2/4] [IMP] orm: add optional parallelism to iter_browse.__attr__() In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts or it is clear that updates will be disjunct, such method calls can be done in parallel. --- src/util/orm.py | 111 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 98 insertions(+), 13 deletions(-) diff --git a/src/util/orm.py b/src/util/orm.py index bf1c77d26..f5b2b69fc 100644 --- a/src/util/orm.py +++ b/src/util/orm.py @@ -9,14 +9,23 @@ on this module work along the ORM of *all* supported versions. """ +import collections import logging +import multiprocessing +import os import re +import sys import uuid from contextlib import contextmanager from functools import wraps -from itertools import chain +from itertools import chain, repeat from textwrap import dedent +try: + from concurrent.futures import ProcessPoolExecutor +except ImportError: + ProcessPoolExecutor = None + try: from unittest.mock import patch except ImportError: @@ -28,9 +37,9 @@ except ImportError: from odoo import SUPERUSER_ID from odoo import fields as ofields - from odoo import modules, release + from odoo import modules, release, sql_db except ImportError: - from openerp import SUPERUSER_ID, modules, release + from openerp import SUPERUSER_ID, modules, release, sql_db try: from openerp import fields as ofields @@ -42,8 +51,8 @@ from .const import BIG_TABLE_THRESHOLD from .exceptions import MigrationError from .helpers import table_of_model -from .misc import chunks, log_progress, version_between, version_gte -from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor +from .misc import chunks, log_progress, str2bool, version_between, version_gte +from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor # python3 shims try: @@ -53,6 +62,10 @@ _logger = logging.getLogger(__name__) +UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0")) +# FIXME: for CI! Remove before merge +UPG_PARALLEL_ITER_BROWSE = True + def env(cr): """ @@ -342,6 +355,21 @@ def get_ids(): cr.execute("DROP TABLE IF EXISTS _upgrade_rf") +def _mp_iter_browse_cb(ids_or_values, params): + me = _mp_iter_browse_cb + # init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it + if not hasattr(me, "env"): + sql_db._Pool = None # children cannot borrow from copies of the same pool, it will cause protocol error + me.env = env(sql_db.db_connect(params["dbname"]).cursor()) + me.env.clear() + # process + if params["mode"] == "browse": + getattr( + me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"] + )(*params["args"], **params["kwargs"]) + me.env.cr.commit() + + class iter_browse(object): """ Iterate over recordsets. @@ -389,7 +417,18 @@ class iter_browse(object): See also :func:`~odoo.upgrade.util.orm.env` """ - __slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_size", "_strategy") + __slots__ = ( + "_chunk_size", + "_cr_uid", + "_ids", + "_it", + "_logger", + "_model", + "_patch", + "_size", + "_strategy", + "_superchunk_size", + ) def __init__(self, model, *args, **kw): assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,) @@ -398,9 +437,30 @@ def __init__(self, model, *args, **kw): self._ids = args[-1] self._size = kw.pop("size", None) self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument + self._superchunk_size = self._chunk_size self._logger = kw.pop("logger", _logger) self._strategy = kw.pop("strategy", "flush") - assert self._strategy in {"flush", "commit"} + assert self._strategy in {"flush", "commit", "multiprocessing"} + if self._strategy == "multiprocessing": + if not ProcessPoolExecutor: + raise ValueError("multiprocessing strategy can not be used in scripts run by python2") + if UPG_PARALLEL_ITER_BROWSE: + self._superchunk_size = min(get_max_workers() * 10 * self._chunk_size, 1000000) + else: + self._strategy = "commit" # downgrade + if self._size > 100000: + _logger.warning( + "Browsing %d %s, which may take a long time. " + "This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. " + "If you do, be sure to examine the results carefully.", + self._size, + self._model._name, + ) + else: + _logger.info( + "Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. " + "Downgrading strategy to commit.", + ) if kw: raise TypeError("Unknown arguments: %s" % ", ".join(kw)) @@ -429,7 +489,7 @@ def _ids_query(self): ) def get_ids(): - with named_cursor(cr, itersize=self._chunk_size) as ncr: + with named_cursor(cr, itersize=self._superchunk_size) as ncr: ncr.execute(format_query(cr, "SELECT id FROM {} ORDER BY id", tmp_tbl)) for (id_,) in ncr: yield id_ @@ -447,7 +507,7 @@ def _browse(self, ids): return self._model.browse(*args) def _end(self): - if self._strategy == "commit": + if self._strategy in ["commit", "multiprocessing"]: self._model.env.cr.commit() else: flush(self._model) @@ -474,15 +534,40 @@ def __getattr__(self, attr): if not callable(getattr(self._model, attr)): raise TypeError("The attribute %r is not callable" % attr) - it = self._it + it = chunks(self._ids, self._superchunk_size, fmt=self._browse) if self._logger: - sz = (self._size + self._chunk_size - 1) // self._chunk_size - qualifier = "%s[:%d]" % (self._model._name, self._chunk_size) + sz = (self._size + self._superchunk_size - 1) // self._superchunk_size + qualifier = "%s[:%d]" % (self._model._name, self._superchunk_size) it = log_progress(it, self._logger, qualifier=qualifier, size=sz) def caller(*args, **kwargs): args = self._cr_uid + args - return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())] + if self._strategy != "multiprocessing": + return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())] + params = { + "dbname": self._model.env.cr.dbname, + "model_name": self._model._name, + # convert to dict for pickle. Will still break if any value in the context is not pickleable + "context": dict(self._model.env.context), + "attr_name": attr, + "args": args, + "kwargs": kwargs, + "mode": "browse", + } + self._model.env.cr.commit() + extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {} + with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor: + for chunk in it: + collections.deque( + executor.map( + _mp_iter_browse_cb, chunks(chunk._ids, self._chunk_size, fmt=tuple), repeat(params) + ), + maxlen=0, + ) + next(self._end(), None) + # do not return results in // mode, we expect it to be used for huge numbers of + # records and thus would risk MemoryError, also we cannot know if what attr returns is pickleable + return None self._it = None return caller From a40dde7c8e46bd454d607cb8c87efc06765c197e Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Tue, 30 Sep 2025 07:43:06 +0000 Subject: [PATCH 3/4] [IMP] orm: add optional parallelism to iter_browse.create() Like the same support added to `__attr__` in the parent commit, this can only be used by callers when it is known that database modifications will be distinct, not causing concurrency issues or side-effects on the results. `create` returns an `iter_browse` object for the caller to browse created records. With the multiprocessing strategy, we make the following changes to it: - To support vast amounts of created records in multiprocessing strategy, we process values in a generator and initialize the returned `iter_browse` object with it. As this requires the caller of `create` to always consume/iterate the result (otherwise records will not be created), it is not applied to the other strategies as it would break existing API. - make __iter__ yield chunks if strategy is multiprocessing. This way, a caller can process chunks of freshly created records `for records in util.iter_browse(strategy="multiprocessing").create(SQLStr)` and since everything from input to output is a generator, will be perfectly memory efficient. - do not pass the logger to the returned `iter_browse` object from `create`, if the strategy is multiprocessing, because it will lead to interleaved logging from the input generator and this one when the caller iterates it. --- src/util/orm.py | 78 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 18 deletions(-) diff --git a/src/util/orm.py b/src/util/orm.py index f5b2b69fc..9a40d0139 100644 --- a/src/util/orm.py +++ b/src/util/orm.py @@ -367,7 +367,12 @@ def _mp_iter_browse_cb(ids_or_values, params): getattr( me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"] )(*params["args"], **params["kwargs"]) + if params["mode"] == "create": + new_ids = me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values).ids me.env.cr.commit() + if params["mode"] == "create": + return new_ids + return None class iter_browse(object): @@ -522,8 +527,12 @@ def __iter__(self): raise RuntimeError("%r ran twice" % (self,)) it = chain.from_iterable(self._it) + sz = self._size + if self._strategy == "multiprocessing": + it = self._it + sz = (self._size + self._chunk_size - 1) // self._chunk_size if self._logger: - it = log_progress(it, self._logger, qualifier=self._model._name, size=self._size) + it = log_progress(it, self._logger, qualifier=self._model._name, size=sz) self._it = None return chain(it, self._end()) @@ -593,31 +602,64 @@ def create(self, values, **kw): if self._size: raise ValueError("`create` can only called on empty `browse_record` objects.") - ids = [] + if self._strategy == "multiprocessing" and not multi: + raise ValueError("The multiprocessing strategy only supports the multi version of `create`") + size = len(values) - it = chunks(values, self._chunk_size, fmt=list) + chunk_size = self._superchunk_size if self._strategy == "multiprocessing" else self._chunk_size + it = chunks(values, chunk_size, fmt=list) if self._logger: - sz = (size + self._chunk_size - 1) // self._chunk_size - qualifier = "env[%r].create([:%d])" % (self._model._name, self._chunk_size) + sz = (size + chunk_size - 1) // chunk_size + qualifier = "env[%r].create([:%d])" % (self._model._name, chunk_size) it = log_progress(it, self._logger, qualifier=qualifier, size=sz) - self._patch = no_selection_cache_validation() - for sub_values in it: + def mp_create(): + params = { + "dbname": self._model.env.cr.dbname, + "model_name": self._model._name, + # convert to dict for pickle. Will still break if any value in the context is not pickleable + "context": dict(self._model.env.context), + "mode": "create", + } + self._model.env.cr.commit() self._patch.start() + extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {} + with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor: + for sub_values in it: + for task_result in executor.map( + _mp_iter_browse_cb, chunks(sub_values, self._chunk_size, fmt=tuple), repeat(params) + ): + self._model.env.cr.commit() # make task_result visible on main cursor before yielding ids + for new_id in task_result: + yield new_id + next(self._end(), None) - if multi: - ids += self._model.create(sub_values).ids - elif not self._cr_uid: - ids += [self._model.create(sub_value).id for sub_value in sub_values] - else: - # old API, `create` directly return the id - ids += [self._model.create(*(self._cr_uid + (sub_value,))) for sub_value in sub_values] + self._patch = no_selection_cache_validation() + if self._strategy == "multiprocessing": + ids = mp_create() + else: + ids = [] + for sub_values in it: + self._patch.start() + + if multi: + ids += self._model.create(sub_values).ids + elif not self._cr_uid: + ids += [self._model.create(sub_value).id for sub_value in sub_values] + else: + # old API, `create` directly return the id + ids += [self._model.create(*(self._cr_uid + (sub_value,))) for sub_value in sub_values] + + next(self._end(), None) - next(self._end(), None) args = self._cr_uid + (ids,) - return iter_browse( - self._model, *args, chunk_size=self._chunk_size, logger=self._logger, strategy=self._strategy - ) + kwargs = { + "size": size, + "chunk_size": self._chunk_size, + "logger": None if self._strategy == "multiprocessing" else self._logger, + "strategy": self._strategy, + } + return iter_browse(self._model, *args, **kwargs) @contextmanager From 060b1a38e4b27bd625c8380009d3c874325645eb Mon Sep 17 00:00:00 2001 From: "Carsten Wolff (cawo)" Date: Tue, 30 Sep 2025 10:14:28 +0000 Subject: [PATCH 4/4] [IMP] orm: iter_browse.create() accept generator or query as values Done to be able to create millions of records memory-efficiently. --- src/util/orm.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/util/orm.py b/src/util/orm.py index 9a40d0139..4a8872a2e 100644 --- a/src/util/orm.py +++ b/src/util/orm.py @@ -502,6 +502,19 @@ def get_ids(): self._ids = get_ids() + def _values_query(self, query): + cr = self._model.env.cr + cr.execute(format_query(cr, "WITH query AS ({}) SELECT count(*) FROM query", query)) + size = cr.fetchone()[0] + + def get_values(): + with named_cursor(cr, itersize=self._chunk_size) as ncr: + ncr.execute(query) + for row in ncr.iterdict(): + yield row + + return size, get_values() + def _browse(self, ids): next(self._end(), None) args = self._cr_uid + (list(ids),) @@ -593,6 +606,7 @@ def create(self, values, **kw): `True` from Odoo 12 and above """ multi = kw.pop("multi", version_gte("saas~11.5")) + size = kw.pop("size", None) if kw: raise TypeError("Unknown arguments: %s" % ", ".join(kw)) @@ -605,7 +619,15 @@ def create(self, values, **kw): if self._strategy == "multiprocessing" and not multi: raise ValueError("The multiprocessing strategy only supports the multi version of `create`") - size = len(values) + if isinstance(values, SQLStr): + size, values = self._values_query(values) + + if size is None: + try: + size = len(values) + except TypeError: + raise ValueError("When passing values as a generator, the size kwarg is mandatory") + chunk_size = self._superchunk_size if self._strategy == "multiprocessing" else self._chunk_size it = chunks(values, chunk_size, fmt=list) if self._logger: