Skip to content

Commit 2752fb6

Browse files
committed
Application db pool made dynamic and created separate connection and pool for mapper level upgrade command
1 parent 4cd7704 commit 2752fb6

File tree

4 files changed

+53
-11
lines changed

4 files changed

+53
-11
lines changed

backend/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ class Config:
7474
POSTGRES_ENDPOINT: str = os.getenv("POSTGRES_ENDPOINT", "localhost")
7575
POSTGRES_DB: str = os.getenv("POSTGRES_DB", "postgres")
7676
POSTGRES_PORT: str = os.getenv("POSTGRES_PORT", "5432")
77+
DB_MIN_CONNECTIONS: int = os.getenv("DB_MIN_CONNECTIONS", 4)
78+
DB_MAX_CONNECTIONS: int = os.getenv("DB_MAX_CONNECTIONS", 8)
7779

7880
SQLALCHEMY_DATABASE_URI: Optional[PostgresDsn] = None
7981

backend/db.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ class DatabaseConnection:
1111

1212
def __init__(self):
1313
self.database = Database(
14-
settings.SQLALCHEMY_DATABASE_URI.unicode_string(), min_size=4, max_size=8
14+
settings.SQLALCHEMY_DATABASE_URI.unicode_string(),
15+
min_size=settings.DB_MIN_CONNECTIONS,
16+
max_size=settings.DB_MAX_CONNECTIONS,
1517
)
1618

1719
async def connect(self):

example.env

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ POSTGRES_USER=${POSTGRES_USER:-tm}
138138
POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-tm}
139139
POSTGRES_ENDPOINT=${POSTGRES_ENDPOINT:-tm-db}
140140
POSTGRES_PORT=${POSTGRES_PORT:-5432}
141+
DB_MIN_CONNECTIONS=${DB_MIN_CONNECTIONS:-4}
142+
DB_MAX_CONNECTIONS=${DB_MAX_CONNECTIONS:-8}
141143

142144
# The postgres database name used for testing (required).
143145
# All other configurations except the database name are inherited from the main database defined above.

scripts/commands/refresh_mapper_level.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
from types import SimpleNamespace
77
from typing import List
88

9-
from backend.db import db_connection
9+
from databases import Database
1010
from backend.services.users.user_service import UserService
1111
from backend.models.postgis.user import User
12+
from backend.config import settings
1213

1314
import httpx
1415

@@ -28,20 +29,21 @@
2829

2930
async def process_user(
3031
user_record,
32+
script_db: Database,
3133
failed_users,
3234
failed_lock,
3335
users_updated_counter,
3436
counter_lock,
3537
semaphore,
3638
):
3739
"""
38-
Process a single user using its own DB connection from the pool.
40+
Process a single user using its own DB connection from the script-local pool.
3941
Calls UserService.check_and_update_mapper_level(user_id, conn).
4042
Retries on transient network/HTTP errors with simple fixed delay.
4143
"""
4244
await semaphore.acquire()
4345
try:
44-
async with db_connection.database.connection() as conn:
46+
async with script_db.connection() as conn:
4547
attempt = 0
4648
while True:
4749
attempt += 1
@@ -109,17 +111,31 @@ async def _fetch_users_only_missing(conn) -> List[SimpleNamespace]:
109111
return users
110112

111113

112-
async def main(only_missing: bool, workers: int, batch_size: int):
114+
async def main(
115+
only_missing: bool,
116+
workers: int,
117+
batch_size: int,
118+
script_db_min: int,
119+
script_db_max: int,
120+
):
113121
try:
114-
logger.info("Connecting to database...")
115-
await db_connection.connect()
122+
db_url = settings.SQLALCHEMY_DATABASE_URI.unicode_string()
123+
124+
script_db = Database(db_url, min_size=script_db_min, max_size=script_db_max)
125+
126+
logger.info(
127+
"Connecting to script-local DB pool (min=%d max=%d)...",
128+
script_db_min,
129+
script_db_max,
130+
)
131+
await script_db.connect()
116132

117133
logger.info("Started updating mapper levels...")
118134
logger.info(
119135
"Using %d concurrent workers, task batch size %d", workers, batch_size
120136
)
121137

122-
async with db_connection.database.connection() as conn:
138+
async with script_db.connection() as conn:
123139
if only_missing:
124140
users = await _fetch_users_only_missing(conn)
125141
else:
@@ -143,6 +159,7 @@ async def main(only_missing: bool, workers: int, batch_size: int):
143159
asyncio.create_task(
144160
process_user(
145161
user_record,
162+
script_db,
146163
failed_users,
147164
failed_lock,
148165
users_updated_counter,
@@ -163,11 +180,11 @@ async def main(only_missing: bool, workers: int, batch_size: int):
163180
logger.exception("Error while refreshing mapper levels")
164181
raise
165182
finally:
166-
logger.info("Disconnecting from database...")
183+
logger.info("Disconnecting from script-local DB...")
167184
try:
168-
await db_connection.disconnect()
185+
await script_db.disconnect()
169186
except Exception:
170-
logger.exception("Error while disconnecting from DB (ignored)")
187+
logger.exception("Error while disconnecting from script-local DB (ignored)")
171188

172189

173190
if __name__ == "__main__":
@@ -191,18 +208,37 @@ async def main(only_missing: bool, workers: int, batch_size: int):
191208
default=DEFAULT_TASK_BATCH_SIZE,
192209
help=f"Number of users scheduled per batch (default {DEFAULT_TASK_BATCH_SIZE})",
193210
)
211+
parser.add_argument(
212+
"--script-db-min",
213+
type=int,
214+
default=4,
215+
help="Script-local DB pool minimum size (default 4)",
216+
)
217+
parser.add_argument(
218+
"--script-db-max",
219+
type=int,
220+
default=8,
221+
help="Script-local DB pool maximum size (default 8)",
222+
)
223+
194224
args = parser.parse_args()
195225

196226
# Basic validation
197227
if args.workers <= 0:
198228
parser.error("--workers must be a positive integer")
199229
if args.batch_size <= 0:
200230
parser.error("--batch-size must be a positive integer")
231+
if args.script_db_min <= 0 or args.script_db_max <= 0:
232+
parser.error("--script-db-min and --script-db-max must be positive integers")
233+
if args.script_db_min > args.script_db_max:
234+
parser.error("--script-db-min cannot be greater than --script-db-max")
201235

202236
asyncio.run(
203237
main(
204238
only_missing=args.only_missing,
205239
workers=args.workers,
206240
batch_size=args.batch_size,
241+
script_db_min=args.script_db_min,
242+
script_db_max=args.script_db_max,
207243
)
208244
)

0 commit comments

Comments
 (0)