Skip to content

Commit a08ada5

Browse files
committed
ongoing
1 parent cebad23 commit a08ada5

File tree

13 files changed

+79
-56
lines changed

13 files changed

+79
-56
lines changed

packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,22 @@ def get_async_engine(app: web.Application) -> AsyncEngine:
3838
return engine
3939

4040

41-
async def connect_to_db(app: web.Application, settings: PostgresSettings) -> None:
41+
async def connect_to_db(
42+
app: web.Application, settings: PostgresSettings, application_name: str
43+
) -> None:
4244
"""
4345
- db services up, data migrated and ready to use
4446
- sets an engine in app state (use `get_async_engine(app)` to retrieve)
4547
"""
46-
if settings.POSTGRES_CLIENT_NAME:
47-
settings = settings.model_copy(
48-
update={"POSTGRES_CLIENT_NAME": settings.POSTGRES_CLIENT_NAME + "-asyncpg"}
49-
)
50-
5148
with log_context(
5249
_logger,
5350
logging.INFO,
5451
"Connecting app[APP_DB_ASYNC_ENGINE_KEY] to postgres with %s",
5552
f"{settings=}",
5653
):
57-
engine = await create_async_engine_and_database_ready(settings)
54+
engine = await create_async_engine_and_database_ready(
55+
settings, application_name
56+
)
5857
_set_async_engine_to_app_state(app, engine)
5958

