Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion services/budadmin/src/flows/Pipeline/EditPipeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export default function EditPipeline() {
<BudForm
data={{
name: pipeline.name || "",
icon: pipeline.icon || "🔄",
description: pipeline.dag?.description || "",
}}
drawerLoading={isSaving}
Expand All @@ -50,7 +51,7 @@ export default function EditPipeline() {
description: values.description,
};

const result = await updateWorkflow(pipeline.id, updatedDag);
const result = await updateWorkflow(pipeline.id, updatedDag, values.icon);
if (result) {
successToast("Pipeline updated successfully");
closeDrawer();
Expand Down
2 changes: 1 addition & 1 deletion services/budadmin/src/flows/Pipeline/NewPipeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export default function NewPipeline() {
outputs: {},
};

const result = await createWorkflow(dag);
const result = await createWorkflow(dag, values.icon);
if (result) {
successToast("Pipeline draft created");
closeDrawer();
Expand Down
2 changes: 1 addition & 1 deletion services/budadmin/src/pages/home/budpipelines/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const WorkflowCard = ({
{/* Header with icon */}
<div className="pr-0 flex justify-between items-start gap-3">
<div className="w-[2.40125rem] h-[2.40125rem] bg-[#1F1F1F] rounded-[5px] flex items-center justify-center text-xl">
🔄
{workflow.icon || "🔄"}
</div>
<ConfigProvider
theme={{
Expand Down
13 changes: 9 additions & 4 deletions services/budadmin/src/stores/useBudPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export type BudPipelineItem = {
updated_at?: string;
step_count: number;
dag: DAGDefinition;
icon?: string;
warnings?: string[];
execution_count?: number;
last_execution_at?: string;
Expand Down Expand Up @@ -645,8 +646,8 @@ type BudPipelineStore = {
getWorkflow: (id: string) => Promise<void>;
getExecutions: (workflowId?: string, page?: number, pageSize?: number) => Promise<void>;
getExecution: (executionId: string) => Promise<void>;
createWorkflow: (dag: DAGDefinition) => Promise<BudPipelineItem | null>;
updateWorkflow: (id: string, dag: DAGDefinition) => Promise<BudPipelineItem | null>;
createWorkflow: (dag: DAGDefinition, icon?: string) => Promise<BudPipelineItem | null>;
updateWorkflow: (id: string, dag: DAGDefinition, icon?: string) => Promise<BudPipelineItem | null>;
executeWorkflow: (workflowId: string, params: Record<string, any>) => Promise<PipelineExecution | null>;
deleteWorkflow: (id: string) => Promise<boolean>;
validatePipeline: (dag: DAGDefinition) => Promise<ValidationResult>;
Expand Down Expand Up @@ -852,7 +853,7 @@ export const useBudPipeline = create<BudPipelineStore>((set, get) => ({
},

// Create workflow
createWorkflow: async (dag: DAGDefinition) => {
createWorkflow: async (dag: DAGDefinition, icon?: string) => {
set({ isLoading: true, error: null });

if (USE_MOCK_DATA) {
Expand All @@ -865,6 +866,7 @@ export const useBudPipeline = create<BudPipelineStore>((set, get) => ({
created_at: new Date().toISOString(),
step_count: dag.steps.length,
dag,
icon,
execution_count: 0,
};
set((state) => ({
Expand All @@ -878,6 +880,7 @@ export const useBudPipeline = create<BudPipelineStore>((set, get) => ({
const response = await AppRequest.Post(BUDPIPELINE_API, {
dag,
name: dag.name,
icon,
});
const newWorkflow = response.data;
set((state) => ({
Expand All @@ -896,7 +899,7 @@ export const useBudPipeline = create<BudPipelineStore>((set, get) => ({
},

// Update workflow
updateWorkflow: async (id: string, dag: DAGDefinition) => {
updateWorkflow: async (id: string, dag: DAGDefinition, icon?: string) => {
set({ isLoading: true, error: null });

if (USE_MOCK_DATA) {
Expand All @@ -910,6 +913,7 @@ export const useBudPipeline = create<BudPipelineStore>((set, get) => ({
updated_at: new Date().toISOString(),
step_count: dag.steps.length,
dag,
icon,
execution_count: get().workflows.find((w) => w.id === id)?.execution_count || 0,
};
set((state) => ({
Expand All @@ -924,6 +928,7 @@ export const useBudPipeline = create<BudPipelineStore>((set, get) => ({
const response = await AppRequest.Put(`${BUDPIPELINE_API}/${id}`, {
dag,
name: dag.name,
icon,
});
const updatedWorkflow = response.data;
set((state) => ({
Expand Down
2 changes: 2 additions & 0 deletions services/budapp/budapp/workflow_ops/budpipeline_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ async def create_budpipeline(
name=request_body.get("name"),
user_id=str(current_user.id),
system_owned=request_body.get("system_owned", False),
icon=request_body.get("icon"),
)
return JSONResponse(content=result, status_code=status.HTTP_201_CREATED)
except ClientException as e:
Expand Down Expand Up @@ -1053,6 +1054,7 @@ async def update_budpipeline(
dag=request_body.get("dag"),
name=request_body.get("name"),
user_id=str(current_user.id),
icon=request_body.get("icon"),
)
return JSONResponse(content=result, status_code=status.HTTP_200_OK)
except ClientException as e:
Expand Down
24 changes: 17 additions & 7 deletions services/budapp/budapp/workflow_ops/budpipeline_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def create_pipeline(
name: Optional[str] = None,
user_id: Optional[str] = None,
system_owned: bool = False,
icon: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a new pipeline in budpipeline service.

Expand All @@ -81,6 +82,7 @@ async def create_pipeline(
name: Optional pipeline name override
user_id: The ID of the user creating the pipeline
system_owned: True if this is a system-owned pipeline visible to all users
icon: Optional icon/emoji for UI representation

Returns:
Created pipeline data including ID
Expand All @@ -89,16 +91,20 @@ async def create_pipeline(
ClientException: If creation fails
"""
try:
data: Dict[str, Any] = {
"dag": dag,
"name": name,
"user_id": user_id,
"system_owned": system_owned,
}
if icon is not None:
data["icon"] = icon
Comment on lines +94 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency and to avoid sending None values for optional fields, it would be better to build the data dictionary by only including optional fields (name, user_id, icon) if they are not None. This would align with the pattern used in update_pipeline in this same file, making the code more robust and predictable.

            data: Dict[str, Any] = {
                "dag": dag,
                "system_owned": system_owned,
            }
            if name is not None:
                data["name"] = name
            if user_id is not None:
                data["user_id"] = user_id
            if icon is not None:
                data["icon"] = icon


result = await DaprService.invoke_service(
app_id=BUDPIPELINE_APP_ID,
method_path="pipelines",
method="POST",
data={
"dag": dag,
"name": name,
"user_id": user_id,
"system_owned": system_owned,
},
data=data,
)
return result
except ClientException:
Expand Down Expand Up @@ -212,6 +218,7 @@ async def update_pipeline(
dag: Optional[Dict[str, Any]] = None,
name: Optional[str] = None,
user_id: Optional[str] = None,
icon: Optional[str] = None,
) -> Dict[str, Any]:
"""Update a pipeline's DAG definition.

Expand All @@ -220,6 +227,7 @@ async def update_pipeline(
dag: New DAG definition
name: Optional new name
user_id: User ID for permission check (optional)
icon: Optional icon/emoji for UI representation

Returns:
Updated pipeline data
Expand All @@ -228,11 +236,13 @@ async def update_pipeline(
ClientException: If update fails
"""
try:
data = {}
data: Dict[str, Any] = {}
if dag is not None:
data["dag"] = dag
if name is not None:
data["name"] = name
if icon is not None:
data["icon"] = icon

headers = {}
if user_id:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add icon column to pipeline_definition.

Revision ID: 006_add_icon
Revises: 005_user_isolation
Create Date: 2026-02-10 12:00:00.000000

This migration adds an optional icon column to pipeline_definition
for storing a user-selected emoji/icon for UI representation.
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "006_add_icon"
down_revision = "005_user_isolation"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"pipeline_definition",
sa.Column(
"icon",
sa.String(255),
nullable=True,
comment="Optional icon/emoji for UI representation",
),
)


def downgrade() -> None:
op.drop_column("pipeline_definition", "icon")
3 changes: 3 additions & 0 deletions services/budpipeline/budpipeline/pipeline/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async def create(
dag_definition: dict[str, Any],
created_by: str,
description: str | None = None,
icon: str | None = None,
status: PipelineStatus = PipelineStatus.DRAFT,
user_id: UUID | None = None,
system_owned: bool = False,
Expand All @@ -58,6 +59,7 @@ async def create(
dag_definition: Complete pipeline DAG definition.
created_by: User or service that created the pipeline.
description: Optional pipeline description.
icon: Optional icon/emoji for UI representation.
status: Initial pipeline status (default: draft).
user_id: UUID of the owning user (None for system/anonymous pipelines).
system_owned: True if this is a system-owned pipeline visible to all users.
Expand All @@ -72,6 +74,7 @@ async def create(
definition = PipelineDefinition(
name=name,
description=description,
icon=icon,
dag_definition=dag_definition,
status=status,
step_count=step_count,
Expand Down
5 changes: 5 additions & 0 deletions services/budpipeline/budpipeline/pipeline/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class PipelineDefinition(Base):
nullable=True,
comment="Optional pipeline description",
)
icon: Mapped[str | None] = mapped_column(
String(255),
nullable=True,
comment="Optional icon/emoji for UI representation",
)

# DAG definition
dag_definition: Mapped[dict[str, Any]] = mapped_column(
Expand Down
5 changes: 5 additions & 0 deletions services/budpipeline/budpipeline/pipeline/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ async def create_pipeline(
name_override=request.name,
created_by="api", # TODO: Get from auth context
description=None,
icon=request.icon,
user_id=user_id,
system_owned=request.system_owned,
)
Expand All @@ -108,6 +109,7 @@ async def create_pipeline(
step_count=pipeline["step_count"],
user_id=pipeline.get("user_id"),
system_owned=pipeline.get("system_owned", False),
icon=pipeline.get("icon"),
)
except DuplicatePipelineNameError as e:
raise HTTPException(
Expand Down Expand Up @@ -154,6 +156,7 @@ async def list_pipelines(
user_id=p.get("user_id"),
system_owned=p.get("system_owned", False),
description=p.get("description"),
icon=p.get("icon"),
dag=p.get("dag"),
execution_count=p.get("execution_count", 0),
last_execution_at=p.get("last_execution_at"),
Expand Down Expand Up @@ -246,6 +249,7 @@ async def update_pipeline(
pipeline_id=pipeline_id,
dag_dict=request.dag,
name_override=request.name,
icon=request.icon,
)
return PipelineResponse(
id=pipeline["id"],
Expand All @@ -256,6 +260,7 @@ async def update_pipeline(
step_count=pipeline["step_count"],
user_id=pipeline.get("user_id"),
system_owned=pipeline.get("system_owned", False),
icon=pipeline.get("icon"),
)
except WorkflowNotFoundError:
raise HTTPException(
Expand Down
3 changes: 3 additions & 0 deletions services/budpipeline/budpipeline/pipeline/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PipelineCreateRequest(BaseModel):

dag: dict[str, Any] = Field(..., description="The DAG definition")
name: str | None = Field(None, description="Optional pipeline name override")
icon: str | None = Field(None, description="Optional icon/emoji for UI representation")
user_id: str | None = Field(
None, description="User ID for pipeline ownership (set by service from auth context)"
)
Expand All @@ -43,6 +44,7 @@ class PipelineResponse(BaseModel):
user_id: str | None = None
system_owned: bool = False
description: str | None = None
icon: str | None = None
dag: dict[str, Any] | None = None
execution_count: int = 0
last_execution_at: datetime | None = None
Expand Down Expand Up @@ -351,6 +353,7 @@ class PipelineDefinitionResponse(BaseModel):
version: int = Field(..., description="Optimistic locking version")
name: str = Field(..., description="Human-readable pipeline name")
description: str | None = Field(None, description="Optional pipeline description")
icon: str | None = Field(None, description="Optional icon/emoji for UI representation")
status: PipelineStatus = Field(..., description="Current pipeline status")
step_count: int = Field(..., description="Number of steps in the pipeline DAG")
dag: dict[str, Any] = Field(..., alias="dag_definition", description="Pipeline DAG definition")
Expand Down
Loading
Loading