Skip to content

Commit a0538d7

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.create()
Like the same support added to `__attr__` in the parent commit, this can only be used by callers when it is known that database modifications will be distinct, not causing concurrency issues or side-effects on the results. `create` returns an `iter_browse` object for the caller to browse created records. With the multiprocessing strategy, we make the following changes to it: - To support vast amounts of created records in multiprocessing strategy, we process values in a generator and initialize the returned `iter_browse` object with it. As this requires the caller of `create` to always consume/iterate the result (otherwise records will not be created), it is not applied to the other strategies as it would break existing API. - make __iter__ yield chunks if strategy is multiprocessing. This way, a caller can process chunks of freshly created records `for records in util.iter_browse(strategy="multiprocessing").create(SQLStr)` and since everything from input to output is a generator, will be perfectly memory efficient. - do not pass the logger to the returned `iter_browse` object from `create`, if the strategy is multiprocessing, because it will lead to interleaved logging from the input generator and this one when the caller iterates it.
1 parent ede78cd commit a0538d7

File tree

1 file changed

+53
-0
lines changed

1 file changed

+53
-0
lines changed

src/util/orm.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,12 @@ def _mp_iter_browse_cb(ids_or_values, params):
354354
getattr(
355355
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
356356
)(*params["args"], **params["kwargs"])
357+
if params["mode"] == "create":
358+
new_ids = me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values).ids
357359
me.env.cr.commit()
360+
if params["mode"] == "create":
361+
return new_ids
362+
return None
358363

359364

360365
class iter_browse(object):
@@ -595,6 +600,12 @@ def create(self, values=None, query=None, **kw):
595600
except TypeError:
596601
raise ValueError("When passing a generator of values, the size kwarg is mandatory")
597602

603+
if self._strategy == "multiprocessing":
604+
return self._create_multiprocessing(values, size, multi)
605+
606+
return self._create(values, size, multi)
607+
608+
def _create(self, values, size, multi):
598609
it = chunks(values, self._chunk_size, fmt=list)
599610
if self._logger:
600611
sz = (size + self._chunk_size - 1) // self._chunk_size
@@ -620,6 +631,48 @@ def create(self, values=None, query=None, **kw):
620631
self._model, *args, chunk_size=self._chunk_size, logger=self._logger, strategy=self._strategy
621632
)
622633

634+
def _create_multiprocessing(self, values, size, multi):
635+
if not multi:
636+
raise ValueError("The multiprocessing strategy only supports the multi version of `create`")
637+
638+
it = chunks(values, self._superchunk_size, fmt=list)
639+
if self._logger:
640+
sz = (size + self._superchunk_size - 1) // self._superchunk_size
641+
qualifier = "env[%r].create([:%d])" % (self._model._name, self._superchunk_size)
642+
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
643+
644+
def iter_proc():
645+
params = {
646+
"dbname": self._model.env.cr.dbname,
647+
"model_name": self._model._name,
648+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
649+
"context": dict(self._model.env.context),
650+
"mode": "create",
651+
}
652+
self._model.env.cr.commit()
653+
self._patch.start()
654+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
655+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
656+
for sub_values in it:
657+
for task_result in executor.map(
658+
_mp_iter_browse_cb, chunks(sub_values, self._chunk_size, fmt=tuple), repeat(params)
659+
):
660+
self._model.env.cr.commit() # make task_result visible on main cursor before yielding ids
661+
for new_id in task_result:
662+
yield new_id
663+
next(self._end(), None)
664+
665+
self._patch = no_selection_cache_validation()
666+
args = self._cr_uid + (iter_proc(),)
667+
kwargs = {
668+
"size": size,
669+
"chunk_size": self._chunk_size,
670+
"logger": None,
671+
"strategy": self._strategy,
672+
"yield_chunks": self._yield_chunks,
673+
}
674+
return iter_browse(self._model, *args, **kwargs)
675+
623676

624677
@contextmanager
625678
def custom_module_field_as_manual(env, rollback=True, do_flush=False):

0 commit comments

Comments
 (0)