@@ -403,6 +403,24 @@ def new_transaction(
403403 * args : Any ,
404404 ** kwargs : Any
405405 ) -> R :
406+ """Start a new database transaction with the given connection.
407+
408+ Note: The given func may be called multiple times under certain
409+ failure modes. This is normally fine when in a standard transaction,
410+ but care must be taken if the connection is in `autocommit` mode that
411+ the function will correctly handle being aborted and retried half way
412+ through its execution.
413+
414+ Args:
415+ conn
416+ desc
417+ after_callbacks
418+ exception_callbacks
419+ func
420+ *args
421+ **kwargs
422+ """
423+
406424 start = monotonic_time ()
407425 txn_id = self ._TXN_ID
408426
@@ -508,7 +526,12 @@ def new_transaction(
508526 sql_txn_timer .labels (desc ).observe (duration )
509527
510528 async def runInteraction (
511- self , desc : str , func : "Callable[..., R]" , * args : Any , ** kwargs : Any
529+ self ,
530+ desc : str ,
531+ func : "Callable[..., R]" ,
532+ * args : Any ,
533+ db_autocommit : bool = False ,
534+ ** kwargs : Any
512535 ) -> R :
513536 """Starts a transaction on the database and runs a given function
514537
@@ -518,6 +541,18 @@ async def runInteraction(
518541 database transaction (twisted.enterprise.adbapi.Transaction) as
519542 its first argument, followed by `args` and `kwargs`.
520543
544+ db_autocommit: Whether to run the function in "autocommit" mode,
545+ i.e. outside of a transaction. This is useful for transactions
546+ that are only a single query.
547+
548+ Currently, this is only implemented for Postgres. SQLite will still
549+ run the function inside a transaction.
550+
551+ WARNING: This means that if func fails half way through then
552+ the changes will *not* be rolled back. `func` may also get
553+ called multiple times if the transaction is retried, so must
554+ correctly handle that case.
555+
521556 args: positional args to pass to `func`
522557 kwargs: named args to pass to `func`
523558
@@ -538,6 +573,7 @@ async def runInteraction(
538573 exception_callbacks ,
539574 func ,
540575 * args ,
576+ db_autocommit = db_autocommit ,
541577 ** kwargs
542578 )
543579
@@ -551,7 +587,11 @@ async def runInteraction(
551587 return cast (R , result )
552588
553589 async def runWithConnection (
554- self , func : "Callable[..., R]" , * args : Any , ** kwargs : Any
590+ self ,
591+ func : "Callable[..., R]" ,
592+ * args : Any ,
593+ db_autocommit : bool = False ,
594+ ** kwargs : Any
555595 ) -> R :
556596 """Wraps the .runWithConnection() method on the underlying db_pool.
557597
@@ -560,6 +600,9 @@ async def runWithConnection(
560600 database connection (twisted.enterprise.adbapi.Connection) as
561601 its first argument, followed by `args` and `kwargs`.
562602 args: positional args to pass to `func`
603+ db_autocommit: Whether to run the function in "autocommit" mode,
604+ i.e. outside of a transaction. This is useful for transaction
605+ that are only a single query. Currently only affects postgres.
563606 kwargs: named args to pass to `func`
564607
565608 Returns:
@@ -575,6 +618,13 @@ async def runWithConnection(
575618 start_time = monotonic_time ()
576619
577620 def inner_func (conn , * args , ** kwargs ):
621+ # We shouldn't be in a transaction. If we are then something
622+ # somewhere hasn't committed after doing work. (This is likely only
623+ # possible during startup, as `run*` will ensure changes are
624+ # committed/rolled back before putting the connection back in the
625+ # pool).
626+ assert not self .engine .in_transaction (conn )
627+
578628 with LoggingContext ("runWithConnection" , parent_context ) as context :
579629 sched_duration_sec = monotonic_time () - start_time
580630 sql_scheduling_timer .observe (sched_duration_sec )
@@ -584,7 +634,14 @@ def inner_func(conn, *args, **kwargs):
584634 logger .debug ("Reconnecting closed database connection" )
585635 conn .reconnect ()
586636
587- return func (conn , * args , ** kwargs )
637+ try :
638+ if db_autocommit :
639+ self .engine .attempt_to_set_autocommit (conn , True )
640+
641+ return func (conn , * args , ** kwargs )
642+ finally :
643+ if db_autocommit :
644+ self .engine .attempt_to_set_autocommit (conn , False )
588645
589646 return await make_deferred_yieldable (
590647 self ._db_pool .runWithConnection (inner_func , * args , ** kwargs )
0 commit comments