Skip to content

Commit d30cca7

Browse files
dittopsclaude
andcommitted
feat(budpipeline): add deployment actions with internal auth support
## Changes ### BudPipeline Actions - Add Scale Deployment action to set fixed replica count - Refactor Deploy Model action with simulation support and smart mode - Refactor Delete Deployment action with proper cleanup - Refactor Rate Limit action with multiple algorithm support - Remove deprecated autoscale action (replaced by scale) - Remove cluster create/delete actions (not yet implemented) - Add CREATING_ACTIONS.md documentation guide ### BudApp Internal Authentication - Add internal auth support to autoscale endpoints for pipeline calls - Use require_permissions_or_internal decorator pattern - Add get_current_active_user_or_internal dependency ### BudAdmin UI - Fix empty endpoint dropdown in pipeline editor - Add endpoints to dataSources for ActionConfigPanel - Fetch endpoints when entering edit mode ### ActionContext Enhancement - Add invoke_service method for Dapr service invocation - Support dapr-api-token header for internal auth ### Helm Chart - Add alembic migration to budpipeline startup ### Fixes from Review - Fix source_topics type consistency (always use list) - Add force param to delete action - Update cluster tests for removed actions Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent ab359d2 commit d30cca7

File tree

29 files changed

+2544
-693
lines changed

29 files changed

+2544
-693
lines changed

infra/helm/bud/templates/microservices/budpipeline.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ spec:
4040
- name: budpipeline
4141
image: {{ .Values.microservices.budpipeline.image }}
4242
imagePullPolicy: {{ .Values.imagePullPolicy }}
43-
command: ["sh", "-c", "uvicorn budpipeline.main:app --host 0.0.0.0 --port 8010"]
43+
command: ["sh", "-c", "alembic upgrade head && uvicorn budpipeline.main:app --host 0.0.0.0 --port 8010"]
4444
ports:
4545
- containerPort: 8010
4646
envFrom:

