|  | 
| 1 | 1 | package db | 
| 2 | 2 | 
 | 
| 3 |  | -import ( | 
| 4 |  | -	"context" | 
| 5 |  | -	"database/sql" | 
| 6 |  | -	"math" | 
| 7 |  | -	prand "math/rand" | 
| 8 |  | -	"time" | 
| 9 |  | - | 
| 10 |  | -	"github.com/lightninglabs/lightning-terminal/db/sqlc" | 
| 11 |  | -	"github.com/lightningnetwork/lnd/sqldb/v2" | 
| 12 |  | -) | 
| 13 |  | - | 
| 14 |  | -var ( | 
| 15 |  | -	// DefaultStoreTimeout is the default timeout used for any interaction | 
| 16 |  | -	// with the storage/database. | 
| 17 |  | -	DefaultStoreTimeout = time.Second * 10 | 
| 18 |  | -) | 
| 19 |  | - | 
| 20 |  | -const ( | 
| 21 |  | -	// DefaultNumTxRetries is the default number of times we'll retry a | 
| 22 |  | -	// transaction if it fails with an error that permits transaction | 
| 23 |  | -	// repetition. | 
| 24 |  | -	DefaultNumTxRetries = 10 | 
| 25 |  | - | 
| 26 |  | -	// DefaultInitialRetryDelay is the default initial delay between | 
| 27 |  | -	// retries. This will be used to generate a random delay between -50% | 
| 28 |  | -	// and +50% of this value, so 20 to 60 milliseconds. The retry will be | 
| 29 |  | -	// doubled after each attempt until we reach DefaultMaxRetryDelay. We | 
| 30 |  | -	// start with a random value to avoid multiple goroutines that are | 
| 31 |  | -	// created at the same time to effectively retry at the same time. | 
| 32 |  | -	DefaultInitialRetryDelay = time.Millisecond * 40 | 
| 33 |  | - | 
| 34 |  | -	// DefaultMaxRetryDelay is the default maximum delay between retries. | 
| 35 |  | -	DefaultMaxRetryDelay = time.Second * 3 | 
| 36 |  | -) | 
| 37 |  | - | 
| 38 |  | -// TxOptions represents a set of options one can use to control what type of | 
| 39 |  | -// database transaction is created. Transaction can wither be read or write. | 
| 40 |  | -type TxOptions interface { | 
| 41 |  | -	// ReadOnly returns true if the transaction should be read only. | 
| 42 |  | -	ReadOnly() bool | 
| 43 |  | -} | 
| 44 |  | - | 
| 45 |  | -// BatchedTx is a generic interface that represents the ability to execute | 
| 46 |  | -// several operations to a given storage interface in a single atomic | 
| 47 |  | -// transaction. Typically, Q here will be some subset of the main sqlc.Querier | 
| 48 |  | -// interface allowing it to only depend on the routines it needs to implement | 
| 49 |  | -// any additional business logic. | 
| 50 |  | -type BatchedTx[Q any] interface { | 
| 51 |  | -	// ExecTx will execute the passed txBody, operating upon generic | 
| 52 |  | -	// parameter Q (usually a storage interface) in a single transaction. | 
| 53 |  | -	// The set of TxOptions are passed in in order to allow the caller to | 
| 54 |  | -	// specify if a transaction should be read-only and optionally what | 
| 55 |  | -	// type of concurrency control should be used. | 
| 56 |  | -	ExecTx(ctx context.Context, txOptions TxOptions, | 
| 57 |  | -		txBody func(Q) error) error | 
| 58 |  | - | 
| 59 |  | -	// Backend returns the type of the database backend used. | 
| 60 |  | -	Backend() sqldb.BackendType | 
| 61 |  | -} | 
| 62 |  | - | 
| 63 |  | -// Tx represents a database transaction that can be committed or rolled back. | 
| 64 |  | -type Tx interface { | 
| 65 |  | -	// Commit commits the database transaction, an error should be returned | 
| 66 |  | -	// if the commit isn't possible. | 
| 67 |  | -	Commit() error | 
| 68 |  | - | 
| 69 |  | -	// Rollback rolls back an incomplete database transaction. | 
| 70 |  | -	// Transactions that were able to be committed can still call this as a | 
| 71 |  | -	// noop. | 
| 72 |  | -	Rollback() error | 
| 73 |  | -} | 
| 74 |  | - | 
| 75 |  | -// QueryCreator is a generic function that's used to create a Querier, which is | 
| 76 |  | -// a type of interface that implements storage related methods from a database | 
| 77 |  | -// transaction. This will be used to instantiate an object callers can use to | 
| 78 |  | -// apply multiple modifications to an object interface in a single atomic | 
| 79 |  | -// transaction. | 
| 80 |  | -type QueryCreator[Q any] func(*sql.Tx) Q | 
| 81 |  | - | 
| 82 |  | -// BatchedQuerier is a generic interface that allows callers to create a new | 
| 83 |  | -// database transaction based on an abstract type that implements the TxOptions | 
| 84 |  | -// interface. | 
| 85 |  | -type BatchedQuerier interface { | 
| 86 |  | -	// Querier is the underlying query source, this is in place so we can | 
| 87 |  | -	// pass a BatchedQuerier implementation directly into objects that | 
| 88 |  | -	// create a batched version of the normal methods they need. | 
| 89 |  | -	sqlc.Querier | 
| 90 |  | - | 
| 91 |  | -	// CustomQueries is the set of custom queries that we have manually | 
| 92 |  | -	// defined in addition to the ones generated by sqlc. | 
| 93 |  | -	sqlc.CustomQueries | 
| 94 |  | - | 
| 95 |  | -	// BeginTx creates a new database transaction given the set of | 
| 96 |  | -	// transaction options. | 
| 97 |  | -	BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error) | 
| 98 |  | -} | 
| 99 |  | - | 
| 100 |  | -// txExecutorOptions is a struct that holds the options for the transaction | 
| 101 |  | -// executor. This can be used to do things like retry a transaction due to an | 
| 102 |  | -// error a certain amount of times. | 
| 103 |  | -type txExecutorOptions struct { | 
| 104 |  | -	numRetries        int | 
| 105 |  | -	initialRetryDelay time.Duration | 
| 106 |  | -	maxRetryDelay     time.Duration | 
| 107 |  | -} | 
| 108 |  | - | 
| 109 |  | -// defaultTxExecutorOptions returns the default options for the transaction | 
| 110 |  | -// executor. | 
| 111 |  | -func defaultTxExecutorOptions() *txExecutorOptions { | 
| 112 |  | -	return &txExecutorOptions{ | 
| 113 |  | -		numRetries:        DefaultNumTxRetries, | 
| 114 |  | -		initialRetryDelay: DefaultInitialRetryDelay, | 
| 115 |  | -		maxRetryDelay:     DefaultMaxRetryDelay, | 
| 116 |  | -	} | 
| 117 |  | -} | 
| 118 |  | - | 
| 119 |  | -// randRetryDelay returns a random retry delay between -50% and +50% | 
| 120 |  | -// of the configured delay that is doubled for each attempt and capped at a max | 
| 121 |  | -// value. | 
| 122 |  | -func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration { | 
| 123 |  | -	halfDelay := t.initialRetryDelay / 2 | 
| 124 |  | -	randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec | 
| 125 |  | - | 
| 126 |  | -	// 50% plus 0%-100% gives us the range of 50%-150%. | 
| 127 |  | -	initialDelay := halfDelay + time.Duration(randDelay) | 
| 128 |  | - | 
| 129 |  | -	// If this is the first attempt, we just return the initial delay. | 
| 130 |  | -	if attempt == 0 { | 
| 131 |  | -		return initialDelay | 
| 132 |  | -	} | 
| 133 |  | - | 
| 134 |  | -	// For each subsequent delay, we double the initial delay. This still | 
| 135 |  | -	// gives us a somewhat random delay, but it still increases with each | 
| 136 |  | -	// attempt. If we double something n times, that's the same as | 
| 137 |  | -	// multiplying the value with 2^n. We limit the power to 32 to avoid | 
| 138 |  | -	// overflows. | 
| 139 |  | -	factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32))) | 
| 140 |  | -	actualDelay := initialDelay * factor | 
| 141 |  | - | 
| 142 |  | -	// Cap the delay at the maximum configured value. | 
| 143 |  | -	if actualDelay > t.maxRetryDelay { | 
| 144 |  | -		return t.maxRetryDelay | 
| 145 |  | -	} | 
| 146 |  | - | 
| 147 |  | -	return actualDelay | 
| 148 |  | -} | 
| 149 |  | - | 
| 150 |  | -// TxExecutorOption is a functional option that allows us to pass in optional | 
| 151 |  | -// argument when creating the executor. | 
| 152 |  | -type TxExecutorOption func(*txExecutorOptions) | 
| 153 |  | - | 
| 154 |  | -// WithTxRetries is a functional option that allows us to specify the number of | 
| 155 |  | -// times a transaction should be retried if it fails with a repeatable error. | 
| 156 |  | -func WithTxRetries(numRetries int) TxExecutorOption { | 
| 157 |  | -	return func(o *txExecutorOptions) { | 
| 158 |  | -		o.numRetries = numRetries | 
| 159 |  | -	} | 
| 160 |  | -} | 
| 161 |  | - | 
| 162 |  | -// WithTxRetryDelay is a functional option that allows us to specify the delay | 
| 163 |  | -// to wait before a transaction is retried. | 
| 164 |  | -func WithTxRetryDelay(delay time.Duration) TxExecutorOption { | 
| 165 |  | -	return func(o *txExecutorOptions) { | 
| 166 |  | -		o.initialRetryDelay = delay | 
| 167 |  | -	} | 
| 168 |  | -} | 
| 169 |  | - | 
| 170 |  | -// TransactionExecutor is a generic struct that abstracts away from the type of | 
| 171 |  | -// query a type needs to run under a database transaction, and also the set of | 
| 172 |  | -// options for that transaction. The QueryCreator is used to create a query | 
| 173 |  | -// given a database transaction created by the BatchedQuerier. | 
| 174 |  | -type TransactionExecutor[Query any] struct { | 
| 175 |  | -	BatchedQuerier | 
| 176 |  | - | 
| 177 |  | -	createQuery QueryCreator[Query] | 
| 178 |  | - | 
| 179 |  | -	opts *txExecutorOptions | 
| 180 |  | -} | 
| 181 |  | - | 
| 182 |  | -// NewTransactionExecutor creates a new instance of a TransactionExecutor given | 
| 183 |  | -// a Querier query object and a concrete type for the type of transactions the | 
| 184 |  | -// Querier understands. | 
| 185 |  | -func NewTransactionExecutor[Querier any](db BatchedQuerier, | 
| 186 |  | -	createQuery QueryCreator[Querier], | 
| 187 |  | -	opts ...TxExecutorOption) *TransactionExecutor[Querier] { | 
| 188 |  | - | 
| 189 |  | -	txOpts := defaultTxExecutorOptions() | 
| 190 |  | -	for _, optFunc := range opts { | 
| 191 |  | -		optFunc(txOpts) | 
| 192 |  | -	} | 
| 193 |  | - | 
| 194 |  | -	return &TransactionExecutor[Querier]{ | 
| 195 |  | -		BatchedQuerier: db, | 
| 196 |  | -		createQuery:    createQuery, | 
| 197 |  | -		opts:           txOpts, | 
| 198 |  | -	} | 
| 199 |  | -} | 
| 200 |  | - | 
| 201 |  | -// ExecTx is a wrapper for txBody to abstract the creation and commit of a db | 
| 202 |  | -// transaction. The db transaction is embedded in a `*Queries` that txBody | 
| 203 |  | -// needs to use when executing each one of the queries that need to be applied | 
| 204 |  | -// atomically. This can be used by other storage interfaces to parameterize the | 
| 205 |  | -// type of query and options run, in order to have access to batched operations | 
| 206 |  | -// related to a storage object. | 
| 207 |  | -func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, | 
| 208 |  | -	txOptions TxOptions, txBody func(Q) error) error { | 
| 209 |  | - | 
| 210 |  | -	waitBeforeRetry := func(attemptNumber int) { | 
| 211 |  | -		retryDelay := t.opts.randRetryDelay(attemptNumber) | 
| 212 |  | - | 
| 213 |  | -		log.Tracef("Retrying transaction due to tx serialization or "+ | 
| 214 |  | -			"deadlock error, attempt_number=%v, delay=%v", | 
| 215 |  | -			attemptNumber, retryDelay) | 
| 216 |  | - | 
| 217 |  | -		// Before we try again, we'll wait with a random backoff based | 
| 218 |  | -		// on the retry delay. | 
| 219 |  | -		time.Sleep(retryDelay) | 
| 220 |  | -	} | 
| 221 |  | - | 
| 222 |  | -	for i := 0; i < t.opts.numRetries; i++ { | 
| 223 |  | -		// Create the db transaction. | 
| 224 |  | -		tx, err := t.BatchedQuerier.BeginTx(ctx, txOptions) | 
| 225 |  | -		if err != nil { | 
| 226 |  | -			dbErr := MapSQLError(err) | 
| 227 |  | -			if IsSerializationOrDeadlockError(dbErr) { | 
| 228 |  | -				// Nothing to roll back here, since we didn't | 
| 229 |  | -				// even get a transaction yet. | 
| 230 |  | -				waitBeforeRetry(i) | 
| 231 |  | -				continue | 
| 232 |  | -			} | 
| 233 |  | - | 
| 234 |  | -			return dbErr | 
| 235 |  | -		} | 
| 236 |  | - | 
| 237 |  | -		// Rollback is safe to call even if the tx is already closed, | 
| 238 |  | -		// so if the tx commits successfully, this is a no-op. | 
| 239 |  | -		defer func() { | 
| 240 |  | -			_ = tx.Rollback() | 
| 241 |  | -		}() | 
| 242 |  | - | 
| 243 |  | -		if err := txBody(t.createQuery(tx)); err != nil { | 
| 244 |  | -			dbErr := MapSQLError(err) | 
| 245 |  | -			if IsSerializationOrDeadlockError(dbErr) { | 
| 246 |  | -				// Roll back the transaction, then pop back up | 
| 247 |  | -				// to try once again. | 
| 248 |  | -				_ = tx.Rollback() | 
| 249 |  | - | 
| 250 |  | -				waitBeforeRetry(i) | 
| 251 |  | -				continue | 
| 252 |  | -			} | 
| 253 |  | - | 
| 254 |  | -			return dbErr | 
| 255 |  | -		} | 
| 256 |  | - | 
| 257 |  | -		// Commit transaction. | 
| 258 |  | -		if err = tx.Commit(); err != nil { | 
| 259 |  | -			dbErr := MapSQLError(err) | 
| 260 |  | -			if IsSerializationOrDeadlockError(dbErr) { | 
| 261 |  | -				// Roll back the transaction, then pop back up | 
| 262 |  | -				// to try once again. | 
| 263 |  | -				_ = tx.Rollback() | 
| 264 |  | - | 
| 265 |  | -				waitBeforeRetry(i) | 
| 266 |  | -				continue | 
| 267 |  | -			} | 
| 268 |  | - | 
| 269 |  | -			return dbErr | 
| 270 |  | -		} | 
| 271 |  | - | 
| 272 |  | -		return nil | 
| 273 |  | -	} | 
| 274 |  | - | 
| 275 |  | -	// If we get to this point, then we weren't able to successfully commit | 
| 276 |  | -	// a tx given the max number of retries. | 
| 277 |  | -	return ErrRetriesExceeded | 
| 278 |  | -} | 
| 279 |  | - | 
| 280 |  | -// Backend returns the type of the database backend used. | 
| 281 |  | -func (t *TransactionExecutor[Q]) Backend() sqldb.BackendType { | 
| 282 |  | -	return t.BatchedQuerier.Backend() | 
| 283 |  | -} | 
| 284 |  | - | 
| 285 |  | -// BaseDB is the base database struct that each implementation can embed to | 
| 286 |  | -// gain some common functionality. | 
| 287 |  | -type BaseDB struct { | 
| 288 |  | -	*sql.DB | 
| 289 |  | - | 
| 290 |  | -	*sqlc.Queries | 
| 291 |  | -} | 
| 292 |  | - | 
| 293 |  | -// BeginTx wraps the normal sql specific BeginTx method with the TxOptions | 
| 294 |  | -// interface. This interface is then mapped to the concrete sql tx options | 
| 295 |  | -// struct. | 
| 296 |  | -func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) { | 
| 297 |  | -	sqlOptions := sql.TxOptions{ | 
| 298 |  | -		ReadOnly:  opts.ReadOnly(), | 
| 299 |  | -		Isolation: sql.LevelSerializable, | 
| 300 |  | -	} | 
| 301 |  | -	return s.DB.BeginTx(ctx, &sqlOptions) | 
| 302 |  | -} | 
| 303 |  | - | 
| 304 |  | -// Backend returns the type of the database backend used. | 
| 305 |  | -func (s *BaseDB) Backend() sqldb.BackendType { | 
| 306 |  | -	return s.Queries.Backend() | 
| 307 |  | -} | 
| 308 |  | - | 
| 309 | 3 | // QueriesTxOptions defines the set of db txn options the SQLQueries | 
| 310 | 4 | // understands. | 
| 311 | 5 | type QueriesTxOptions struct { | 
|  | 
0 commit comments