Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion invoices/kv_sql_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestMigrationWithChannelDB(t *testing.T) {
// Just some sane defaults for the sqlite config.
const (
timeout = 5 * time.Second
maxConns = 50
maxConns = 2
)

sqliteConfig := &sqlite.Config{
Expand Down
5 changes: 4 additions & 1 deletion kvdb/kvdb_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ const (
// tag is defined. This will allow testing of other database backends.
SqliteBackend = true

testMaxConnections = 50
// Sqlite allows for concurrent reads however writers are limited to
// 1 so we select 2 here to allow for concurrent reads but keep the
// number of connections low to avoid write contention.
testMaxConnections = 2
)

// StartSqliteTestBackend starts a sqlite backed wallet.DB instance
Expand Down
8 changes: 7 additions & 1 deletion kvdb/postgres/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ func getTestDsn(dbName string) string {

var testPostgres *embeddedpostgres.EmbeddedPostgres

// testMaxConnections is the total number of connections to the database server.
const testMaxConnections = 200

// testMaxConnectionPerDatabase is the number of connections per database the
// golang driver will open.
const testMaxConnectionPerDatabase = 10

// StartEmbeddedPostgres starts an embedded postgres instance. This only needs
// to be done once, because NewFixture will create random new databases on every
// call. It returns a stop closure that stops the database if called.
func StartEmbeddedPostgres() (func() error, error) {
sqlbase.Init(testMaxConnections)
sqlbase.Init(testMaxConnectionPerDatabase)

postgres := embeddedpostgres.NewDatabase(
embeddedpostgres.DefaultConfig().
Expand All @@ -43,6 +48,7 @@ func StartEmbeddedPostgres() (func() error, error) {
"max_connections": fmt.Sprintf(
"%d", testMaxConnections,
),
"max_pred_locks_per_transaction": "256",
},
),
)
Expand Down
2 changes: 1 addition & 1 deletion lncfg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
SqliteBackend = "sqlite"
DefaultBatchCommitInterval = 500 * time.Millisecond

defaultPostgresMaxConnections = 50
defaultPostgresMaxConnections = 10
defaultSqliteMaxConnections = 2
Comment on lines +42 to 43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To avoid duplication and ensure consistency, these default values should be sourced from the sqldb package where they are also defined.

Suggested change
defaultPostgresMaxConnections = 10
defaultSqliteMaxConnections = 2
defaultPostgresMaxConnections = sqldb.DefaultMaxConnsPostgres
defaultSqliteMaxConnections = sqldb.DefaultMaxConnsSqlite


defaultSqliteBusyTimeout = 5 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion sample-lnd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,7 @@
; recommended to set a limit that is below the server connection limit.
; Otherwise errors may occur in lnd under high-load conditions.
; Default:
; db.postgres.maxconnections=50
; db.postgres.maxconnections=10
; Example:
; db.postgres.maxconnections=

Expand Down
11 changes: 9 additions & 2 deletions sqldb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ import (
)

const (
// defaultMaxConns is the number of permitted active and idle
// defaultMaxConnsPostgres is the number of permitted active and idle
// connections. We want to limit this so it isn't unlimited. We use the
// same value for the number of idle connections as, this can speed up
// queries given a new connection doesn't need to be established each
// time.
defaultMaxConns = 25
defaultMaxConnsPostgres = 10

// defaultMaxConnsSqlite is the number of permitted active and idle
// connections. We want to limit this to 2 because SQLite allows for
// concurrent reads however writers are limited to 1 so we select 2 here
// to allow for concurrent reads but keep the number of connections low
// to avoid write contention.
defaultMaxConnsSqlite = 2
Comment on lines +10 to +22

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To avoid duplicating these default values in the lncfg package, consider exporting these constants (e.g., DefaultMaxConnsPostgres, DefaultMaxConnsSqlite). The lncfg package can then reference them directly, ensuring consistency and making future changes easier. This change should be propagated to sqldb/postgres.go and sqldb/sqlite.go as well.

Suggested change
// defaultMaxConnsPostgres is the number of permitted active and idle
// connections. We want to limit this so it isn't unlimited. We use the
// same value for the number of idle connections as, this can speed up
// queries given a new connection doesn't need to be established each
// time.
defaultMaxConns = 25
defaultMaxConnsPostgres = 10
// defaultMaxConnsSqlite is the number of permitted active and idle
// connections. We want to limit this to 2 because SQLite allows for
// concurrent reads however writers are limited to 1 so we select 2 here
// to allow for concurrent reads but keep the number of connections low
// to avoid write contention.
defaultMaxConnsSqlite = 2
// DefaultMaxConnsPostgres is the number of permitted active and idle
// connections. We want to limit this so it isn't unlimited. We use the
// same value for the number of idle connections as, this can speed up
// queries given a new connection doesn't need to be established each
// time.
DefaultMaxConnsPostgres = 10
// DefaultMaxConnsSqlite is the number of permitted active and idle
// connections. We want to limit this to 2 because SQLite allows for
// concurrent reads however writers are limited to 1 so we select 2 here
// to allow for concurrent reads but keep the number of connections low
// to avoid write contention.
DefaultMaxConnsSqlite = 2


// connIdleLifetime is the amount of time a connection can be idle.
connIdleLifetime = 5 * time.Minute
Expand Down
60 changes: 58 additions & 2 deletions sqldb/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"database/sql"
"fmt"
"io/fs"
"net/url"
"path"
"strings"
"time"

"github.com/golang-migrate/migrate/v4/database"
pgx_migrate "github.com/golang-migrate/migrate/v4/database/pgx/v5"
_ "github.com/golang-migrate/migrate/v4/source/file" // Read migrations from files. // nolint:ll
_ "github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -122,7 +124,7 @@ func NewPostgresStore(cfg *PostgresConfig) (*PostgresStore, error) {
err)
}

maxConns := defaultMaxConns
maxConns := defaultMaxConnsPostgres

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To be consistent with exporting the default values from sqldb/config.go, this should use the exported constant.

Suggested change
maxConns := defaultMaxConnsPostgres
maxConns := DefaultMaxConnsPostgres

if cfg.MaxConnections > 0 {
maxConns = cfg.MaxConnections
}
Expand All @@ -148,6 +150,33 @@ func (s *PostgresStore) GetBaseDB() *BaseDB {
return s.BaseDB
}

// postgresMigrationExecutor implements MigrationExecutor using a single reusable
// migrate driver to avoid holding multiple database connections.
type postgresMigrationExecutor struct {
driver database.Driver
dbName string
fs fs.FS
}

// ExecuteMigrations runs migrations using the pre-created driver.
func (p *postgresMigrationExecutor) ExecuteMigrations(target MigrationTarget) error {
return applyMigrations(p.fs, p.driver, "sqlc/migrations", p.dbName, target)
}

// GetSchemaVersion returns the current schema version using the pre-created driver.
func (p *postgresMigrationExecutor) GetSchemaVersion() (int, bool, error) {
version, dirty, err := p.driver.Version()
if err != nil {
return 0, false, err
}
return version, dirty, nil
}

// SetSchemaVersion sets the schema version using the pre-created driver.
func (p *postgresMigrationExecutor) SetSchemaVersion(version int, dirty bool) error {
return p.driver.SetVersion(version, dirty)
}

// ApplyAllMigrations applies both the SQLC and custom in-code migrations to the
// Postgres database.
func (s *PostgresStore) ApplyAllMigrations(ctx context.Context,
Expand All @@ -158,7 +187,30 @@ func (s *PostgresStore) ApplyAllMigrations(ctx context.Context,
return nil
}

return ApplyMigrations(ctx, s.BaseDB, s, migrations)
// Create a single migrate driver that will be reused for all migration
// operations to avoid holding multiple database connections.
dbName, err := getDatabaseNameFromDSN(s.cfg.Dsn)
if err != nil {
return err
}

driver, err := pgx_migrate.WithInstance(s.DB, &pgx_migrate.Config{})
if err != nil {
return errPostgresMigration(err)
}

// Note: We don't call driver.Close() here because it closes the
// underlying *sql.DB. The single connection held by the driver will
// be cleaned up when the *sql.DB is closed.

// Create a migration executor that reuses this driver.
executor := &postgresMigrationExecutor{
driver: driver,
dbName: dbName,
fs: newReplacerFS(sqlSchemas, postgresSchemaReplacements),
}

return ApplyMigrations(ctx, s.BaseDB, executor, migrations)
}

func errPostgresMigration(err error) error {
Expand All @@ -178,6 +230,10 @@ func (s *PostgresStore) ExecuteMigrations(target MigrationTarget) error {
return errPostgresMigration(err)
}

// Note: We don't call driver.Close() here because it closes the
// underlying *sql.DB. The connection held by the driver will be
// cleaned up when the *sql.DB is closed.

// Populate the database with our set of schemas based on our embedded
// in-memory file system.
postgresFS := newReplacerFS(sqlSchemas, postgresSchemaReplacements)
Expand Down
1 change: 1 addition & 0 deletions sqldb/postgres_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewTestPgFixture(t testing.TB, expiry time.Duration) *TestPgFixture {
"-c", "log_statement=all",
"-c", "log_destination=stderr",
"-c", "max_connections=5000",
"-c", "max_pred_locks_per_transaction=256",
},
}, func(config *docker.HostConfig) {
// Set AutoRemove to true so that stopped container goes away
Expand Down
62 changes: 59 additions & 3 deletions sqldb/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"context"
"database/sql"
"fmt"
"io/fs"
"net/url"
"path/filepath"
"testing"

"github.com/golang-migrate/migrate/v4/database"
sqlite_migrate "github.com/golang-migrate/migrate/v4/database/sqlite"
"github.com/lightningnetwork/lnd/sqldb/sqlc"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -130,8 +132,13 @@ func NewSqliteStore(cfg *SqliteConfig, dbPath string) (*SqliteStore, error) {
err)
}

db.SetMaxOpenConns(defaultMaxConns)
db.SetMaxIdleConns(defaultMaxConns)
maxConns := defaultMaxConnsSqlite

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To be consistent with exporting the default values from sqldb/config.go, this should use the exported constant.

Suggested change
maxConns := defaultMaxConnsSqlite
maxConns := DefaultMaxConnsSqlite

if cfg.MaxConnections > 0 {
maxConns = cfg.MaxConnections
}

db.SetMaxOpenConns(maxConns)
db.SetMaxIdleConns(maxConns)
db.SetConnMaxLifetime(connIdleLifetime)
queries := sqlc.New(db)

Expand All @@ -152,6 +159,32 @@ func (s *SqliteStore) GetBaseDB() *BaseDB {
return s.BaseDB
}

// sqliteMigrationExecutor implements MigrationExecutor using a single reusable
// migrate driver to avoid holding multiple database connections.
type sqliteMigrationExecutor struct {
driver database.Driver
fs fs.FS
}

// ExecuteMigrations runs migrations using the pre-created driver.
func (s *sqliteMigrationExecutor) ExecuteMigrations(target MigrationTarget) error {
return applyMigrations(s.fs, s.driver, "sqlc/migrations", "sqlite", target)
}

// GetSchemaVersion returns the current schema version using the pre-created driver.
func (s *sqliteMigrationExecutor) GetSchemaVersion() (int, bool, error) {
version, dirty, err := s.driver.Version()
if err != nil {
return 0, dirty, err
}
return version, dirty, nil
}

// SetSchemaVersion sets the schema version using the pre-created driver.
func (s *sqliteMigrationExecutor) SetSchemaVersion(version int, dirty bool) error {
return s.driver.SetVersion(version, dirty)
}

// ApplyAllMigrations applies both the SQLC and custom in-code migrations to the
// SQLite database.
func (s *SqliteStore) ApplyAllMigrations(ctx context.Context,
Expand All @@ -162,7 +195,26 @@ func (s *SqliteStore) ApplyAllMigrations(ctx context.Context,
return nil
}

return ApplyMigrations(ctx, s.BaseDB, s, migrations)
// Create a single migrate driver that will be reused for all migration
// operations to avoid holding multiple database connections.
driver, err := sqlite_migrate.WithInstance(
s.DB, &sqlite_migrate.Config{},
)
if err != nil {
return errSqliteMigration(err)
}

// Note: We don't call driver.Close() here because it closes the
// underlying *sql.DB. The single connection held by the driver will
// be cleaned up when the *sql.DB is closed.

// Create a migration executor that reuses this driver.
executor := &sqliteMigrationExecutor{
driver: driver,
fs: newReplacerFS(sqlSchemas, sqliteSchemaReplacements),
}

return ApplyMigrations(ctx, s.BaseDB, executor, migrations)
}

func errSqliteMigration(err error) error {
Expand All @@ -179,6 +231,10 @@ func (s *SqliteStore) ExecuteMigrations(target MigrationTarget) error {
return errSqliteMigration(err)
}

// Note: We intentionally don't call driver.Close() here because it would
// close the underlying *sql.DB that we passed to WithInstance. The
// driver will be cleaned up when the *sql.DB is closed.

// Populate the database with our set of schemas based on our embedded
// in-memory file system.
sqliteFS := newReplacerFS(sqlSchemas, sqliteSchemaReplacements)
Expand Down
Loading