33import sys
44import time
55from collections import deque
6+ from contextlib import AsyncExitStack
67from contextlib import contextmanager
78from contextlib import suppress
89from os .path import exists
5859from dipdup .utils import FormattedLogger
5960from dipdup .utils import slowdown
6061from dipdup .utils .database import execute_sql_scripts
62+ from dipdup .utils .database import in_global_transaction
6163from dipdup .utils .database import wipe_schema
6264
6365DatasourceT = TypeVar ('DatasourceT' , bound = Datasource )
@@ -331,6 +333,7 @@ async def fire_handler(
331333 handler_config = handler_config ,
332334 datasource = datasource ,
333335 )
336+ # NOTE: Handlers are not atomic, levels are. Do not open transaction here.
334337 with self ._callback_wrapper ('handler' , name ):
335338 await handler_config .callback_fn (new_ctx , * args , ** kwargs )
336339
@@ -355,7 +358,12 @@ async def fire_hook(
355358 self ._verify_arguments (new_ctx , * args , ** kwargs )
356359
357360 async def _wrapper ():
358- with self ._callback_wrapper ('hook' , name ):
361+ async with AsyncExitStack () as stack :
362+
363+ stack .enter_context (self ._callback_wrapper ('hook' , name ))
364+ if hook_config .atomic :
365+ await stack .enter_async_context (in_global_transaction ())
366+
359367 await hook_config .callback_fn (ctx , * args , ** kwargs )
360368
361369 if wait :
0 commit comments