|
12 | 12 | import signal
|
13 | 13 | import multiprocessing as mp
|
14 | 14 | import contextlib
|
| 15 | +import deepdiff |
15 | 16 |
|
16 | 17 | # noinspection PyExceptionInherit,PyCallingNonCallable
|
17 | 18 |
|
@@ -309,17 +310,46 @@ def _populate1(
|
309 | 310 | ):
|
310 | 311 | return False
|
311 | 312 |
|
312 |
| - self.connection.start_transaction() |
| 313 | + # if make is a generator, it transaction can be delayed until the final stage |
| 314 | + is_generator = inspect.isgeneratorfunction(make) |
| 315 | + if not is_generator: |
| 316 | + self.connection.start_transaction() |
| 317 | + |
313 | 318 | if key in self.target: # already populated
|
314 |
| - self.connection.cancel_transaction() |
| 319 | + if not is_generator: |
| 320 | + self.connection.cancel_transaction() |
315 | 321 | if jobs is not None:
|
316 | 322 | jobs.complete(self.target.table_name, self._job_key(key))
|
317 | 323 | return False
|
318 | 324 |
|
319 | 325 | logger.debug(f"Making {key} -> {self.target.full_table_name}")
|
320 | 326 | self.__class__._allow_insert = True
|
| 327 | + |
321 | 328 | try:
|
322 |
| - make(dict(key), **(make_kwargs or {})) |
| 329 | + if not is_generator: |
| 330 | + make(dict(key), **(make_kwargs or {})) |
| 331 | + else: |
| 332 | + # tripartite make - transaction is delayed until the final stage |
| 333 | + gen = make(dict(key), **(make_kwargs or {})) |
| 334 | + fetched_data = next(gen) |
| 335 | + fetch_hash = deepdiff.DeepHash( |
| 336 | + fetched_data, ignore_iterable_order=False |
| 337 | + )[fetched_data] |
| 338 | + computed_result = next(gen) # perform the computation |
| 339 | + gen = make(dict(key), **(make_kwargs or {})) # restart make |
| 340 | + # fetch and insert inside a transaction |
| 341 | + self.connnection.start_transaction() |
| 342 | + fetched_data = next(gen) |
| 343 | + if ( |
| 344 | + fetch_hash |
| 345 | + != deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[ |
| 346 | + fetched_data |
| 347 | + ] |
| 348 | + ): # rollback due to referential integrity fail |
| 349 | + self.connection.cancel_transaction() |
| 350 | + return False |
| 351 | + gen.send(computed_result) # insert |
| 352 | + |
323 | 353 | except (KeyboardInterrupt, SystemExit, Exception) as error:
|
324 | 354 | try:
|
325 | 355 | self.connection.cancel_transaction()
|
|
0 commit comments