Skip to content

Commit 222d342

Browse files
authored
Celery scheduler (ITISFoundation#2325)
1 parent ad00db4 commit 222d342

File tree

41 files changed

+1568
-285
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+1568
-285
lines changed

ci/github/unit-testing/director_v2.bash

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ install() {
1414
test() {
    # Parallelizable unit tests: run with pytest-xdist (--numprocesses=auto),
    # excluding the with_dbs suite which must run serially (see below).
    pytest --numprocesses=auto --cov=simcore_service_director_v2 --durations=10 --cov-append \
        --color=yes --cov-report=term-missing --cov-report=xml --cov-config=.coveragerc \
        -v -m "not travis" services/director-v2/tests/unit --ignore=services/director-v2/tests/unit/with_dbs;
    # these tests cannot be run in parallel
    # (single-process pytest; coverage is appended to the run above via --cov-append)
    pytest --cov=simcore_service_director_v2 --durations=10 --cov-append \
        --color=yes --cov-report=term-missing --cov-report=xml --cov-config=.coveragerc \
        -v -m "not travis" services/director-v2/tests/unit/with_dbs;
}
1923

2024
# Check if the function exists (bash specific)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""add comp runs table
2+
3+
Revision ID: da8abd0d8e42
4+
Revises: ec3ef74cddae
5+
Create Date: 2021-05-12 13:46:36.676255+00:00
6+
7+
"""
8+
import sqlalchemy as sa
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "da8abd0d8e42"
13+
down_revision = "ec3ef74cddae"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade():
    """Create the ``comp_runs`` table.

    One row per computational run of a project by a user: a surrogate
    ``run_id`` key, the owning ``project_uuid``/``user_id`` (FK, cascading),
    an ``iteration`` counter, a ``result`` state, and audit/lifetime
    timestamps. ``(project_uuid, user_id, iteration)`` is unique.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "comp_runs",
        # surrogate primary key for the run
        sa.Column(
            "run_id",
            sa.BigInteger(),
            autoincrement=True,
            nullable=False,
            primary_key=True,
        ),
        sa.Column("project_uuid", sa.String(), nullable=False),
        sa.Column("user_id", sa.BigInteger(), nullable=False),
        sa.Column("iteration", sa.BigInteger(), autoincrement=False, nullable=False),
        sa.Column(
            "result",
            # NOTE(review): autogenerated migrations usually spell this
            # `from sqlalchemy.dialects import postgresql` + `postgresql.ENUM`;
            # confirm `sa.dialects.postgresql` resolves with the pinned
            # SQLAlchemy version.
            sa.dialects.postgresql.ENUM(
                "NOT_STARTED",
                "PUBLISHED",
                "PENDING",
                "RUNNING",
                "SUCCESS",
                "FAILED",
                "ABORTED",
                # reuses the pre-existing `statetype` PG enum (shared with
                # other comp_* tables) instead of creating a new one
                name="statetype",
                create_type=False,  # necessary to avoid alembic statetype already exists error
            ),
            server_default="NOT_STARTED",
            nullable=False,
        ),
        # audit timestamps, defaulted DB-side to now()
        sa.Column(
            "created", sa.DateTime(), server_default=sa.text("now()"), nullable=False
        ),
        sa.Column(
            "modified", sa.DateTime(), server_default=sa.text("now()"), nullable=False
        ),
        # run lifetime markers; NULL until the run actually starts/ends
        sa.Column("started", sa.DateTime(), nullable=True),
        sa.Column("ended", sa.DateTime(), nullable=True),
        # deleting/renaming a project or user cascades to its runs
        sa.ForeignKeyConstraint(
            ["project_uuid"],
            ["projects.uuid"],
            name="fk_comp_runs_project_uuid_projects",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["user_id"],
            ["users.id"],
            name="fk_comp_runs_user_id_users",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        # at most one run row per (project, user, iteration)
        sa.UniqueConstraint("project_uuid", "user_id", "iteration"),
    )
    # ### end Alembic commands ###
73+
74+
75+
def downgrade():
    """Revert this migration by dropping the ``comp_runs`` table.

    The shared ``statetype`` enum is intentionally left in place, since
    this migration did not create it (``create_type=False`` on upgrade).
    """
    table_name = "comp_runs"
    op.drop_table(table_name)
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
""" Computational Runs Table
2+
3+
"""
4+
import sqlalchemy as sa
5+
from sqlalchemy.sql import func
6+
7+
from .base import metadata
8+
from .comp_pipeline import StateType
9+
10+
# One row per computational run of a project by a user; the
# (project_uuid, user_id, iteration) triple is unique (constraint below).
# Schema mirrors the `da8abd0d8e42` alembic migration.
comp_runs = sa.Table(
    "comp_runs",
    metadata,
    # this task db id
    sa.Column(
        "run_id",
        sa.BigInteger,
        nullable=False,
        autoincrement=True,
        primary_key=True,
        doc="Primary key, identifies the run",
    ),
    sa.Column(
        "project_uuid",
        sa.String,
        sa.ForeignKey(
            "projects.uuid",
            name="fk_comp_runs_project_uuid_projects",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        nullable=False,
        doc="The project uuid with which the run entry is associated",
    ),
    sa.Column(
        "user_id",
        sa.BigInteger(),
        sa.ForeignKey(
            "users.id",
            name="fk_comp_runs_user_id_users",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        nullable=False,
        doc="The user id with which the run entry is associated",
    ),
    sa.Column(
        "iteration",
        sa.BigInteger,
        nullable=False,
        autoincrement=False,
        doc="A computational run is always associated to a user, a project and a specific iteration",
    ),
    sa.Column(
        "result",
        sa.Enum(StateType),
        nullable=False,
        # DB-side default so rows inserted without a result start NOT_STARTED
        server_default=StateType.NOT_STARTED.value,
        doc="The result of the run entry",
    ),
    # dag node id and class
    sa.Column(
        "created",
        sa.DateTime(),
        nullable=False,
        server_default=func.now(),
        doc="When the run entry was created",
    ),
    sa.Column(
        "modified",
        sa.DateTime(),
        nullable=False,
        server_default=func.now(),
        # NOTE: python-side onupdate — only fires for updates issued through
        # SQLAlchemy, not for raw SQL against the table
        onupdate=func.now(),  # this will auto-update on modification
        doc="When the run entry was last modified",
    ),
    # utc timestamps for submission/start/end
    sa.Column(
        "started",
        sa.DateTime,
        nullable=True,
        doc="When the run was started",
    ),
    sa.Column(
        "ended",
        sa.DateTime,
        nullable=True,
        doc="When the run was finished",
    ),
    sa.UniqueConstraint("project_uuid", "user_id", "iteration"),
)

services/api-server/src/simcore_service_api_server/api/dependencies/database.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ async def _get_repo(
2424
# the max amount of connections is reached
2525
# now the current solution is to acquire connection when needed.
2626

27-
if engine.freesize <= 1:
27+
available_engines = engine.maxsize - (engine.size - engine.freesize)
28+
if available_engines <= 1:
2829
logger.warning(
2930
"Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
3031
engine.size,

services/director-v2/requirements/_test.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111

1212
# testing
13+
asgi_lifespan
1314
pytest
1415
pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76
1516
pytest-cov

services/director-v2/requirements/_test.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@ amqp==5.0.2
2222
# kombu
2323
apipkg==1.5
2424
# via execnet
25+
asgi-lifespan==1.0.1
26+
# via -r requirements/_test.in
2527
astroid==2.5.6
2628
# via pylint
29+
async-exit-stack==1.0.1 ; python_version < "3.7"
30+
# via
31+
# -c requirements/_base.txt
32+
# asgi-lifespan
2733
async-generator==1.10 ; python_version < "3.7"
2834
# via
2935
# -c requirements/_base.txt
@@ -292,6 +298,7 @@ six==1.15.0
292298
sniffio==1.2.0
293299
# via
294300
# -c requirements/_base.txt
301+
# asgi-lifespan
295302
# httpcore
296303
# httpx
297304
sqlalchemy[postgresql_psycopg2binary]==1.3.20

services/director-v2/src/simcore_service_director_v2/api/dependencies/database.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ async def _get_repo(
2424
# the max amount of connections is reached
2525
# now the current solution is to acquire connection when needed.
2626

27-
if engine.freesize <= 1:
27+
available_engines = engine.maxsize - (engine.size - engine.freesize)
28+
if available_engines <= 1:
2829
logger.warning(
2930
"Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
3031
engine.size,
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from fastapi import Request
2+
3+
from ...modules.scheduler import CeleryScheduler
4+
5+
6+
def get_scheduler(request: Request) -> CeleryScheduler:
    """FastAPI dependency returning the application-wide scheduler.

    Assumes the scheduler was attached to ``app.state.scheduler`` during
    application startup — TODO confirm against the app setup code.
    """
    app_state = request.app.state
    return app_state.scheduler

0 commit comments

Comments
 (0)