|
12 | 12 | import signal |
13 | 13 | import multiprocessing as mp |
14 | 14 | import contextlib |
| 15 | +import deepdiff |
15 | 16 |
|
16 | 17 | # noinspection PyExceptionInherit,PyCallingNonCallable |
17 | 18 |
|
@@ -309,17 +310,46 @@ def _populate1( |
309 | 310 | ): |
310 | 311 | return False |
311 | 312 |
|
312 | | - self.connection.start_transaction() |
| 313 | + # if make is a generator, it transaction can be delayed until the final stage |
| 314 | + is_generator = inspect.isgeneratorfunction(make) |
| 315 | + if not is_generator: |
| 316 | + self.connection.start_transaction() |
| 317 | + |
313 | 318 | if key in self.target: # already populated |
314 | | - self.connection.cancel_transaction() |
| 319 | + if not is_generator: |
| 320 | + self.connection.cancel_transaction() |
315 | 321 | if jobs is not None: |
316 | 322 | jobs.complete(self.target.table_name, self._job_key(key)) |
317 | 323 | return False |
318 | 324 |
|
319 | 325 | logger.debug(f"Making {key} -> {self.target.full_table_name}") |
320 | 326 | self.__class__._allow_insert = True |
| 327 | + |
321 | 328 | try: |
322 | | - make(dict(key), **(make_kwargs or {})) |
| 329 | + if not is_generator: |
| 330 | + make(dict(key), **(make_kwargs or {})) |
| 331 | + else: |
| 332 | + # tripartite make - transaction is delayed until the final stage |
| 333 | + gen = make(dict(key), **(make_kwargs or {})) |
| 334 | + fetched_data = next(gen) |
| 335 | + fetch_hash = deepdiff.DeepHash( |
| 336 | + fetched_data, ignore_iterable_order=False |
| 337 | + )[fetched_data] |
| 338 | + computed_result = next(gen) # perform the computation |
| 339 | + gen = make(dict(key), **(make_kwargs or {})) # restart make |
| 340 | + # fetch and insert inside a transaction |
| 341 | + self.connnection.start_transaction() |
| 342 | + fetched_data = next(gen) |
| 343 | + if ( |
| 344 | + fetch_hash |
| 345 | + != deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[ |
| 346 | + fetched_data |
| 347 | + ] |
| 348 | + ): # rollback due to referential integrity fail |
| 349 | + self.connection.cancel_transaction() |
| 350 | + return False |
| 351 | + gen.send(computed_result) # insert |
| 352 | + |
323 | 353 | except (KeyboardInterrupt, SystemExit, Exception) as error: |
324 | 354 | try: |
325 | 355 | self.connection.cancel_transaction() |
|
0 commit comments