services/budadmin/src/components/pipelineEditor/components/ActionConfigPanel.tsx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ export interface ActionConfigPanelProps {
243243
projects?: SelectOption[];
244244
endpoints?: SelectOption[];
245245
providers?: SelectOption[];
246+
credentials?: SelectOption[];
246247
};
247248
/** Whether data sources are loading */
248249
loadingDataSources?: Set<string>;
@@ -547,6 +548,8 @@ export function ActionConfigPanel({
547548
return dataSources.endpoints || [];
548549
case 'provider_ref':
549550
return dataSources.providers || [];
551+
case 'credential_ref':
552+
return dataSources.credentials || [];
550553
default:
551554
return [];
552555
}
@@ -664,8 +667,8 @@ export function ActionConfigPanel({
664667
</select>
665668
)}
666669

667-
{/* Ref types (model, cluster, provider, etc.) - with toggle for template mode */}
668-
{['model_ref', 'cluster_ref', 'project_ref', 'endpoint_ref', 'provider_ref'].includes(param.type) && (
670+
{/* Ref types (model, cluster, provider, credential, etc.) - with toggle for template mode */}
671+
{['model_ref', 'cluster_ref', 'project_ref', 'endpoint_ref', 'provider_ref', 'credential_ref'].includes(param.type) && (
669672
<div>
670673
<div style={{ display: 'flex', gap: '8px', marginBottom: '6px' }}>
671674
<button

services/budadmin/src/flows/components/CommonStatus.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,7 @@ export default function CommonStatus({
156156
}, [steps, workflowId]);
157157

158158
useEffect(() => {
159-
console.log(`socket`, socket)
160159
if (socket) {
161-
console.log(handleNotification)
162160
socket.on("notification_received", handleNotification);
163161
}
164162

services/budadmin/src/pages/home/budpipelines/[id]/index.tsx

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import { useCluster } from "src/hooks/useCluster";
2626
import { useModels } from "src/hooks/useModels";
2727
import { useCloudProviders } from "src/hooks/useCloudProviders";
2828
import { useProjects } from "src/hooks/useProjects";
29+
import { useProprietaryCredentials } from "src/stores/useProprietaryCredentials";
30+
import { useEndPoints } from "src/hooks/useEndPoint";
2931
import { PrimaryButton } from "@/components/ui/bud/form/Buttons";
3032
import { useDrawer } from "src/hooks/useDrawer";
3133
import StepDetailDrawer from "@/components/pipelineEditor/components/StepDetailDrawer";
@@ -121,6 +123,8 @@ const WorkflowDetail = () => {
121123
const { models, getGlobalModels } = useModels();
122124
const { providers, getProviders } = useCloudProviders();
123125
const { projects, getProjects } = useProjects();
126+
const { credentials: proprietaryCredentials, getCredentials: getProprietaryCredentials } = useProprietaryCredentials();
127+
const { endPoints, getEndPoints } = useEndPoints();
124128

125129
const [activeTab, setActiveTab] = useState("dag");
126130
const [selectedStep, setSelectedStep] = useState<PipelineStep | null>(null);
@@ -144,11 +148,13 @@ const WorkflowDetail = () => {
144148
[executions]
145149
);
146150

147-
// Transform clusters, models, projects, and providers to SelectOption format for WorkflowEditor
151+
// Transform clusters, models, projects, providers, credentials, and endpoints to SelectOption format for WorkflowEditor
152+
// Note: clusters use cluster_id (budcluster UUID) as value, not id (budapp UUID),
153+
// to match what deploy-workflow endpoint expects
148154
const dataSources = useMemo(() => ({
149155
clusters: clusters.map((c) => ({
150156
label: c.name,
151-
value: c.id,
157+
value: c.cluster_id,
152158
})),
153159
models: models.map((m) => ({
154160
label: m.name,
@@ -162,9 +168,17 @@ const WorkflowDetail = () => {
162168
label: p.name,
163169
value: p.id,
164170
})),
165-
}), [clusters, models, projects, providers]);
171+
credentials: proprietaryCredentials.map((c) => ({
172+
label: `${c.name} (${c.type})`,
173+
value: c.id,
174+
})),
175+
endpoints: endPoints.map((e) => ({
176+
label: e.name || e.id || "",
177+
value: e.id || "",
178+
})),
179+
}), [clusters, models, projects, providers, proprietaryCredentials, endPoints]);
166180

167-
// Fetch clusters, models, projects, and providers when entering edit mode
181+
// Fetch clusters, models, projects, providers, credentials, and endpoints when entering edit mode
168182
useEffect(() => {
169183
if (isEditing) {
170184
const loadingSet = new Set<string>();
@@ -181,6 +195,12 @@ const WorkflowDetail = () => {
181195
if (providers.length === 0) {
182196
loadingSet.add("providers");
183197
}
198+
if (proprietaryCredentials.length === 0) {
199+
loadingSet.add("credentials");
200+
}
201+
if (endPoints.length === 0) {
202+
loadingSet.add("endpoints");
203+
}
184204

185205
if (loadingSet.size > 0) {
186206
setLoadingDataSources(loadingSet);
@@ -242,6 +262,33 @@ const WorkflowDetail = () => {
242262
);
243263
}
244264

265+
if (proprietaryCredentials.length === 0) {
266+
fetchPromises.push(
267+
(async () => {
268+
await getProprietaryCredentials({ page: 1, limit: 100 });
269+
setLoadingDataSources(prev => {
270+
const next = new Set(prev);
271+
next.delete("credentials");
272+
return next;
273+
});
274+
})()
275+
);
276+
}
277+
278+
if (endPoints.length === 0) {
279+
fetchPromises.push(
280+
(async () => {
281+
// Fetch all endpoints (no project filter needed for pipeline actions)
282+
await getEndPoints({ page: 1, limit: 100 });
283+
setLoadingDataSources(prev => {
284+
const next = new Set(prev);
285+
next.delete("endpoints");
286+
return next;
287+
});
288+
})()
289+
);
290+
}
291+
245292
Promise.all(fetchPromises).finally(() => {
246293
setLoadingDataSources(new Set());
247294
});

services/budadmin/src/types/actions.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ export type ParamType =
2222
| 'cluster_ref'
2323
| 'project_ref'
2424
| 'endpoint_ref'
25-
| 'provider_ref';
25+
| 'provider_ref'
26+
| 'credential_ref';
2627

2728
export interface SelectOption {
2829
value: string;

services/budapp/budapp/cluster_ops/schemas.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,10 @@ class RecommendedClusterRequest(BaseModel):
605605
default=None,
606606
description="Comma-separated model endpoint types (e.g., 'EMBEDDING', 'LLM') for engine selection",
607607
)
608+
debug: bool = Field(
609+
default=False,
610+
description="If True, run simulation synchronously and wait for completion before returning",
611+
)
608612

609613

610614
class GrafanaDashboardResponse(SuccessResponse):

services/budapp/budapp/endpoint_ops/endpoint_routes.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,22 @@
1919
from typing import List, Optional, Union
2020
from uuid import UUID
2121

22-
from fastapi import APIRouter, Depends, Header, Query, status
22+
from fastapi import APIRouter, Body, Depends, Header, Query, status
2323
from sqlalchemy.orm import Session
2424
from typing_extensions import Annotated
2525

2626
from budapp.commons import logging
2727
from budapp.commons.dependencies import (
2828
get_current_active_user,
29+
get_current_active_user_or_internal,
2930
get_session,
3031
parse_ordering_fields,
3132
)
3233
from budapp.commons.exceptions import ClientException
3334
from budapp.user_ops.schemas import User
3435

3536
from ..commons.constants import PermissionEnum
36-
from ..commons.permission_handler import require_permissions
37+
from ..commons.permission_handler import require_permissions, require_permissions_or_internal
3738
from ..commons.schemas import ErrorResponse, SuccessResponse
3839
from ..workflow_ops.schemas import RetrieveWorkflowDataResponse
3940
from ..workflow_ops.services import WorkflowService
@@ -46,6 +47,7 @@
4647
DeleteWorkerRequest,
4748
DeploymentPricingResponse,
4849
DeploymentSettingsResponse,
50+
EndpointDeleteResponse,
4951
EndpointFilter,
5052
EndpointPaginatedResponse,
5153
ModelClusterDetailResponse,
@@ -146,28 +148,37 @@ async def list_all_endpoints(
146148
"description": "Invalid request parameters",
147149
},
148150
status.HTTP_200_OK: {
149-
"model": SuccessResponse,
151+
"model": EndpointDeleteResponse,
150152
"description": "Successfully executed delete endpoint workflow",
151153
},
152154
},
153155
description="Delete an endpoint by ID",
154156
)
155157
@require_permissions(permissions=[PermissionEnum.ENDPOINT_MANAGE])
156158
async def delete_endpoint(
157-
current_user: Annotated[User, Depends(get_current_active_user)],
159+
current_user: Annotated[User, Depends(get_current_active_user_or_internal)],
158160
session: Annotated[Session, Depends(get_session)],
159161
endpoint_id: UUID,
160162
x_resource_type: Annotated[Optional[str], Header()] = None,
161163
x_entity_id: Annotated[Optional[str], Header()] = None,
162-
) -> Union[SuccessResponse, ErrorResponse]:
163-
"""Delete a endpoint by its ID."""
164+
callback_topic: Annotated[Optional[str], Body(embed=True)] = None,
165+
) -> Union[EndpointDeleteResponse, ErrorResponse]:
166+
"""Delete a endpoint by its ID.
167+
168+
Args:
169+
callback_topic: Optional Dapr pub/sub topic for budpipeline integration.
170+
If provided, completion events will be published to this topic.
171+
"""
164172
try:
165-
db_workflow = await EndpointService(session).delete_endpoint(endpoint_id, current_user.id)
173+
db_workflow = await EndpointService(session).delete_endpoint(
174+
endpoint_id, current_user.id, callback_topic=callback_topic
175+
)
166176
logger.debug(f"Endpoint deleting initiated with workflow id: {db_workflow.id}")
167-
return SuccessResponse(
177+
return EndpointDeleteResponse(
168178
message="Deployment deleting initiated successfully",
169179
code=status.HTTP_200_OK,
170180
object="endpoint.delete",
181+
workflow_id=db_workflow.id,
171182
)
172183
except ClientException as e:
173184
logger.exception(f"Failed to delete endpoint: {e}")
@@ -1018,10 +1029,10 @@ async def update_deployment_settings(
10181029
},
10191030
description="Get autoscale configuration for an endpoint",
10201031
)
1021-
@require_permissions(permissions=[PermissionEnum.ENDPOINT_VIEW])
1032+
@require_permissions_or_internal(permissions=[PermissionEnum.ENDPOINT_VIEW])
10221033
async def get_autoscale_config(
10231034
endpoint_id: UUID,
1024-
current_user: Annotated[User, Depends(get_current_active_user)],
1035+
current_user: Annotated[User, Depends(get_current_active_user_or_internal)],
10251036
session: Annotated[Session, Depends(get_session)],
10261037
x_resource_type: Annotated[Optional[str], Header()] = None,
10271038
x_entity_id: Annotated[Optional[str], Header()] = None,
@@ -1065,11 +1076,11 @@ async def get_autoscale_config(
10651076
},
10661077
description="Update autoscale configuration for an endpoint",
10671078
)
1068-
@require_permissions(permissions=[PermissionEnum.ENDPOINT_MANAGE])
1079+
@require_permissions_or_internal(permissions=[PermissionEnum.ENDPOINT_MANAGE])
10691080
async def update_autoscale_config(
10701081
endpoint_id: UUID,
10711082
request: UpdateAutoscaleRequest,
1072-
current_user: Annotated[User, Depends(get_current_active_user)],
1083+
current_user: Annotated[User, Depends(get_current_active_user_or_internal)],
10731084
session: Annotated[Session, Depends(get_session)],
10741085
x_resource_type: Annotated[Optional[str], Header()] = None,
10751086
x_entity_id: Annotated[Optional[str], Header()] = None,

services/budapp/budapp/endpoint_ops/schemas.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,15 @@ class EndpointPaginatedResponse(PaginatedSuccessResponse):
100100
endpoints: list[EndpointListResponse] = []
101101

102102

103+
class EndpointDeleteResponse(SuccessResponse):
104+
"""Response schema for endpoint delete operation.
105+
106+
Includes the workflow_id for tracking the async delete workflow.
107+
"""
108+
109+
workflow_id: Optional[UUID] = None
110+
111+
103112
class WorkerInfoFilter(BaseModel):
104113
"""Filter for worker info."""
105114

services/budapp/budapp/endpoint_ops/services.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,17 @@ async def get_all_endpoints(
230230
project_id, offset, limit, filters, order_by, search, status_filter
231231
)
232232

233-
async def delete_endpoint(self, endpoint_id: UUID, current_user_id: UUID) -> WorkflowModel:
234-
"""Delete an endpoint by its ID."""
233+
async def delete_endpoint(
234+
self, endpoint_id: UUID, current_user_id: UUID, callback_topic: Optional[str] = None
235+
) -> WorkflowModel:
236+
"""Delete an endpoint by its ID.
237+
238+
Args:
239+
endpoint_id: The endpoint ID to delete.
240+
current_user_id: The ID of the user initiating the delete.
241+
callback_topic: Optional Dapr pub/sub topic for budpipeline integration.
242+
If provided, completion events will be published to this topic.
243+
"""
235244
db_endpoint = await EndpointDataManager(self.session).retrieve_by_fields(
236245
EndpointModel, {"id": endpoint_id}, exclude_fields={"status": EndpointStatusEnum.DELETED}
237246
)
@@ -326,7 +335,11 @@ async def delete_endpoint(self, endpoint_id: UUID, current_user_id: UUID) -> Wor
326335
# Perform delete endpoint request to bud_cluster app
327336
if has_cluster:
328337
bud_cluster_response = await self._perform_bud_cluster_delete_endpoint_request(
329-
db_endpoint.cluster.cluster_id, db_endpoint.namespace, current_user_id, db_workflow.id
338+
db_endpoint.cluster.cluster_id,
339+
db_endpoint.namespace,
340+
current_user_id,
341+
db_workflow.id,
342+
callback_topic=callback_topic,
330343
)
331344
else:
332345
# For cloud models without cluster, skip bud_cluster deletion
@@ -380,13 +393,22 @@ async def delete_endpoint(self, endpoint_id: UUID, current_user_id: UUID) -> Wor
380393
return db_workflow
381394

382395
async def _perform_bud_cluster_delete_endpoint_request(
383-
self, bud_cluster_id: Optional[UUID], namespace: str, current_user_id: UUID, workflow_id: UUID
396+
self,
397+
bud_cluster_id: Optional[UUID],
398+
namespace: str,
399+
current_user_id: UUID,
400+
workflow_id: UUID,
401+
callback_topic: Optional[str] = None,
384402
) -> Dict:
385403
"""Perform delete endpoint request to bud_cluster app.
386404
387405
Args:
388406
bud_cluster_id: The ID of the cluster being served by the endpoint to delete.
389407
namespace: The namespace of the cluster endpoint to delete.
408+
current_user_id: The ID of the user initiating the delete.
409+
workflow_id: The workflow ID tracking this deletion.
410+
callback_topic: Optional Dapr pub/sub topic for budpipeline integration.
411+
If provided, completion events will also be published to this topic.
390412
"""
391413
if not bud_cluster_id:
392414
logger.warning(
@@ -397,6 +419,11 @@ async def _perform_bud_cluster_delete_endpoint_request(
397419
f"{app_settings.dapr_base_url}/v1.0/invoke/{app_settings.bud_cluster_app_id}/method/deployment/delete"
398420
)
399421

422+
# Build source_topic - always use list for consistent type
423+
source_topics = [app_settings.source_topic]
424+
if callback_topic:
425+
source_topics.append(callback_topic)
426+
400427
payload = {
401428
"cluster_id": str(bud_cluster_id),
402429
"namespace": namespace,
@@ -405,7 +432,7 @@ async def _perform_bud_cluster_delete_endpoint_request(
405432
"subscriber_ids": str(current_user_id),
406433
"workflow_id": str(workflow_id),
407434
},
408-
"source_topic": f"{app_settings.source_topic}",
435+
"source_topic": source_topics,
409436
}
410437

411438
logger.debug(f"Performing delete endpoint request to budcluster {payload}")

services/budapp/budapp/model_ops/model_routes.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -914,9 +914,9 @@ async def get_catalog_model_details(
914914
},
915915
description="Retrieve details of a model by ID",
916916
)
917-
@require_permissions(permissions=[PermissionEnum.MODEL_VIEW])
917+
@require_permissions_or_internal(permissions=[PermissionEnum.MODEL_VIEW])
918918
async def retrieve_model(
919-
current_user: Annotated[User, Depends(get_current_active_user)],
919+
current_user: Annotated[User, Depends(get_current_active_user_or_internal)],
920920
session: Annotated[Session, Depends(get_session)],
921921
model_id: UUID,
922922
) -> Union[ModelDetailSuccessResponse, ErrorResponse]:
@@ -1231,6 +1231,8 @@ async def deploy_model_by_step(
12311231
chat_template=deploy_request.chat_template,
12321232
supports_lora=deploy_request.supports_lora,
12331233
supports_pipeline_parallelism=deploy_request.supports_pipeline_parallelism,
1234+
callback_topic=deploy_request.callback_topic,
1235+
simulator_id=deploy_request.simulator_id,
12341236
)
12351237

12361238
return await WorkflowService(session).retrieve_workflow_data(db_workflow.id)

0 commit comments

Comments
 (0)