Skip to content

Commit 2b18909

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.create()
Like the same support added to `__attr__` in the parent commit, this can only be used by callers when it is known that database modifications will be distinct, not causing concurrency issues or side-effects on the results. `create` returns an `iter_browse` object for the caller to browse created records. With the multiprocessing strategy, we make the following changes to it: - To support vast amounts of created records in multiprocessing strategy, we process values in a generator and initialize the returned `iter_browse` object with it. As this requires the caller of `create` to always consume/iterate the result (otherwise records will not be created), it is not applied to the other strategies as it would break existing API. - make __iter__ yield chunks if strategy is multiprocessing. This way, a caller can process chunks of freshly created records `for records in util.iter_browse(strategy="multiprocessing").create(SQLStr)` and since everything from input to output is a generator, will be perfectly memory efficient. - do not pass the logger to the returned `iter_browse` object from `create`, if the strategy is multiprocessing, because it will lead to interleaved logging from the input generator and this one when the caller iterates it.
1 parent 75855a6 commit 2b18909

File tree

1 file changed

+52
-1
lines changed

1 file changed

+52
-1
lines changed

src/util/orm.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,12 @@ def _mp_iter_browse_cb(ids_or_values, params):
354354
getattr(
355355
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
356356
)(*params["args"], **params["kwargs"])
357+
if params["mode"] == "create":
358+
new_ids = me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values).ids
357359
me.env.cr.commit()
360+
if params["mode"] == "create":
361+
return new_ids
362+
return None
358363

359364

360365
class iter_browse(object):
@@ -506,8 +511,12 @@ def __iter__(self):
506511
raise RuntimeError("%r ran twice" % (self,))
507512

508513
it = chain.from_iterable(self._it)
514+
sz = self._size
515+
if self._strategy == "multiprocessing":
516+
it = self._it
517+
sz = (self._size + self._chunk_size - 1) // self._chunk_size
509518
if self._logger:
510-
it = log_progress(it, self._logger, qualifier=self._model._name, size=self._size)
519+
it = log_progress(it, self._logger, qualifier=self._model._name, size=sz)
511520
self._it = None
512521
return chain(it, self._end())
513522

@@ -590,6 +599,12 @@ def create(self, values=None, query=None, **kw):
590599
except TypeError:
591600
raise ValueError("When passing a generator of values, the size kwarg is mandatory")
592601

602+
if self._strategy == "multiprocessing":
603+
return self._create_multiprocess(values, size, multi)
604+
605+
return self._create(values, size, multi)
606+
607+
def _create(self, values, size, multi):
593608
it = chunks(values, self._chunk_size, fmt=list)
594609
if self._logger:
595610
sz = (size + self._chunk_size - 1) // self._chunk_size
@@ -615,6 +630,42 @@ def create(self, values=None, query=None, **kw):
615630
self._model, *args, chunk_size=self._chunk_size, logger=self._logger, strategy=self._strategy
616631
)
617632

633+
def _create_multiprocess(self, values, size, multi):
634+
if not multi:
635+
raise ValueError("The multiprocessing strategy only supports the multi version of `create`")
636+
637+
it = chunks(values, self._superchunk_size, fmt=list)
638+
if self._logger:
639+
sz = (size + self._superchunk_size - 1) // self._superchunk_size
640+
qualifier = "env[%r].create([:%d])" % (self._model._name, self._superchunk_size)
641+
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
642+
643+
def iter_proc():
644+
params = {
645+
"dbname": self._model.env.cr.dbname,
646+
"model_name": self._model._name,
647+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
648+
"context": dict(self._model.env.context),
649+
"mode": "create",
650+
}
651+
self._model.env.cr.commit()
652+
self._patch.start()
653+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
654+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
655+
for sub_values in it:
656+
for task_result in executor.map(
657+
_mp_iter_browse_cb, chunks(sub_values, self._chunk_size, fmt=tuple), repeat(params)
658+
):
659+
self._model.env.cr.commit() # make task_result visible on main cursor before yielding ids
660+
for new_id in task_result:
661+
yield new_id
662+
next(self._end(), None)
663+
664+
self._patch = no_selection_cache_validation()
665+
args = self._cr_uid + (iter_proc(),)
666+
kwargs = {"size": size, "chunk_size": self._chunk_size, "logger": None, "strategy": self._strategy}
667+
return iter_browse(self._model, *args, **kwargs)
668+
618669

619670
@contextmanager
620671
def custom_module_field_as_manual(env, rollback=True, do_flush=False):

0 commit comments

Comments
 (0)