6059
_logger.info(

packages/service-library/src/servicelib/db_asyncpg_utils.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
2020
async def create_async_engine_and_database_ready(
21-
settings: PostgresSettings,
21+
settings: PostgresSettings, application_name: str
2222
) -> AsyncEngine:
2323
"""
2424
- creates asyncio engine
@@ -30,12 +30,9 @@ async def create_async_engine_and_database_ready(
3030
raise_if_migration_not_ready,
3131
)
3232

33-
server_settings = None
34-
if settings.POSTGRES_CLIENT_NAME:
35-
assert isinstance(settings.POSTGRES_CLIENT_NAME, str) # nosec
36-
server_settings = {
37-
"application_name": settings.POSTGRES_CLIENT_NAME,
38-
}
33+
server_settings = {
34+
"application_name": settings.client_name(f"{application_name}-asyncpg"),
35+
}
3936

4037
engine = create_async_engine(
4138
settings.dsn_with_async_sqlalchemy,
@@ -82,9 +79,9 @@ async def with_async_pg_engine(
8279
logging.DEBUG,
8380
f"connection to db {settings.dsn_with_async_sqlalchemy}",
8481
):
85-
server_settings = None
86-
if settings.POSTGRES_CLIENT_NAME:
87-
assert isinstance(settings.POSTGRES_CLIENT_NAME, str)
82+
server_settings = {
83+
"application_name": settings.client_name("-asyncpg"),
84+
}
8885

8986
engine = create_async_engine(
9087
settings.dsn_with_async_sqlalchemy,

packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
_logger = logging.getLogger(__name__)
1515

1616

17-
async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
17+
async def connect_to_db(
18+
app: FastAPI, settings: PostgresSettings, application_name: str
19+
) -> None:
1820
warnings.warn(
1921
"The 'connect_to_db' function is deprecated and will be removed in a future release. "
2022
"Please use 'postgres_lifespan' instead for managing the database connection lifecycle.",
@@ -27,7 +29,9 @@ async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
2729
logging.DEBUG,
2830
f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}",
2931
):
30-
engine = await create_async_engine_and_database_ready(settings)
32+
engine = await create_async_engine_and_database_ready(
33+
settings, application_name
34+
)
3135

3236
app.state.engine = engine
3337
_logger.debug(

packages/service-library/src/servicelib/fastapi/postgres_lifespan.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ def create_postgres_database_input_state(settings: PostgresSettings) -> State:
2828
return {PostgresLifespanState.POSTGRES_SETTINGS: settings}
2929

3030

31-
async def postgres_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[State]:
31+
async def postgres_database_lifespan(
32+
app: FastAPI, state: State
33+
) -> AsyncIterator[State]:
3234

3335
_lifespan_name = f"{__name__}.{postgres_database_lifespan.__name__}"
3436

@@ -43,7 +45,7 @@ async def postgres_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[
4345

4446
# connect to database
4547
async_engine: AsyncEngine = await create_async_engine_and_database_ready(
46-
settings
48+
settings, app.title
4749
)
4850

4951
try:

packages/settings-library/src/settings_library/postgres.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,19 @@ def dsn_with_async_sqlalchemy(self) -> str:
8282
)
8383
return f"{url}"
8484

85-
@cached_property
86-
def dsn_with_query(self) -> str:
85+
def dsn_with_query(self, application_name: str) -> str:
8786
"""Some clients do not support queries in the dsn"""
8887
dsn = self.dsn
89-
return self._update_query(dsn)
88+
return self._update_query(dsn, application_name)
89+
90+
def client_name(self, application_name: str) -> str:
91+
return f"{application_name}{'-' if self.POSTGRES_CLIENT_NAME else ''}{self.POSTGRES_CLIENT_NAME or ''}"
9092

91-
def _update_query(self, uri: str) -> str:
93+
def _update_query(self, uri: str, application_name: str) -> str:
9294
# SEE https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
93-
new_params: dict[str, str] = {}
94-
if self.POSTGRES_CLIENT_NAME:
95-
new_params = {
96-
"application_name": self.POSTGRES_CLIENT_NAME,
97-
}
95+
new_params: dict[str, str] = {
96+
"application_name": self.client_name(application_name),
97+
}
9898

9999
if new_params:
100100
parsed_uri = urlparse(uri)

packages/settings-library/tests/test_postgres.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from urllib.parse import urlparse
77

88
import pytest
9+
from faker import Faker
910
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
1011
from pytest_simcore.helpers.typing_env import EnvVarsDict
1112
from settings_library.postgres import PostgresSettings
@@ -36,22 +37,27 @@ def test_cached_property_dsn(mock_environment: EnvVarsDict):
3637
assert "dsn" not in settings.model_dump()
3738

3839

39-
def test_dsn_with_query(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
40+
def test_dsn_with_query(
41+
mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker
42+
):
4043
settings = PostgresSettings()
4144

4245
assert settings.POSTGRES_CLIENT_NAME
4346
assert settings.dsn == "postgresql://foo:secret@localhost:5432/foodb"
47+
app_name = faker.pystr()
4448
assert (
45-
settings.dsn_with_query
46-
== "postgresql://foo:secret@localhost:5432/foodb?application_name=Some+%2643+funky+name"
49+
settings.dsn_with_query(app_name)
50+
== f"postgresql://foo:secret@localhost:5432/foodb?application_name={app_name}-Some+%2643+funky+name"
4751
)
4852

4953
with monkeypatch.context() as patch:
5054
patch.delenv("POSTGRES_CLIENT_NAME")
5155
settings = PostgresSettings()
5256

5357
assert not settings.POSTGRES_CLIENT_NAME
54-
assert settings.dsn == settings.dsn_with_query
58+
assert f"{settings.dsn}?application_name=blah" == settings.dsn_with_query(
59+
"blah"
60+
)
5561

5662

5763
def test_dsn_with_async_sqlalchemy_has_query(

packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ async def _state_metadata_entry_exists(
169169

170170

171171
async def _delete_legacy_archive(
172-
project_id: ProjectID, node_uuid: NodeID, path: Path
172+
project_id: ProjectID, node_uuid: NodeID, path: Path, *, application_name: str
173173
) -> None:
174174
"""removes the .zip state archive from storage"""
175175
s3_object = __create_s3_object_key(
@@ -180,7 +180,9 @@ async def _delete_legacy_archive(
180180
# NOTE: if service is opened by a person which the users shared it with,
181181
# they will not have the permission to delete the node
182182
# Removing it via it's owner allows to always have access to the delete operation.
183-
owner_id = await DBManager().get_project_owner_user_id(project_id)
183+
owner_id = await DBManager(
184+
application_name=application_name
185+
).get_project_owner_user_id(project_id)
184186
await filemanager.delete_file(
185187
user_id=owner_id, store_id=SIMCORE_LOCATION, s3_object=s3_object
186188
)
@@ -198,6 +200,7 @@ async def push(
198200
progress_bar: ProgressBarData,
199201
aws_s3_cli_settings: AwsS3CliSettings | None,
200202
legacy_state: LegacyState | None,
203+
application_name: str,
201204
) -> None:
202205
"""pushes and removes the legacy archive if present"""
203206

@@ -226,6 +229,7 @@ async def push(
226229
project_id=project_id,
227230
node_uuid=node_uuid,
228231
path=source_path,
232+
application_name=application_name,
229233
)
230234

231235
if legacy_state:
@@ -244,6 +248,7 @@ async def push(
244248
project_id=project_id,
245249
node_uuid=node_uuid,
246250
path=legacy_state.old_state_path,
251+
application_name=application_name,
247252
)
248253

249254

packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,25 @@ async def _update_comp_run_snapshot_tasks_if_computational(
8282

8383

8484
class DBContextManager:
85-
def __init__(self, db_engine: AsyncEngine | None = None) -> None:
85+
def __init__(
86+
self, db_engine: AsyncEngine | None = None, *, application_name: str
87+
) -> None:
8688
self._db_engine: AsyncEngine | None = db_engine
8789
self._db_engine_created: bool = False
90+
self._application_name: str = application_name
8891

8992
@staticmethod
90-
async def _create_db_engine() -> AsyncEngine:
93+
async def _create_db_engine(application_name: str) -> AsyncEngine:
9194
settings = NodePortsSettings.create_from_envs()
9295
engine = await create_async_engine_and_database_ready(
93-
settings.POSTGRES_SETTINGS
96+
settings.POSTGRES_SETTINGS, f"{application_name}-simcore-sdk"
9497
)
9598
assert isinstance(engine, AsyncEngine) # nosec
9699
return engine
97100

98101
async def __aenter__(self) -> AsyncEngine:
99102
if not self._db_engine:
100-
self._db_engine = await self._create_db_engine()
103+
self._db_engine = await self._create_db_engine(self._application_name)
101104
self._db_engine_created = True
102105
return self._db_engine
103106

@@ -107,8 +110,9 @@ async def __aexit__(self, exc_type, exc, tb) -> None:
107110

108111

109112
class DBManager:
110-
def __init__(self, db_engine: AsyncEngine | None = None):
113+
def __init__(self, db_engine: AsyncEngine | None = None, *, application_name: str):
111114
self._db_engine = db_engine
115+
self._application_name = application_name
112116

113117
async def write_ports_configuration(
114118
self,
@@ -124,7 +128,9 @@ async def write_ports_configuration(
124128

125129
node_configuration = json_loads(json_configuration)
126130
async with (
127-
DBContextManager(self._db_engine) as engine,
131+
DBContextManager(
132+
self._db_engine, application_name=self._application_name
133+
) as engine,
128134
engine.begin() as connection,
129135
):
130136
# 1. Update comp_tasks table
@@ -154,7 +160,9 @@ async def get_ports_configuration_from_node_uuid(
154160
"Getting ports configuration of node %s from comp_tasks table", node_uuid
155161
)
156162
async with (
157-
DBContextManager(self._db_engine) as engine,
163+
DBContextManager(
164+
self._db_engine, application_name=self._application_name
165+
) as engine,
158166
engine.connect() as connection,
159167
):
160168
node = await _get_node_from_db(project_id, node_uuid, connection)
@@ -171,7 +179,9 @@ async def get_ports_configuration_from_node_uuid(
171179

172180
async def get_project_owner_user_id(self, project_id: ProjectID) -> UserID:
173181
async with (
174-
DBContextManager(self._db_engine) as engine,
182+
DBContextManager(
183+
self._db_engine, application_name=self._application_name
184+
) as engine,
175185
engine.connect() as connection,
176186
):
177187
prj_owner = await connection.scalar(

packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,11 @@ async def ports(
2222
project_id: ProjectIDStr,
2323
node_uuid: NodeIDStr,
2424
*,
25-
db_manager: DBManager | None = None,
25+
db_manager: DBManager,
2626
r_clone_settings: RCloneSettings | None = None,
2727
io_log_redirect_cb: LogRedirectCB | None = None,
2828
aws_s3_cli_settings: AwsS3CliSettings | None = None
2929
) -> Nodeports:
30-
log.debug("creating node_ports_v2 object using provided dbmanager: %s", db_manager)
31-
# NOTE: warning every dbmanager create a new db engine!
32-
if db_manager is None: # NOTE: keeps backwards compatibility
33-
log.debug("no db manager provided, creating one...")
34-
db_manager = DBManager()
35-
3630
return await load(
3731
db_manager=db_manager,
3832
user_id=user_id,

services/director-v2/src/simcore_service_director_v2/utils/dask.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from simcore_sdk.node_ports_v2.links import ItemValue as _NPItemValue
4141
from sqlalchemy.ext.asyncio import AsyncEngine
4242

43+
from .._meta import APP_NAME
4344
from ..constants import LOGS_FILE_NAME, UNDEFINED_API_BASE_URL, UNDEFINED_DOCKER_LABEL
4445
from ..core.errors import (
4546
ComputationalBackendNotConnectedError,
@@ -88,7 +89,7 @@ async def create_node_ports(
8889
:raises PortsValidationError: if any of the ports assigned values are invalid
8990
"""
9091
try:
91-
db_manager = node_ports_v2.DBManager(db_engine)
92+
db_manager = node_ports_v2.DBManager(db_engine, application_name=APP_NAME)
9293
return await node_ports_v2.ports(
9394
user_id=user_id,
9495
project_id=ProjectIDStr(f"{project_id}"),

0 commit comments

Comments
 (0)