Skip to content

Commit 6cb731f

Browse files
committed
wip multiple workers, created_by, refactor values
Signed-off-by: Radek Ježek <[email protected]>
1 parent 03e48df commit 6cb731f

File tree

25 files changed

+138
-69
lines changed

25 files changed

+138
-69
lines changed

apps/beeai-cli/src/beeai_cli/commands/platform/base_driver.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ async def deploy(
110110
values = {
111111
**{svc: {"service": {"type": "LoadBalancer"}} for svc in ["collector", "docling", "ui", "phoenix"]},
112112
"hostNetwork": True,
113-
"localDockerRegistry": {"enabled": True},
114113
"externalRegistries": {"public_github": str(Configuration().agent_registry)},
115114
"encryptionKey": "Ovx8qImylfooq4-HNwOzKKDcXLZCB3c_m0JlB9eJBxc=",
116115
"features": {

apps/beeai-sdk/src/beeai_sdk/platform/provider_build.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class ProviderBuild(pydantic.BaseModel):
2727
status: BuildState
2828
source: ResolvedGithubUrl
2929
destination: str
30+
created_by: str
3031

3132
@staticmethod
3233
async def create(*, location: str, client: PlatformClient | None = None) -> ProviderBuild:

apps/beeai-server/src/beeai_server/api/routes/provider_builds.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222

2323
@router.post("")
2424
async def create_provider_build(
25-
_: Annotated[AuthorizedUser, Depends(RequiresPermissions(provider_builds={"write"}))],
25+
user: Annotated[AuthorizedUser, Depends(RequiresPermissions(provider_builds={"write"}))],
2626
request: CreateProviderBuildRequest,
2727
provider_build_service: ProviderBuildServiceDependency,
2828
) -> ProviderBuild:
29-
return await provider_build_service.create_build(location=request.location)
29+
return await provider_build_service.create_build(location=request.location, user=user.user)
3030

3131
@router.get("/{id}")
3232
async def get_provider_build(

apps/beeai-server/src/beeai_server/configuration.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ def validate_level(cls, v: str | int | None):
4646
class OCIRegistryConfiguration(BaseModel, extra="allow"):
4747
username: str | None = None
4848
password: Secret[str] | None = None
49-
insecure: bool = False
5049
auth_header: Secret[str] | None = None
50+
insecure: bool = False
5151

5252
@property
5353
def protocol(self):
@@ -168,6 +168,7 @@ class DockerConfigJsonAuth(BaseModel, extra="allow"):
168168
auth: Secret[str] | None = None
169169
username: str | None = None
170170
password: Secret[str] | None = None
171+
insecure: bool = False
171172

172173

173174
class DockerConfigJson(BaseModel):
@@ -241,7 +242,11 @@ def _oci_registry_defaultdict(self):
241242
for docker_config_json in self.oci_registry_docker_config_json.values():
242243
try:
243244
for registry, conf in docker_config_json.auths.items():
244-
registry_short = AnyUrl(registry).host if "://" in registry else registry.strip("/")
245+
if "://" in registry:
246+
url = AnyUrl(registry)
247+
registry_short = f"{url.host}:{url.port}" if url.port else url.host
248+
else:
249+
registry_short = registry.strip("/")
245250
# For some reason dockerhub registry has a weird url
246251
assert registry_short
247252
aliases = [registry_short]
@@ -251,6 +256,7 @@ def _oci_registry_defaultdict(self):
251256
self.oci_registry[alias].username = conf.username
252257
self.oci_registry[alias].password = conf.password
253258
self.oci_registry[alias].auth_header = conf.auth
259+
self.oci_registry[alias].insecure = conf.insecure
254260
except ValueError as e:
255261
logger.error(f"Failed to parse .dockerconfigjson: {e}. Some agent images might not work correctly.")
256262
if not self.provider_build.oci_build_registry_prefix and len(self.oci_registry):

apps/beeai-server/src/beeai_server/domain/models/provider_build.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ class ProviderBuild(BaseModel):
3131
status: BuildState
3232
source: ResolvedGithubUrl
3333
destination: DockerImageID
34+
created_by: UUID

apps/beeai-server/src/beeai_server/infrastructure/persistence/migrations/alembic/versions/b0388daeb831_.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,40 @@
55
Create Date: 2025-09-22 12:42:16.598519
66
77
"""
8-
from typing import Sequence, Union
98

10-
from alembic import op
11-
import sqlalchemy as sa
9+
from collections.abc import Sequence
1210

11+
import sqlalchemy as sa
12+
from alembic import op
1313

1414
# revision identifiers, used by Alembic.
15-
revision: str = 'b0388daeb831'
16-
down_revision: Union[str, None] = '7b933a4a8cfc'
17-
branch_labels: Union[str, Sequence[str], None] = None
18-
depends_on: Union[str, Sequence[str], None] = None
15+
revision: str = "b0388daeb831"
16+
down_revision: str | None = "7b933a4a8cfc"
17+
branch_labels: str | Sequence[str] | None = None
18+
depends_on: str | Sequence[str] | None = None
1919

2020

2121
def upgrade() -> None:
2222
"""Upgrade schema."""
2323
# ### commands auto generated by Alembic - please adjust! ###
24-
op.create_table('provider_builds',
25-
sa.Column('id', sa.UUID(), nullable=False),
26-
sa.Column('source', sa.JSON(), nullable=False),
27-
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
28-
sa.Column('status', sa.Enum('missing', 'in_progress', 'completed', 'failed', name='buildstate'), nullable=False),
29-
sa.Column('destination', sa.String(length=512), nullable=False),
30-
sa.PrimaryKeyConstraint('id')
24+
op.create_table(
25+
"provider_builds",
26+
sa.Column("id", sa.UUID(), nullable=False),
27+
sa.Column("source", sa.JSON(), nullable=False),
28+
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
29+
sa.Column("created_by", sa.UUID(), nullable=False),
30+
sa.Column(
31+
"status", sa.Enum("missing", "in_progress", "completed", "failed", name="buildstate"), nullable=False
32+
),
33+
sa.Column("destination", sa.String(length=512), nullable=False),
34+
sa.ForeignKeyConstraint(["created_by"], ["users.id"], ondelete="CASCADE"),
35+
sa.PrimaryKeyConstraint("id"),
3136
)
3237
# ### end Alembic commands ###
3338

3439

3540
def downgrade() -> None:
3641
"""Downgrade schema."""
3742
# ### commands auto generated by Alembic - please adjust! ###
38-
op.drop_table('provider_builds')
43+
op.drop_table("provider_builds")
3944
# ### end Alembic commands ###

apps/beeai-server/src/beeai_server/infrastructure/persistence/repositories/provider_build.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Any
66
from uuid import UUID
77

8-
from sqlalchemy import JSON, Column, DateTime, Row, String, Table
8+
from sqlalchemy import JSON, Column, DateTime, ForeignKey, Row, String, Table
99
from sqlalchemy import UUID as SQL_UUID
1010
from sqlalchemy.ext.asyncio import AsyncConnection
1111
from sqlalchemy.sql import select
@@ -23,6 +23,8 @@
2323
Column("id", SQL_UUID, primary_key=True),
2424
Column("source", JSON, nullable=False),
2525
Column("created_at", DateTime(timezone=True), nullable=False),
26+
# The CASCADE might leave some k8s jobs orphaned without cancellation, but jobs have timeout and self-deletion
27+
Column("created_by", ForeignKey("users.id", ondelete="CASCADE"), nullable=True),
2628
Column("status", sql_enum(BuildState), nullable=False),
2729
Column("destination", String(512), nullable=False),
2830
)
@@ -50,6 +52,7 @@ def _to_row(self, provider_build: ProviderBuild) -> dict[str, Any]:
5052
"source": provider_build.source.model_dump(mode="json"),
5153
"created_at": provider_build.created_at,
5254
"status": provider_build.status,
55+
"created_by": provider_build.created_by,
5356
"destination": str(provider_build.destination),
5457
}
5558

@@ -60,6 +63,7 @@ def _to_provider_build(self, row: Row) -> ProviderBuild:
6063
"source": row.source,
6164
"destination": row.destination,
6265
"created_at": row.created_at,
66+
"created_by": row.created_by,
6367
"status": row.status,
6468
}
6569
)

apps/beeai-server/src/beeai_server/jobs/crons/cleanup.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from kink import inject
66
from procrastinate import Blueprint, JobContext, builtin_tasks
77

8+
from beeai_server.jobs.queues import Queues
89
from beeai_server.service_layer.services.contexts import ContextService
910

1011
blueprint = Blueprint()
@@ -13,7 +14,7 @@
1314

1415

1516
@blueprint.periodic(cron="5 * * * *")
16-
@blueprint.task(queueing_lock="cleanup_expired_vector_stores", queue="cron:cleanup")
17+
@blueprint.task(queueing_lock="cleanup_expired_vector_stores", queue=str(Queues.CRON_CLEANUP))
1718
@inject
1819
async def cleanup_expired_context_resources(timestamp: int, context: ContextService) -> None:
1920
"""Delete resources of contexts that haven't been used for several days."""
@@ -22,7 +23,7 @@ async def cleanup_expired_context_resources(timestamp: int, context: ContextServ
2223

2324

2425
@blueprint.periodic(cron="*/10 * * * *")
25-
@blueprint.task(queueing_lock="remove_old_jobs", queue="cron:cleanup", pass_context=True)
26+
@blueprint.task(queueing_lock="remove_old_jobs", queue=str(Queues.CRON_CLEANUP), pass_context=True)
2627
async def remove_old_jobs(context: JobContext, timestamp: int):
2728
return await builtin_tasks.remove_old_jobs(
2829
context,

apps/beeai-server/src/beeai_server/jobs/crons/mcp.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from beeai_server.configuration import get_configuration
1414
from beeai_server.domain.models.mcp_provider import McpProviderDeploymentState
15+
from beeai_server.jobs.queues import Queues
1516
from beeai_server.service_layer.services.mcp import McpService
1617
from beeai_server.utils.utils import extract_messages
1718

@@ -22,7 +23,7 @@
2223
if get_configuration().mcp.auto_remove_enabled:
2324

2425
@blueprint.periodic(cron="* * * * * */5")
25-
@blueprint.task(queueing_lock="auto_remove_mcp_providers", queue="cron:mcp_provider")
26+
@blueprint.task(queueing_lock="auto_remove_mcp_providers", queue=str(Queues.CRON_MCP_PROVIDER))
2627
@inject
2728
async def auto_remove_providers(timestamp: int, mcp_service: McpService):
2829
providers = await mcp_service.list_providers()

apps/beeai-server/src/beeai_server/jobs/crons/provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from beeai_server.configuration import Configuration
1717
from beeai_server.domain.models.provider import ProviderLocation
1818
from beeai_server.exceptions import EntityNotFoundError
19+
from beeai_server.jobs.queues import Queues
1920
from beeai_server.service_layer.services.provider import ProviderService
2021
from beeai_server.service_layer.unit_of_work import IUnitOfWorkFactory
2122
from beeai_server.utils.utils import extract_messages
@@ -26,15 +27,15 @@
2627

2728

2829
@blueprint.periodic(cron="*/1 * * * *")
29-
@blueprint.task(queueing_lock="scale_down_providers", queue="cron:provider")
30+
@blueprint.task(queueing_lock="scale_down_providers", queue=str(Queues.CRON_PROVIDER))
3031
@inject
3132
async def scale_down_providers(timestamp: int, service: ProviderService):
3233
await service.scale_down_providers()
3334

3435

3536
# TODO: Can't use DI here because it's not initialized yet
3637
@blueprint.periodic(cron=get_configuration().agent_registry.sync_period_cron)
37-
@blueprint.task(queueing_lock="check_registry", queue="cron:provider")
38+
@blueprint.task(queueing_lock="check_registry", queue=str(Queues.CRON_PROVIDER))
3839
@inject
3940
async def check_registry(timestamp: int, configuration: Configuration, provider_service: ProviderService):
4041
if not configuration.agent_registry.locations:
@@ -102,7 +103,7 @@ async def check_registry(timestamp: int, configuration: Configuration, provider_
102103
if get_configuration().provider.auto_remove_enabled:
103104

104105
@blueprint.periodic(cron="* * * * * */5")
105-
@blueprint.task(queueing_lock="auto_remove_providers", queue="cron:provider")
106+
@blueprint.task(queueing_lock="auto_remove_providers", queue=str(Queues.CRON_PROVIDER))
106107
@inject
107108
async def auto_remove_providers(timestamp: int, uow_f: IUnitOfWorkFactory, provider_service: ProviderService):
108109
async with uow_f() as uow:

0 commit comments

Comments
 (0)