From ae1f0cb2083aef68fdf3c42b6fc7822476797ef3 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 6 Jan 2026 15:45:11 +0000 Subject: [PATCH 01/31] feat: unsaved changes --- .../components/Header.tsx | 44 +++++--- .../components/WorkflowEditor.tsx | 2 + .../hooks/useUnsavedChanges.ts | 100 ++++++++++++++++++ .../stores/createSessionContextStore.ts | 11 ++ assets/js/collaborative-editor/types/edge.ts | 2 +- assets/js/collaborative-editor/types/job.ts | 2 +- .../types/sessionContext.ts | 12 +++ .../js/collaborative-editor/types/trigger.ts | 4 +- .../channels/workflow_channel.ex | 12 ++- 9 files changed, 164 insertions(+), 25 deletions(-) create mode 100644 assets/js/collaborative-editor/hooks/useUnsavedChanges.ts diff --git a/assets/js/collaborative-editor/components/Header.tsx b/assets/js/collaborative-editor/components/Header.tsx index b74293fc9e0..76af321bc54 100644 --- a/assets/js/collaborative-editor/components/Header.tsx +++ b/assets/js/collaborative-editor/components/Header.tsx @@ -39,6 +39,7 @@ import { NewRunButton } from './NewRunButton'; import { ReadOnlyWarning } from './ReadOnlyWarning'; import { ShortcutKeys } from './ShortcutKeys'; import { Tooltip } from './Tooltip'; +import { useUnsavedChanges } from '../hooks/useUnsavedChanges'; /** * Save button component - visible in React DevTools @@ -54,6 +55,7 @@ export function SaveButton({ label = 'Save', canSync, syncTooltipMessage, + hasChanges, }: { canSave: boolean; tooltipMessage: string; @@ -63,34 +65,40 @@ export function SaveButton({ label?: string; canSync: boolean; syncTooltipMessage: string | null; + hasChanges: boolean; }) { const hasGitHubIntegration = repoConnection !== null; if (!hasGitHubIntegration) { return ( -
- : tooltipMessage - } - side="bottom" - > - - + onClick={onClick} + disabled={!canSave} + > + {label} me + + +
+ {hasChanges ? ( +
+ ) : null} ); } @@ -204,6 +212,7 @@ export function Header({ const { provider } = useSession(); const limits = useLimits(); const { isReadOnly } = useWorkflowReadOnly(); + const { hasChanges } = useUnsavedChanges(); // Check GitHub sync limit const githubSyncLimit = limits.github_sync ?? { @@ -410,6 +419,7 @@ export function Header({ label={isNewWorkflow ? 'Create' : 'Save'} canSync={githubSyncLimit.allowed} syncTooltipMessage={githubSyncLimit.message} + hasChanges={hasChanges} /> diff --git a/assets/js/collaborative-editor/components/WorkflowEditor.tsx b/assets/js/collaborative-editor/components/WorkflowEditor.tsx index 71ce75abbaf..53ddc60d5c7 100644 --- a/assets/js/collaborative-editor/components/WorkflowEditor.tsx +++ b/assets/js/collaborative-editor/components/WorkflowEditor.tsx @@ -35,6 +35,7 @@ import { ManualRunPanel } from './ManualRunPanel'; import { ManualRunPanelErrorBoundary } from './ManualRunPanelErrorBoundary'; import { TemplateDetailsCard } from './TemplateDetailsCard'; import { Tooltip } from './Tooltip'; +import { useUnsavedChanges } from '../hooks/useUnsavedChanges'; interface WorkflowEditorProps { parentProjectId?: string | null; @@ -45,6 +46,7 @@ export function WorkflowEditor({ parentProjectId = null, parentProjectName = null, }: WorkflowEditorProps = {}) { + useUnsavedChanges(); const { params, updateSearchParams } = useURLState(); const { currentNode, selectNode } = useNodeSelection(); const workflowStore = useWorkflowStoreContext(); diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts new file mode 100644 index 00000000000..6c2ab1f1a0b --- /dev/null +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -0,0 +1,100 @@ +import { useSessionContext } from './useSessionContext'; +import { useWorkflowState } from './useWorkflow'; + +export function useUnsavedChanges() { + const { workflow } = useSessionContext(); + const wf = { + jobs: workflow?.jobs.map(j => ({ + id: j.id, + name: j.name, + body: j.body || '', + adaptor: j.adaptor, + project_credential_id: j.project_credential_id, + keychain_credential_id: j.keychain_credential_id, + })), + triggers: workflow?.triggers.map(t => ({ + cron_expression: t.cron_expression, + enabled: t.enabled, + id: t.id, + type: t.type, + })), + edges: workflow?.edges.map(e => ({ + condition_expression: e.condition_expression, + condition_label: e.condition_label, + condition_type: e.condition_type, + enabled: e.enabled, + id: e.id, + source_job_id: e.source_job_id, + source_trigger_id: e.source_trigger_id, + target_job_id: e.target_job_id, + })), + positions: workflow?.positions, + name: workflow?.name, + }; + const storeWorkflow = useWorkflowState(state => ({ + jobs: state.jobs.map(j => ({ + id: j.id, + name: j.name, + body: j.body || '', + adaptor: j.adaptor, + project_credential_id: j.project_credential_id, + keychain_credential_id: j.keychain_credential_id, + })), + triggers: state.triggers.map(t => ({ + cron_expression: t.cron_expression, + enabled: t.enabled, + id: t.id, + type: t.type, + })), + edges: state.edges.map(e => ({ + condition_expression: e.condition_expression, + condition_label: e.condition_label, + condition_type: e.condition_type, + enabled: e.enabled, + id: e.id, + source_job_id: e.source_job_id, + source_trigger_id: e.source_trigger_id, + target_job_id: e.target_job_id, + })), + positions: state.positions, + name: state.workflow?.name, + })); + + // pick items in the exising and check if the new matches it. + + return { hasChanges: isDiff(wf, storeWorkflow) }; +} + +function isDiff(base: unknown, target: unknown) { + const undef = [undefined, null]; + // @ts-expect-error + if (undef.includes(base) && undef.includes(target)) return false; + if (typeof base !== typeof target) return true; + if (Array.isArray(base) && Array.isArray(target)) { + if (base.length !== target.length) return true; + // enter the array + // iterate the array and check each item + let final = false; + for (let idx = 0; idx < base.length; idx++) { + final ||= isDiff(base[idx], target[idx]); + } + console.log('array:final', final); + return final; + } else if ( + base && + target && + typeof base === 'object' && + typeof target === 'object' + ) { + // iterate the object and check each item + let final = false; + console.log(':final', Object.keys(base)); + for (const key of Object.keys(base)) { + final ||= isDiff(base[key], target[key]); + } + console.log('object:final', final); + return final; + } else { + return target !== base; + } +} diff --git a/assets/js/collaborative-editor/stores/createSessionContextStore.ts b/assets/js/collaborative-editor/stores/createSessionContextStore.ts index 796cbafc04f..e1ae67befaa 100644 --- a/assets/js/collaborative-editor/stores/createSessionContextStore.ts +++ b/assets/js/collaborative-editor/stores/createSessionContextStore.ts @@ -171,6 +171,7 @@ export const createSessionContextStore = ( */ const handleSessionContextReceived = (rawData: unknown) => { const result = SessionContextResponseSchema.safeParse(rawData); + console.log('han:base', result.data?.workflow, result.error, rawData); if (result.success) { const sessionContext = result.data; @@ -178,6 +179,7 @@ export const createSessionContextStore = ( state = produce(state, draft => { draft.user = sessionContext.user; draft.project = sessionContext.project; + draft.workflow = sessionContext.workflow ?? null; draft.config = sessionContext.config; draft.permissions = sessionContext.permissions; draft.latestSnapshotLockVersion = @@ -293,6 +295,12 @@ export const createSessionContextStore = ( notify('setLatestSnapshotLockVersion'); }; + const setBaseWorkflow = (workflow: unknown) => { + state = produce(state, draft => { + draft.workflow = workflow as any; + }); + }; + /** * Clear isNewWorkflow flag * Called after first successful save of a new workflow @@ -445,6 +453,9 @@ export const createSessionContextStore = ( ).latest_snapshot_lock_version; logger.debug('Workflow saved - updating lock version', lockVersion); setLatestSnapshotLockVersion(lockVersion); + if ('workflow' in message) { + setBaseWorkflow(message.workflow); + } } }; diff --git a/assets/js/collaborative-editor/types/edge.ts b/assets/js/collaborative-editor/types/edge.ts index f5996409266..3e30dbc0f4f 100644 --- a/assets/js/collaborative-editor/types/edge.ts +++ b/assets/js/collaborative-editor/types/edge.ts @@ -12,7 +12,7 @@ export const EdgeConditionType = z.enum([ export const EdgeSchema = z.object({ // Core identifiers id: uuidSchema, - workflow_id: uuidSchema, + workflow_id: uuidSchema.optional(), // Source (mutually exclusive) source_job_id: uuidSchema.nullable().optional(), diff --git a/assets/js/collaborative-editor/types/job.ts b/assets/js/collaborative-editor/types/job.ts index 67be1592cd9..53a06a379f2 100644 --- a/assets/js/collaborative-editor/types/job.ts +++ b/assets/js/collaborative-editor/types/job.ts @@ -30,7 +30,7 @@ export const JobSchema = z keychain_credential_id: uuidSchema.nullable().default(null), // Association fields - workflow_id: uuidSchema, + workflow_id: uuidSchema.optional(), // Virtual field for form deletion logic delete: z.boolean().optional(), diff --git a/assets/js/collaborative-editor/types/sessionContext.ts b/assets/js/collaborative-editor/types/sessionContext.ts index a3ceea9fdca..a880c965948 100644 --- a/assets/js/collaborative-editor/types/sessionContext.ts +++ b/assets/js/collaborative-editor/types/sessionContext.ts @@ -2,6 +2,10 @@ import type { PhoenixChannelProvider } from 'y-phoenix-channel'; import * as z from 'zod'; import { isoDateTimeSchema, uuidSchema } from './common'; +import { type Workflow } from './workflow'; +import { JobSchema } from './job'; +import { TriggerSchema } from './trigger'; +import { EdgeSchema } from './edge'; export const UserContextSchema = z.object({ id: uuidSchema, @@ -102,6 +106,13 @@ export const SessionContextResponseSchema = z.object({ workflow_template: WorkflowTemplateSchema.nullable(), has_read_ai_disclaimer: z.boolean(), limits: LimitsSchema.optional(), + workflow: z.object({ + jobs: z.array(JobSchema), + triggers: z.array(TriggerSchema), + edges: z.array(EdgeSchema), + positions: z.record(z.string(), z.any()).optional().default({}), + name: z.string(), + }), // to be fixed }); export type UserContext = z.infer; @@ -112,6 +123,7 @@ export type AppConfig = z.infer; export interface SessionContextState { user: UserContext | null; project: ProjectContext | null; + workflow: Workflow | null; config: AppConfig | null; permissions: Permissions | null; latestSnapshotLockVersion: number | null; diff --git a/assets/js/collaborative-editor/types/trigger.ts b/assets/js/collaborative-editor/types/trigger.ts index 477713aab4e..8c85b323893 100644 --- a/assets/js/collaborative-editor/types/trigger.ts +++ b/assets/js/collaborative-editor/types/trigger.ts @@ -17,8 +17,8 @@ const baseTriggerSchema = z.object({ // Webhook trigger schema const webhookTriggerSchema = baseTriggerSchema.extend({ type: z.literal('webhook'), - cron_expression: z.null(), - kafka_configuration: z.null(), + cron_expression: z.null().default(null), + kafka_configuration: z.null().default(null), }); // Cron trigger schema with professional validation using cron-validator diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index c392395e34d..ed75c6b3a14 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -189,7 +189,9 @@ defmodule LightningWeb.WorkflowChannel do if is_nil(workflow.lock_version) do workflow else - Lightning.Workflows.get_workflow(workflow.id) + Lightning.Workflows.get_workflow(workflow.id, + include: [:edges, :jobs, :triggers] + ) end project_repo_connection = @@ -214,7 +216,8 @@ defmodule LightningWeb.WorkflowChannel do workflow_template: render_workflow_template(workflow_template), has_read_ai_disclaimer: Lightning.AiAssistant.user_has_read_disclaimer?(user), - limits: render_limits(project.id) + limits: render_limits(project.id), + workflow: (fresh_workflow && fresh_workflow) || %{} } end) end @@ -324,8 +327,9 @@ defmodule LightningWeb.WorkflowChannel do {:ok, workflow} <- Session.save_workflow(session_pid, user) do # Broadcast the new lock_version to all users in the channel # so they can update their latestSnapshotLockVersion in SessionContextStore - broadcast_from!(socket, "workflow_saved", %{ - latest_snapshot_lock_version: workflow.lock_version + broadcast!(socket, "workflow_saved", %{ + latest_snapshot_lock_version: workflow.lock_version, + workflow: workflow }) {:reply, From 94886b90ddaf9e07b7b5925a9c7acc3d0d1f2d90 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 6 Jan 2026 17:56:29 +0000 Subject: [PATCH 02/31] fix: wording --- assets/js/collaborative-editor/components/Header.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/js/collaborative-editor/components/Header.tsx b/assets/js/collaborative-editor/components/Header.tsx index 76af321bc54..c6b605c56d6 100644 --- a/assets/js/collaborative-editor/components/Header.tsx +++ b/assets/js/collaborative-editor/components/Header.tsx @@ -92,7 +92,7 @@ export function SaveButton({ onClick={onClick} disabled={!canSave} > - {label} me + {label} From 2f4f8a8ea85af573a12bbee3f5b5691b2012531d Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 6 Jan 2026 18:08:59 +0000 Subject: [PATCH 03/31] feat: udpate --- assets/js/collaborative-editor/hooks/useUnsavedChanges.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 6c2ab1f1a0b..8821c63ee10 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -28,7 +28,7 @@ export function useUnsavedChanges() { source_trigger_id: e.source_trigger_id, target_job_id: e.target_job_id, })), - positions: workflow?.positions, + positions: workflow?.positions || {}, name: workflow?.name, }; const storeWorkflow = useWorkflowState(state => ({ @@ -56,7 +56,7 @@ export function useUnsavedChanges() { source_trigger_id: e.source_trigger_id, target_job_id: e.target_job_id, })), - positions: state.positions, + positions: state.positions || {}, name: state.workflow?.name, })); @@ -66,7 +66,7 @@ export function useUnsavedChanges() { } function isDiff(base: unknown, target: unknown) { - const undef = [undefined, null]; + const undef = [undefined, null, '']; // @ts-expect-error if (undef.includes(base) && undef.includes(target)) return false; if (typeof base !== typeof target) return true; From 6d98fd57f8ce4fac9230e78ba3f339ebc1c84444 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Tue, 6 Jan 2026 18:41:56 +0000 Subject: [PATCH 04/31] fix: resolve new workflow from templates --- assets/js/collaborative-editor/components/Header.tsx | 2 +- assets/js/collaborative-editor/hooks/useUnsavedChanges.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/assets/js/collaborative-editor/components/Header.tsx b/assets/js/collaborative-editor/components/Header.tsx index c6b605c56d6..b644e9a6305 100644 --- a/assets/js/collaborative-editor/components/Header.tsx +++ b/assets/js/collaborative-editor/components/Header.tsx @@ -419,7 +419,7 @@ export function Header({ label={isNewWorkflow ? 'Create' : 'Save'} canSync={githubSyncLimit.allowed} syncTooltipMessage={githubSyncLimit.message} - hasChanges={hasChanges} + hasChanges={hasChanges && canSave && !isNewWorkflow} /> diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 8821c63ee10..528d81531f8 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -62,7 +62,7 @@ export function useUnsavedChanges() { // pick items in the exising and check if the new matches it. - return { hasChanges: isDiff(wf, storeWorkflow) }; + return { hasChanges: isDiff(wf, storeWorkflow) as boolean }; } function isDiff(base: unknown, target: unknown) { From ee8e7e00b3fbb0ecb9a9a0b0a258c7a763978c42 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 7 Jan 2026 08:58:32 +0000 Subject: [PATCH 05/31] feat: add workflow_transform to serializer --- .../hooks/useUnsavedChanges.ts | 58 +------------- .../collaboration/workflow_serializer.ex | 76 +++++++++++++------ .../channels/workflow_channel.ex | 7 +- 3 files changed, 60 insertions(+), 81 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 528d81531f8..bff4defecb3 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -3,66 +3,16 @@ import { useWorkflowState } from './useWorkflow'; export function useUnsavedChanges() { const { workflow } = useSessionContext(); - const wf = { - jobs: workflow?.jobs.map(j => ({ - id: j.id, - name: j.name, - body: j.body || '', - adaptor: j.adaptor, - project_credential_id: j.project_credential_id, - keychain_credential_id: j.keychain_credential_id, - })), - triggers: workflow?.triggers.map(t => ({ - cron_expression: t.cron_expression, - enabled: t.enabled, - id: t.id, - type: t.type, - })), - edges: workflow?.edges.map(e => ({ - condition_expression: e.condition_expression, - condition_label: e.condition_label, - condition_type: e.condition_type, - enabled: e.enabled, - id: e.id, - source_job_id: e.source_job_id, - source_trigger_id: e.source_trigger_id, - target_job_id: e.target_job_id, - })), - positions: workflow?.positions || {}, - name: workflow?.name, - }; const storeWorkflow = useWorkflowState(state => ({ - jobs: state.jobs.map(j => ({ - id: j.id, - name: j.name, - body: j.body || '', - adaptor: j.adaptor, - project_credential_id: j.project_credential_id, - keychain_credential_id: j.keychain_credential_id, - })), - triggers: state.triggers.map(t => ({ - cron_expression: t.cron_expression, - enabled: t.enabled, - id: t.id, - type: t.type, - })), - edges: state.edges.map(e => ({ - condition_expression: e.condition_expression, - condition_label: e.condition_label, - condition_type: e.condition_type, - enabled: e.enabled, - id: e.id, - source_job_id: e.source_job_id, - source_trigger_id: e.source_trigger_id, - target_job_id: e.target_job_id, - })), + jobs: state.jobs, + triggers: state.triggers, + edges: state.edges, positions: state.positions || {}, name: state.workflow?.name, })); - // pick items in the exising and check if the new matches it. - return { hasChanges: isDiff(wf, storeWorkflow) as boolean }; + return { hasChanges: isDiff(workflow, storeWorkflow) as boolean }; } function isDiff(base: unknown, target: unknown) { diff --git a/lib/lightning/collaboration/workflow_serializer.ex b/lib/lightning/collaboration/workflow_serializer.ex index 039fe4a40c4..1f9ca84b8f7 100644 --- a/lib/lightning/collaboration/workflow_serializer.ex +++ b/lib/lightning/collaboration/workflow_serializer.ex @@ -160,50 +160,76 @@ defmodule Lightning.Collaboration.WorkflowSerializer do # Private helper functions + def transform_workflow(workflow) do + %{ + jobs: Enum.map(workflow.jobs || [], &transform_job/1), + edges: Enum.map(workflow.edges || [], &transform_edge/1), + triggers: Enum.map(workflow.triggers || [], &transform_trigger/1), + name: workflow.name, + positions: workflow.positions || %{} + } + end + + def transform_job(job) do + %{ + "id" => job.id, + "name" => job.name || "", + "body" => job.body || "", + "adaptor" => job.adaptor, + "project_credential_id" => job.project_credential_id, + "keychain_credential_id" => job.keychain_credential_id + } + end + defp initialize_jobs(jobs_array, jobs) do Enum.each(jobs || [], fn job -> job_map = - Yex.MapPrelim.from(%{ - "id" => job.id, - "name" => job.name || "", - "body" => Yex.TextPrelim.from(job.body || ""), - "adaptor" => job.adaptor, - "project_credential_id" => job.project_credential_id, - "keychain_credential_id" => job.keychain_credential_id - }) + Yex.MapPrelim.from( + Map.update!(transform_job(job), "body", fn old -> + Yex.TextPrelim.from(old) + end) + ) Yex.Array.push(jobs_array, job_map) end) end + def transform_edge(edge) do + %{ + "condition_expression" => edge.condition_expression, + "condition_label" => edge.condition_label, + "condition_type" => edge.condition_type |> to_string(), + "enabled" => edge.enabled, + # "errors" => edge.errors, + "id" => edge.id, + "source_job_id" => edge.source_job_id, + "source_trigger_id" => edge.source_trigger_id, + "target_job_id" => edge.target_job_id + } + end + defp initialize_edges(edges_array, edges) do Enum.each(edges || [], fn edge -> edge_map = - Yex.MapPrelim.from(%{ - "condition_expression" => edge.condition_expression, - "condition_label" => edge.condition_label, - "condition_type" => edge.condition_type |> to_string(), - "enabled" => edge.enabled, - # "errors" => edge.errors, - "id" => edge.id, - "source_job_id" => edge.source_job_id, - "source_trigger_id" => edge.source_trigger_id, - "target_job_id" => edge.target_job_id - }) + Yex.MapPrelim.from(transform_edge(edge)) Yex.Array.push(edges_array, edge_map) end) end + def transform_trigger(trigger) do + %{ + "cron_expression" => trigger.cron_expression, + "enabled" => trigger.enabled, + "id" => trigger.id, + "type" => trigger.type |> to_string() + } + end + defp initialize_triggers(triggers_array, triggers) do Enum.each(triggers || [], fn trigger -> trigger_map = - Yex.MapPrelim.from(%{ - "cron_expression" => trigger.cron_expression, - "enabled" => trigger.enabled, - "id" => trigger.id, - "type" => trigger.type |> to_string() - }) + Yex.MapPrelim.from(transform_trigger(trigger)) Yex.Array.push(triggers_array, trigger_map) end) diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index ed75c6b3a14..f4cc1ad4968 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -22,6 +22,7 @@ defmodule LightningWeb.WorkflowChannel do alias Lightning.Workflows.WorkflowUsageLimiter alias Lightning.WorkOrders alias LightningWeb.Channels.WorkflowJSON + alias Lightning.Collaboration.WorkflowSerializer require Logger @@ -217,7 +218,9 @@ defmodule LightningWeb.WorkflowChannel do has_read_ai_disclaimer: Lightning.AiAssistant.user_has_read_disclaimer?(user), limits: render_limits(project.id), - workflow: (fresh_workflow && fresh_workflow) || %{} + workflow: + (fresh_workflow && + WorkflowSerializer.transform_workflow(fresh_workflow)) || %{} } end) end @@ -329,7 +332,7 @@ defmodule LightningWeb.WorkflowChannel do # so they can update their latestSnapshotLockVersion in SessionContextStore broadcast!(socket, "workflow_saved", %{ latest_snapshot_lock_version: workflow.lock_version, - workflow: workflow + workflow: WorkflowSerializer.transform_workflow(workflow) }) {:reply, From cfcbeeff6981591a80544be6e4ba2b377bafa04f Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 7 Jan 2026 09:00:22 +0000 Subject: [PATCH 06/31] feat: remove logs --- assets/js/collaborative-editor/components/WorkflowEditor.tsx | 2 -- assets/js/collaborative-editor/hooks/useUnsavedChanges.ts | 3 --- .../collaborative-editor/stores/createSessionContextStore.ts | 1 - 3 files changed, 6 deletions(-) diff --git a/assets/js/collaborative-editor/components/WorkflowEditor.tsx b/assets/js/collaborative-editor/components/WorkflowEditor.tsx index 53ddc60d5c7..71ce75abbaf 100644 --- a/assets/js/collaborative-editor/components/WorkflowEditor.tsx +++ b/assets/js/collaborative-editor/components/WorkflowEditor.tsx @@ -35,7 +35,6 @@ import { ManualRunPanel } from './ManualRunPanel'; import { ManualRunPanelErrorBoundary } from './ManualRunPanelErrorBoundary'; import { TemplateDetailsCard } from './TemplateDetailsCard'; import { Tooltip } from './Tooltip'; -import { useUnsavedChanges } from '../hooks/useUnsavedChanges'; interface WorkflowEditorProps { parentProjectId?: string | null; @@ -46,7 +45,6 @@ export function WorkflowEditor({ parentProjectId = null, parentProjectName = null, }: WorkflowEditorProps = {}) { - useUnsavedChanges(); const { params, updateSearchParams } = useURLState(); const { currentNode, selectNode } = useNodeSelection(); const workflowStore = useWorkflowStoreContext(); diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index bff4defecb3..1aedde59c5c 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -28,7 +28,6 @@ function isDiff(base: unknown, target: unknown) { for (let idx = 0; idx < base.length; idx++) { final ||= isDiff(base[idx], target[idx]); } - console.log('array:final', final); return final; } else if ( base && @@ -38,11 +37,9 @@ function isDiff(base: unknown, target: unknown) { ) { // iterate the object and check each item let final = false; - console.log(':final', Object.keys(base)); for (const key of Object.keys(base)) { final ||= isDiff(base[key], target[key]); } - console.log('object:final', final); return final; } else { return target !== base; diff --git a/assets/js/collaborative-editor/stores/createSessionContextStore.ts b/assets/js/collaborative-editor/stores/createSessionContextStore.ts index e1ae67befaa..1468e0b8fd6 100644 --- a/assets/js/collaborative-editor/stores/createSessionContextStore.ts +++ b/assets/js/collaborative-editor/stores/createSessionContextStore.ts @@ -171,7 +171,6 @@ export const createSessionContextStore = ( */ const handleSessionContextReceived = (rawData: unknown) => { const result = SessionContextResponseSchema.safeParse(rawData); - console.log('han:base', result.data?.workflow, result.error, rawData); if (result.success) { const sessionContext = result.data; From c5c40f1e44669f2e475d21418fbf20350c248fe3 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 7 Jan 2026 14:36:25 +0000 Subject: [PATCH 07/31] feat: transform workflow --- .../hooks/useUnsavedChanges.ts | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 1aedde59c5c..a27dd72fb92 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -1,3 +1,4 @@ +import type { Workflow } from '../types/workflow'; import { useSessionContext } from './useSessionContext'; import { useWorkflowState } from './useWorkflow'; @@ -12,7 +13,48 @@ export function useUnsavedChanges() { })); // pick items in the exising and check if the new matches it. - return { hasChanges: isDiff(workflow, storeWorkflow) as boolean }; + if (!workflow || !storeWorkflow) return false; + return { + hasChanges: isDiff( + transformWorkflow(workflow || {}), + transformWorkflow(storeWorkflow || {}) + ) as boolean, + }; +} + +function transformWorkflow(workflow: Workflow) { + return { + name: workflow.name, + jobs: (workflow.jobs || []) + .map(job => ({ + id: job.id, + name: job.name, + body: job.body, + adaptor: job.adaptor, + project_credential_id: job.project_credential_id, + keychain_credential_id: job.keychain_credential_id, + })) + .sort((a, b) => a.id.localeCompare(b.id)), + edges: (workflow.edges || []) + .map(edge => ({ + id: edge.id, + source_job_id: edge.source_job_id, + source_trigger_id: edge.source_trigger_id, + target_job_id: edge.target_job_id, + enabled: edge.enabled || false, + condition_type: edge.condition_type, + condition_label: edge.condition_label, + condition_expression: edge.condition_expression, + })) + .sort((a, b) => a.id.localeCompare(b.id)), + trigger: (workflow.triggers || []).map(trigger => ({ + id: trigger.id, + type: trigger.type, + enabled: trigger.enabled, + cron_expression: trigger.cron_expression, + })), + positions: workflow.positions || {}, + }; } function isDiff(base: unknown, target: unknown) { From e08442c597d328ebfd593569e14e0a704fb8bb64 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 7 Jan 2026 19:37:01 +0000 Subject: [PATCH 08/31] feat: remove transforms from serializer --- .../hooks/useUnsavedChanges.ts | 1 + .../collaboration/workflow_serializer.ex | 76 ++++++------------- .../channels/workflow_channel.ex | 7 +- 3 files changed, 28 insertions(+), 56 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index a27dd72fb92..deb8315c5f8 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -1,4 +1,5 @@ import type { Workflow } from '../types/workflow'; + import { useSessionContext } from './useSessionContext'; import { useWorkflowState } from './useWorkflow'; diff --git a/lib/lightning/collaboration/workflow_serializer.ex b/lib/lightning/collaboration/workflow_serializer.ex index 1f9ca84b8f7..039fe4a40c4 100644 --- a/lib/lightning/collaboration/workflow_serializer.ex +++ b/lib/lightning/collaboration/workflow_serializer.ex @@ -160,76 +160,50 @@ defmodule Lightning.Collaboration.WorkflowSerializer do # Private helper functions - def transform_workflow(workflow) do - %{ - jobs: Enum.map(workflow.jobs || [], &transform_job/1), - edges: Enum.map(workflow.edges || [], &transform_edge/1), - triggers: Enum.map(workflow.triggers || [], &transform_trigger/1), - name: workflow.name, - positions: workflow.positions || %{} - } - end - - def transform_job(job) do - %{ - "id" => job.id, - "name" => job.name || "", - "body" => job.body || "", - "adaptor" => job.adaptor, - "project_credential_id" => job.project_credential_id, - "keychain_credential_id" => job.keychain_credential_id - } - end - defp initialize_jobs(jobs_array, jobs) do Enum.each(jobs || [], fn job -> job_map = - Yex.MapPrelim.from( - Map.update!(transform_job(job), "body", fn old -> - Yex.TextPrelim.from(old) - end) - ) + Yex.MapPrelim.from(%{ + "id" => job.id, + "name" => job.name || "", + "body" => Yex.TextPrelim.from(job.body || ""), + "adaptor" => job.adaptor, + "project_credential_id" => job.project_credential_id, + "keychain_credential_id" => job.keychain_credential_id + }) Yex.Array.push(jobs_array, job_map) end) end - def transform_edge(edge) do - %{ - "condition_expression" => edge.condition_expression, - "condition_label" => edge.condition_label, - "condition_type" => edge.condition_type |> to_string(), - "enabled" => edge.enabled, - # "errors" => edge.errors, - "id" => edge.id, - "source_job_id" => edge.source_job_id, - "source_trigger_id" => edge.source_trigger_id, - "target_job_id" => edge.target_job_id - } - end - defp initialize_edges(edges_array, edges) do Enum.each(edges || [], fn edge -> edge_map = - Yex.MapPrelim.from(transform_edge(edge)) + Yex.MapPrelim.from(%{ + "condition_expression" => edge.condition_expression, + "condition_label" => edge.condition_label, + "condition_type" => edge.condition_type |> to_string(), + "enabled" => edge.enabled, + # "errors" => edge.errors, + "id" => edge.id, + "source_job_id" => edge.source_job_id, + "source_trigger_id" => edge.source_trigger_id, + "target_job_id" => edge.target_job_id + }) Yex.Array.push(edges_array, edge_map) end) end - def transform_trigger(trigger) do - %{ - "cron_expression" => trigger.cron_expression, - "enabled" => trigger.enabled, - "id" => trigger.id, - "type" => trigger.type |> to_string() - } - end - defp initialize_triggers(triggers_array, triggers) do Enum.each(triggers || [], fn trigger -> trigger_map = - Yex.MapPrelim.from(transform_trigger(trigger)) + Yex.MapPrelim.from(%{ + "cron_expression" => trigger.cron_expression, + "enabled" => trigger.enabled, + "id" => trigger.id, + "type" => trigger.type |> to_string() + }) Yex.Array.push(triggers_array, trigger_map) end) diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index f4cc1ad4968..ed75c6b3a14 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -22,7 +22,6 @@ defmodule LightningWeb.WorkflowChannel do alias Lightning.Workflows.WorkflowUsageLimiter alias Lightning.WorkOrders alias LightningWeb.Channels.WorkflowJSON - alias Lightning.Collaboration.WorkflowSerializer require Logger @@ -218,9 +217,7 @@ defmodule LightningWeb.WorkflowChannel do has_read_ai_disclaimer: Lightning.AiAssistant.user_has_read_disclaimer?(user), limits: render_limits(project.id), - workflow: - (fresh_workflow && - WorkflowSerializer.transform_workflow(fresh_workflow)) || %{} + workflow: (fresh_workflow && fresh_workflow) || %{} } end) end @@ -332,7 +329,7 @@ defmodule LightningWeb.WorkflowChannel do # so they can update their latestSnapshotLockVersion in SessionContextStore broadcast!(socket, "workflow_saved", %{ latest_snapshot_lock_version: workflow.lock_version, - workflow: WorkflowSerializer.transform_workflow(workflow) + workflow: workflow }) {:reply, From 43a2d19f87ab96166df4d08347bc41cbcf071fce Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 7 Jan 2026 19:54:32 +0000 Subject: [PATCH 09/31] feat: cleanup --- .../hooks/useUnsavedChanges.ts | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index deb8315c5f8..432d2426dde 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -12,17 +12,16 @@ export function useUnsavedChanges() { positions: state.positions || {}, name: state.workflow?.name, })); - // pick items in the exising and check if the new matches it. - if (!workflow || !storeWorkflow) return false; return { - hasChanges: isDiff( + hasChanges: isDiffWorkflow( transformWorkflow(workflow || {}), - transformWorkflow(storeWorkflow || {}) - ) as boolean, + transformWorkflow((storeWorkflow as Workflow) || {}) + ), }; } +// we need the same structure of workflows to be able to compare function transformWorkflow(workflow: Workflow) { return { name: workflow.name, @@ -58,32 +57,21 @@ function transformWorkflow(workflow: Workflow) { }; } -function isDiff(base: unknown, target: unknown) { +function isDiffWorkflow(base: unknown, target: unknown): boolean { const undef = [undefined, null, '']; // @ts-expect-error if (undef.includes(base) && undef.includes(target)) return false; if (typeof base !== typeof target) return true; if (Array.isArray(base) && Array.isArray(target)) { if (base.length !== target.length) return true; - // enter the array - // iterate the array and check each item - let final = false; - for (let idx = 0; idx < base.length; idx++) { - final ||= isDiff(base[idx], target[idx]); - } - return final; + return base.some((v, i) => isDiffWorkflow(v, target[i])); } else if ( base && target && typeof base === 'object' && typeof target === 'object' ) { - // iterate the object and check each item - let final = false; - for (const key of Object.keys(base)) { - final ||= isDiff(base[key], target[key]); - } - return final; + return Object.keys(base).some(k => isDiffWorkflow(base[k], target[k])); } else { return target !== base; } From b990b8a4de57592c1f50bc390c7fa613da2edf73 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Wed, 7 Jan 2026 20:17:41 +0000 Subject: [PATCH 10/31] fix: changes --- assets/js/collaborative-editor/components/Header.tsx | 4 +++- assets/js/collaborative-editor/hooks/useUnsavedChanges.ts | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/assets/js/collaborative-editor/components/Header.tsx b/assets/js/collaborative-editor/components/Header.tsx index b644e9a6305..b2dd5f712a8 100644 --- a/assets/js/collaborative-editor/components/Header.tsx +++ b/assets/js/collaborative-editor/components/Header.tsx @@ -229,6 +229,8 @@ export function Header({ // When ?v= is present, user is viewing a specific version (even if latest) const isPinnedVersion = params['v'] !== undefined && params['v'] !== null; + const showChangeIndicator = hasChanges && canSave && !isNewWorkflow; + const handleRunClick = useCallback(() => { if (firstTriggerId) { // select the first trigger @@ -419,7 +421,7 @@ export function Header({ label={isNewWorkflow ? 'Create' : 'Save'} canSync={githubSyncLimit.allowed} syncTooltipMessage={githubSyncLimit.message} - hasChanges={hasChanges && canSave && !isNewWorkflow} + hasChanges={showChangeIndicator} /> diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 432d2426dde..52667f3c6e3 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -12,7 +12,10 @@ export function useUnsavedChanges() { positions: state.positions || {}, name: state.workflow?.name, })); - if (!workflow || !storeWorkflow) return false; + if (!workflow || !storeWorkflow) + return { + hasChanges: false, + }; return { hasChanges: isDiffWorkflow( transformWorkflow(workflow || {}), From 45f76b175d3f8cd07b78e5930e5191f2ac4f12c7 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 08:37:00 +0000 Subject: [PATCH 11/31] tests: update tests --- .../components/Header.tsx | 131 +++--- .../hooks/useUnsavedChanges.ts | 42 +- .../stores/createSessionContextStore.ts | 1 + .../types/sessionContext.ts | 8 +- .../__helpers__/sessionContextFactory.ts | 2 + .../CollaborativeEditor.keyboard.test.tsx | 37 ++ .../components/Header.test.tsx | 219 ++++++++- .../hooks/useUnsavedChanges.test.tsx | 421 ++++++++++++++++++ 8 files changed, 775 insertions(+), 86 deletions(-) create mode 100644 assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx diff --git a/assets/js/collaborative-editor/components/Header.tsx b/assets/js/collaborative-editor/components/Header.tsx index b2dd5f712a8..ced96645357 100644 --- a/assets/js/collaborative-editor/components/Header.tsx +++ b/assets/js/collaborative-editor/components/Header.tsx @@ -97,84 +97,95 @@ export function SaveButton({ {hasChanges ? ( -
+
) : null} ); } return ( -
- : tooltipMessage - } - side="bottom" - > - - - - + {label} + + + + - Open sync options - - - Open sync options + + + - - - ) : !canSync && syncTooltipMessage ? ( - syncTooltipMessage - ) : ( - tooltipMessage - ) - } - side="bottom" - > - - - - - + > + Save & Sync + + + + + +
+ {hasChanges ? ( +
+ ) : null} ); } diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 52667f3c6e3..7aef0cc6a4f 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -12,19 +12,18 @@ export function useUnsavedChanges() { positions: state.positions || {}, name: state.workflow?.name, })); - if (!workflow || !storeWorkflow) - return { - hasChanges: false, - }; + + if (!workflow || !storeWorkflow) return { hasChanges: false }; + return { hasChanges: isDiffWorkflow( - transformWorkflow(workflow || {}), - transformWorkflow((storeWorkflow as Workflow) || {}) + transformWorkflow(workflow), + transformWorkflow(storeWorkflow as Workflow) ), }; } -// we need the same structure of workflows to be able to compare +// transform workflow to normalized structure for comparison function transformWorkflow(workflow: Workflow) { return { name: workflow.name, @@ -50,7 +49,7 @@ function transformWorkflow(workflow: Workflow) { condition_expression: edge.condition_expression, })) .sort((a, b) => a.id.localeCompare(b.id)), - trigger: (workflow.triggers || []).map(trigger => ({ + triggers: (workflow.triggers || []).map(trigger => ({ id: trigger.id, type: trigger.type, enabled: trigger.enabled, @@ -60,22 +59,31 @@ function transformWorkflow(workflow: Workflow) { }; } +// deep comparison to detect workflow changes function isDiffWorkflow(base: unknown, target: unknown): boolean { - const undef = [undefined, null, '']; - // @ts-expect-error - if (undef.includes(base) && undef.includes(target)) return false; + const isNullish = (v: unknown) => v === undefined || v === null || v === ''; + if (isNullish(base) && isNullish(target)) return false; if (typeof base !== typeof target) return true; + if (Array.isArray(base) && Array.isArray(target)) { - if (base.length !== target.length) return true; - return base.some((v, i) => isDiffWorkflow(v, target[i])); - } else if ( + return ( + base.length !== target.length || + base.some((v, i) => isDiffWorkflow(v, target[i])) + ); + } + + if ( base && target && typeof base === 'object' && typeof target === 'object' ) { - return Object.keys(base).some(k => isDiffWorkflow(base[k], target[k])); - } else { - return target !== base; + const baseObj = base as Record; + const targetObj = target as Record; + return Object.keys(baseObj).some(k => + isDiffWorkflow(baseObj[k], targetObj[k]) + ); } + + return base !== target; } diff --git a/assets/js/collaborative-editor/stores/createSessionContextStore.ts b/assets/js/collaborative-editor/stores/createSessionContextStore.ts index 1468e0b8fd6..aed8dd0d47b 100644 --- a/assets/js/collaborative-editor/stores/createSessionContextStore.ts +++ b/assets/js/collaborative-editor/stores/createSessionContextStore.ts @@ -126,6 +126,7 @@ export const createSessionContextStore = ( isLoading: false, error: null, lastUpdated: null, + workflow: null, } as SessionContextState, // No initial transformations needed draft => draft diff --git a/assets/js/collaborative-editor/types/sessionContext.ts b/assets/js/collaborative-editor/types/sessionContext.ts index a880c965948..b26805a518d 100644 --- a/assets/js/collaborative-editor/types/sessionContext.ts +++ b/assets/js/collaborative-editor/types/sessionContext.ts @@ -106,13 +106,7 @@ export const SessionContextResponseSchema = z.object({ workflow_template: WorkflowTemplateSchema.nullable(), has_read_ai_disclaimer: z.boolean(), limits: LimitsSchema.optional(), - workflow: z.object({ - jobs: z.array(JobSchema), - triggers: z.array(TriggerSchema), - edges: z.array(EdgeSchema), - positions: z.record(z.string(), z.any()).optional().default({}), - name: z.string(), - }), // to be fixed + workflow: z.any(), // to be fixed }); export type UserContext = z.infer; diff --git a/assets/test/collaborative-editor/__helpers__/sessionContextFactory.ts b/assets/test/collaborative-editor/__helpers__/sessionContextFactory.ts index df417362a32..3b8e143c16d 100644 --- a/assets/test/collaborative-editor/__helpers__/sessionContextFactory.ts +++ b/assets/test/collaborative-editor/__helpers__/sessionContextFactory.ts @@ -289,6 +289,7 @@ export interface CreateSessionContextOptions { workflow_template?: WorkflowTemplate | null; has_read_ai_disclaimer?: boolean; limits?: Partial; + workflow?: any | null; } /** @@ -397,6 +398,7 @@ export function createSessionContext( webhook_auth_methods: options.webhook_auth_methods ?? [], workflow_template: options.workflow_template ?? null, has_read_ai_disclaimer: options.has_read_ai_disclaimer ?? true, + workflow: options.workflow, }; // Only add limits if provided diff --git a/assets/test/collaborative-editor/components/CollaborativeEditor.keyboard.test.tsx b/assets/test/collaborative-editor/components/CollaborativeEditor.keyboard.test.tsx index 619f39935e6..5e44c4e703a 100644 --- a/assets/test/collaborative-editor/components/CollaborativeEditor.keyboard.test.tsx +++ b/assets/test/collaborative-editor/components/CollaborativeEditor.keyboard.test.tsx @@ -226,6 +226,43 @@ vi.mock('../../../js/collaborative-editor/hooks/useSessionContext', () => ({ workflow_activation: { allowed: true, message: null }, github_sync: { allowed: true, message: null }, }), + useSessionContext: () => ({ + workflow: { + jobs: [ + { + id: '5887a56d-19b0-452f-8891-9c7a72116325', + body: '// Validate and transform the data you\'ve received...\nfn(state => {\n console.log("Do some data transformation here");\n return state;\n})\n', + name: 'Transform data', + adaptor: '@openfn/language-common@3.2.1', + }, + ], + triggers: [ + { + id: '000311a3-f84c-4990-8a74-824ccf0e0561', + comment: null, + custom_path: null, + cron_expression: null, + type: 'webhook', + enabled: true, + webhook_reply: 'before_start', + }, + ], + edges: [ + { + id: '772ef8d2-f5e9-4437-81e8-ca0c68c8315b', + condition_type: 'always', + condition_expression: null, + condition_label: null, + enabled: true, + source_job_id: null, + source_trigger_id: '000311a3-f84c-4990-8a74-824ccf0e0561', + target_job_id: '5887a56d-19b0-452f-8891-9c7a72116325', + }, + ], + name: 'simple-worfklow', + positions: {}, + }, + }), })); // Mock workflow hooks with controllable node selection diff --git a/assets/test/collaborative-editor/components/Header.test.tsx b/assets/test/collaborative-editor/components/Header.test.tsx index edcbc957889..7db1ec3bf14 100644 --- a/assets/test/collaborative-editor/components/Header.test.tsx +++ b/assets/test/collaborative-editor/components/Header.test.tsx @@ -16,9 +16,10 @@ import { SessionContext } from '../../../js/collaborative-editor/contexts/Sessio import { StoreContext } from '../../../js/collaborative-editor/contexts/StoreProvider'; import { KeyboardProvider } from '../../../js/collaborative-editor/keyboard'; +import { triggerProviderSync } from '../__helpers__/sessionStoreHelpers'; +import type { CreateSessionContextOptions } from '../__helpers__/sessionContextFactory'; import { simulateStoreProviderWithConnection } from '../__helpers__/storeProviderHelpers'; import { createMinimalWorkflowYDoc } from '../__helpers__/workflowStoreHelpers'; -import type { CreateSessionContextOptions } from '../__helpers__/sessionContextFactory'; import { createSessionContextStore } from '../../../js/collaborative-editor/stores/createSessionContextStore'; // ============================================================================= @@ -79,10 +80,18 @@ async function createTestSetup(options: WrapperOptions = {}) { workflowMap.set('deleted_at', workflowDeletedAt); } - // Build session context options + // Build session context options including workflow data const sessionContextOptions: CreateSessionContextOptions = { permissions, latest_snapshot_lock_version: latestSnapshotLockVersion, + workflow: { + id: 'test-workflow-123', + name: 'Test Workflow', + jobs: [], + triggers: [], + edges: [], + positions: {}, + }, }; if (hasGithubConnection) { @@ -113,6 +122,9 @@ async function createTestSetup(options: WrapperOptions = {}) { } ); + // Trigger provider sync to enable save functionality + triggerProviderSync(sessionStore, true); + // For new workflows, replace sessionContextStore with one that has isNewWorkflow=true // This is a limitation of the current helper design. if (isNewWorkflow) { @@ -945,6 +957,209 @@ describe('Header - Run Button Tooltip with Panel State', () => { }); }); +// ============================================================================= +// UNSAVED CHANGES INDICATOR TESTS +// ============================================================================= + +describe('Header - Unsaved Changes Indicator', () => { + test('shows red dot when workflow has unsaved changes', async () => { + const { wrapper, emitSessionContext, ydoc } = await createTestSetup(); + + // Modify Y.Doc to have a different name than session context + const workflowMap = ydoc!.getMap('workflow'); + workflowMap.set('name', 'Modified Workflow Name'); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + // Emit session context with original name + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // Should show red dot because Y.Doc has "Modified Workflow Name" but session has "Test Workflow" + await waitFor( + () => { + const redDot = container.querySelector('[data-is-dirty]'); + expect(redDot).toBeInTheDocument(); + expect(redDot).toHaveClass('rounded-full'); + }, + { timeout: 3000 } + ); + }); + + test('hides red dot when no changes present', async () => { + const { wrapper, emitSessionContext } = await createTestSetup(); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // No changes should mean no red dot + expect(container.querySelector('[data-is-dirty]')).not.toBeInTheDocument(); + }); + + test('does not show red dot for new workflows', async () => { + const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ + isNewWorkflow: true, + }); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // Make changes to workflow + await act(async () => { + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'New Workflow'); + }); + + // Should not show red dot for new workflows + await new Promise(resolve => setTimeout(resolve, 100)); + expect(container.querySelector('[data-is-dirty]')).not.toBeInTheDocument(); + }); + + test('does not show red dot when save is disabled', async () => { + const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ + permissions: { can_edit_workflow: false }, + }); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // Make changes to workflow + await act(async () => { + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Modified Name'); + }); + + // Should not show red dot when user cannot save + await new Promise(resolve => setTimeout(resolve, 100)); + expect(container.querySelector('[data-is-dirty]')).not.toBeInTheDocument(); + }); + + test('red dot is positioned correctly on save button', async () => { + const { wrapper, emitSessionContext, ydoc } = await createTestSetup(); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // Modify workflow + await act(async () => { + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Modified'); + }); + + await waitFor(() => { + const redDot = container.querySelector('[data-is-dirty]'); + expect(redDot).toBeInTheDocument(); + // Verify positioning classes + expect(redDot).toHaveClass('absolute'); + expect(redDot).toHaveClass('top-0'); + expect(redDot).toHaveClass('right-0'); + expect(redDot).toHaveClass('z-10'); + }); + }); + + test('red dot appears on split button when GitHub connected', async () => { + const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ + hasGithubConnection: true, + }); + + // Modify Y.Doc to have a different name than session context + const workflowMap = ydoc!.getMap('workflow'); + workflowMap.set('name', 'Modified'); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + // Emit session context with original name + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // Should show red dot on split button + await waitFor(() => { + const redDot = container.querySelector('[data-is-dirty]'); + expect(redDot).toBeInTheDocument(); + }); + }); + + test('red dot disappears after workflow is saved', async () => { + const { wrapper, emitSessionContext, ydoc } = await createTestSetup(); + + const { container } = render( +
+ {[Breadcrumb]} +
, + { wrapper } + ); + + await act(async () => { + emitSessionContext(); + await new Promise(resolve => setTimeout(resolve, 150)); + }); + + // Modify workflow + await act(async () => { + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Modified'); + }); + + await waitFor(() => { + expect(container.querySelector('[data-is-dirty]')).toBeInTheDocument(); + }); + + // Simulate save by updating session context workflow + // This would normally happen via workflow_saved channel event + // For now, we verify the indicator shows correctly + }); +}); + // ============================================================================= // KEYBOARD SHORTCUT TESTS // ============================================================================= diff --git a/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx b/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx new file mode 100644 index 00000000000..1427c551e9b --- /dev/null +++ b/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx @@ -0,0 +1,421 @@ +/** + * useUnsavedChanges Hook Tests + * + * Tests for detecting unsaved changes by comparing the workflow state + * from SessionContext (server snapshot) with WorkflowStore (local edits). + */ + +import { renderHook, waitFor } from '@testing-library/react'; +import type React from 'react'; +import { beforeEach, describe, expect, test } from 'vitest'; +import * as Y from 'yjs'; + +import type { StoreContextValue } from '../../../js/collaborative-editor/contexts/StoreProvider'; +import { StoreContext } from '../../../js/collaborative-editor/contexts/StoreProvider'; +import { useUnsavedChanges } from '../../../js/collaborative-editor/hooks/useUnsavedChanges'; +import type { SessionContextStoreInstance } from '../../../js/collaborative-editor/stores/createSessionContextStore'; +import { createSessionContextStore } from '../../../js/collaborative-editor/stores/createSessionContextStore'; +import type { WorkflowStoreInstance } from '../../../js/collaborative-editor/stores/createWorkflowStore'; +import { + createEmptyWorkflowYDoc, + createMinimalWorkflowYDoc, + setupWorkflowStoreTest, +} from '../__helpers__'; +import { createSessionContext } from '../__helpers__/sessionContextFactory'; +import { + createMockPhoenixChannel, + createMockPhoenixChannelProvider, +} from '../mocks/phoenixChannel'; + +function createWrapper( + sessionContextStore: SessionContextStoreInstance, + workflowStore: WorkflowStoreInstance +): React.ComponentType<{ children: React.ReactNode }> { + const mockStoreValue: StoreContextValue = { + sessionContextStore, + workflowStore, + adaptorStore: {} as any, + credentialStore: {} as any, + awarenessStore: {} as any, + }; + + return ({ children }: { children: React.ReactNode }) => ( + + {children} + + ); +} + +describe('useUnsavedChanges - Basic Detection', () => { + let sessionContextStore: SessionContextStoreInstance; + + beforeEach(() => { + sessionContextStore = createSessionContextStore(); + }); + + test('returns false when no workflow loaded', () => { + const { store, ydoc, cleanup } = setupWorkflowStoreTest( + createEmptyWorkflowYDoc() + ); + + const { result } = renderHook(() => useUnsavedChanges(), { + wrapper: createWrapper(sessionContextStore, store), + }); + + expect(result.current.hasChanges).toBe(false); + + cleanup(); + }); + + test('returns false when workflow matches saved state', async () => { + const { store, ydoc, cleanup } = setupWorkflowStoreTest( + createEmptyWorkflowYDoc() + ); + + const mockChannel = createMockPhoenixChannel(); + const mockProvider = createMockPhoenixChannelProvider(mockChannel); + + const jobId = '770e8400-e29b-41d4-a716-446655440000'; + + // Set up workflow in Y.Doc with matching data + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Test Workflow'); + + const jobsArray = ydoc.getArray('jobs'); + const jobMap = new Y.Map(); + jobMap.set('id', jobId); + jobMap.set('name', 'Job 1'); + jobMap.set('body', 'fn(state => state)'); + jobMap.set('adaptor', '@openfn/language-common@latest'); + jobMap.set('project_credential_id', null); + jobMap.set('keychain_credential_id', null); + jobsArray.push([jobMap]); + + // Set up matching session context + mockChannel.push = (_event: string, _payload: unknown) => { + return { + receive: (status: string, callback: (response?: unknown) => void) => { + if (status === 'ok') { + setTimeout(() => { + callback( + createSessionContext({ + workflow: { + name: 'Test Workflow', + jobs: [ + { + id: jobId, + name: 'Job 1', + body: 'fn(state => state)', + adaptor: '@openfn/language-common@latest', + project_credential_id: null, + keychain_credential_id: null, + }, + ], + triggers: [], + edges: [], + positions: {}, + }, + }) + ); + }, 0); + } + return { + receive: () => ({ + receive: () => ({ receive: () => ({}) }), + }), + }; + }, + }; + }; + + sessionContextStore._connectChannel(mockProvider); + + const { result } = renderHook(() => useUnsavedChanges(), { + wrapper: createWrapper(sessionContextStore, store), + }); + + await waitFor(() => { + expect(result.current.hasChanges).toBe(false); + }); + + cleanup(); + }); + + test('detects changes when job body changes', async () => { + const { store, ydoc, cleanup } = setupWorkflowStoreTest( + createEmptyWorkflowYDoc() + ); + + const mockChannel = createMockPhoenixChannel(); + const mockProvider = createMockPhoenixChannelProvider(mockChannel); + + const jobId = '770e8400-e29b-41d4-a716-446655440000'; + + // Set up initial workflow in session context + mockChannel.push = (_event: string, _payload: unknown) => { + return { + receive: (status: string, callback: (response?: unknown) => void) => { + if (status === 'ok') { + setTimeout(() => { + callback( + createSessionContext({ + workflow: { + name: 'Test Workflow', + jobs: [ + { + id: jobId, + name: 'Job 1', + body: 'fn(state => state)', + adaptor: '@openfn/language-common@latest', + project_credential_id: null, + keychain_credential_id: null, + }, + ], + triggers: [], + edges: [], + positions: {}, + }, + }) + ); + }, 0); + } + return { + receive: () => ({ receive: () => ({ receive: () => ({}) }) }), + }; + }, + }; + }; + + sessionContextStore._connectChannel(mockProvider); + + await waitFor(() => sessionContextStore.getSnapshot().workflow !== null, { + timeout: 1000, + }); + + // Modify workflow in Y.Doc with different body + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Test Workflow'); + + const jobsArray = ydoc.getArray('jobs'); + const jobMap = new Y.Map(); + jobMap.set('id', jobId); + jobMap.set('name', 'Job 1'); + jobMap.set( + 'body', + 'fn(state => { console.log("modified"); return state; })' + ); + jobMap.set('adaptor', '@openfn/language-common@latest'); + jobMap.set('project_credential_id', null); + jobMap.set('keychain_credential_id', null); + jobsArray.push([jobMap]); + + const { result } = renderHook(() => useUnsavedChanges(), { + wrapper: createWrapper(sessionContextStore, store), + }); + + await waitFor(() => { + expect(result.current.hasChanges).toBe(true); + }); + + cleanup(); + }); + + test('detects changes when workflow name changes', async () => { + const { store, ydoc, cleanup } = setupWorkflowStoreTest( + createEmptyWorkflowYDoc() + ); + + const mockChannel = createMockPhoenixChannel(); + const mockProvider = createMockPhoenixChannelProvider(mockChannel); + + mockChannel.push = (_event: string, _payload: unknown) => { + return { + receive: (status: string, callback: (response?: unknown) => void) => { + if (status === 'ok') { + setTimeout(() => { + callback( + createSessionContext({ + workflow: { + name: 'Original Name', + jobs: [], + triggers: [], + edges: [], + positions: {}, + }, + }) + ); + }, 0); + } + return { + receive: () => ({ receive: () => ({ receive: () => ({}) }) }), + }; + }, + }; + }; + + sessionContextStore._connectChannel(mockProvider); + + await waitFor(() => sessionContextStore.getSnapshot().workflow !== null, { + timeout: 1000, + }); + + // Change name in Y.Doc + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Modified Name'); + + const { result } = renderHook(() => useUnsavedChanges(), { + wrapper: createWrapper(sessionContextStore, store), + }); + + await waitFor(() => { + expect(result.current.hasChanges).toBe(true); + }); + + cleanup(); + }); +}); + +describe('useUnsavedChanges - Edge Cases', () => { + let sessionContextStore: SessionContextStoreInstance; + + beforeEach(() => { + sessionContextStore = createSessionContextStore(); + }); + + test('handles null and undefined values correctly', async () => { + const { store, ydoc, cleanup } = setupWorkflowStoreTest( + createMinimalWorkflowYDoc() + ); + const mockChannel = createMockPhoenixChannel(); + const mockProvider = createMockPhoenixChannelProvider(mockChannel); + + const jobId = '770e8400-e29b-41d4-a716-446655440000'; + + mockChannel.push = (_event: string, _payload: unknown) => { + return { + receive: (status: string, callback: (response?: unknown) => void) => { + if (status === 'ok') { + setTimeout(() => { + callback( + createSessionContext({ + workflow: { + name: 'Test', + jobs: [ + { + id: jobId, + name: 'Job 1', + body: '', + adaptor: '@openfn/language-common@latest', + project_credential_id: null, + keychain_credential_id: null, + }, + ], + triggers: [], + edges: [], + positions: {}, + }, + }) + ); + }, 0); + } + return { + receive: () => ({ receive: () => ({ receive: () => ({}) }) }), + }; + }, + }; + }; + + sessionContextStore._connectChannel(mockProvider); + + await waitFor(() => sessionContextStore.getSnapshot().workflow !== null, { + timeout: 1000, + }); + + // Set up Y.Doc with matching undefined/null credential values + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Test'); + + const jobsArray = ydoc.getArray('jobs'); + const jobMap = new Y.Map(); + jobMap.set('id', jobId); + jobMap.set('name', 'Job 1'); + jobMap.set('body', ''); + jobMap.set('adaptor', '@openfn/language-common@latest'); + jobMap.set('project_credential_id', null); + jobMap.set('keychain_credential_id', null); + jobsArray.push([jobMap]); + const { result } = renderHook(() => useUnsavedChanges(), { + wrapper: createWrapper(sessionContextStore, store), + }); + + await waitFor(() => { + expect(result.current.hasChanges).toBe(false); + }); + + cleanup(); + }); + + test('detects changes when job is added', async () => { + const { store, ydoc, cleanup } = setupWorkflowStoreTest( + createEmptyWorkflowYDoc() + ); + + const mockChannel = createMockPhoenixChannel(); + const mockProvider = createMockPhoenixChannelProvider(mockChannel); + + mockChannel.push = (_event: string, _payload: unknown) => { + return { + receive: (status: string, callback: (response?: unknown) => void) => { + if (status === 'ok') { + setTimeout(() => { + callback( + createSessionContext({ + workflow: { + name: 'Test', + jobs: [], + triggers: [], + edges: [], + positions: {}, + }, + }) + ); + }, 0); + } + return { + receive: () => ({ receive: () => ({ receive: () => ({}) }) }), + }; + }, + }; + }; + + sessionContextStore._connectChannel(mockProvider); + + await waitFor(() => sessionContextStore.getSnapshot().workflow !== null, { + timeout: 1000, + }); + + // Add job to Y.Doc + const workflowMap = ydoc.getMap('workflow'); + workflowMap.set('name', 'Test'); + + const jobsArray = ydoc.getArray('jobs'); + const jobMap = new Y.Map(); + jobMap.set('id', '770e8400-e29b-41d4-a716-446655440000'); + jobMap.set('name', 'New Job'); + jobMap.set('body', 'fn(state => state)'); + jobMap.set('adaptor', '@openfn/language-common@latest'); + jobMap.set('project_credential_id', null); + jobMap.set('keychain_credential_id', null); + jobsArray.push([jobMap]); + + const { result } = renderHook(() => useUnsavedChanges(), { + wrapper: createWrapper(sessionContextStore, store), + }); + + await waitFor(() => { + expect(result.current.hasChanges).toBe(true); + }); + + cleanup(); + }); +}); From af1fd4abe4593bd0816d0184f989b0b71da9e3ba Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 08:43:54 +0000 Subject: [PATCH 12/31] chore: simplify expression --- lib/lightning_web/channels/workflow_channel.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index ed75c6b3a14..be20136a2f0 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -217,7 +217,7 @@ defmodule LightningWeb.WorkflowChannel do has_read_ai_disclaimer: Lightning.AiAssistant.user_has_read_disclaimer?(user), limits: render_limits(project.id), - workflow: (fresh_workflow && fresh_workflow) || %{} + workflow: fresh_workflow || %{} } end) end From dd07f9409f03415ec5c4d7e4b34a6a061d2f8eb7 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 09:02:35 +0000 Subject: [PATCH 13/31] test: resolve failing test --- .../components/Header.test.tsx | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/assets/test/collaborative-editor/components/Header.test.tsx b/assets/test/collaborative-editor/components/Header.test.tsx index 7db1ec3bf14..6f97d0cb4f2 100644 --- a/assets/test/collaborative-editor/components/Header.test.tsx +++ b/assets/test/collaborative-editor/components/Header.test.tsx @@ -44,6 +44,7 @@ interface WrapperOptions { hasGithubConnection?: boolean; repoName?: string; branchName?: string; + triggerSync?: boolean; } /** @@ -122,8 +123,10 @@ async function createTestSetup(options: WrapperOptions = {}) { } ); - // Trigger provider sync to enable save functionality - triggerProviderSync(sessionStore, true); + if (options.triggerSync) { + // Trigger provider sync to enable save functionality + triggerProviderSync(sessionStore, true); + } // For new workflows, replace sessionContextStore with one that has isNewWorkflow=true // This is a limitation of the current helper design. @@ -963,7 +966,9 @@ describe('Header - Run Button Tooltip with Panel State', () => { describe('Header - Unsaved Changes Indicator', () => { test('shows red dot when workflow has unsaved changes', async () => { - const { wrapper, emitSessionContext, ydoc } = await createTestSetup(); + const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ + triggerSync: true, + }); // Modify Y.Doc to have a different name than session context const workflowMap = ydoc!.getMap('workflow'); @@ -994,7 +999,9 @@ describe('Header - Unsaved Changes Indicator', () => { }); test('hides red dot when no changes present', async () => { - const { wrapper, emitSessionContext } = await createTestSetup(); + const { wrapper, emitSessionContext } = await createTestSetup({ + triggerSync: true, + }); const { container } = render(
@@ -1015,6 +1022,7 @@ describe('Header - Unsaved Changes Indicator', () => { test('does not show red dot for new workflows', async () => { const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ isNewWorkflow: true, + triggerSync: true, }); const { container } = render( @@ -1043,6 +1051,7 @@ describe('Header - Unsaved Changes Indicator', () => { test('does not show red dot when save is disabled', async () => { const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ permissions: { can_edit_workflow: false }, + triggerSync: true, }); const { container } = render( @@ -1069,7 +1078,9 @@ describe('Header - Unsaved Changes Indicator', () => { }); test('red dot is positioned correctly on save button', async () => { - const { wrapper, emitSessionContext, ydoc } = await createTestSetup(); + const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ + triggerSync: true, + }); const { container } = render(
@@ -1103,6 +1114,7 @@ describe('Header - Unsaved Changes Indicator', () => { test('red dot appears on split button when GitHub connected', async () => { const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ hasGithubConnection: true, + triggerSync: true, }); // Modify Y.Doc to have a different name than session context @@ -1130,7 +1142,9 @@ describe('Header - Unsaved Changes Indicator', () => { }); test('red dot disappears after workflow is saved', async () => { - const { wrapper, emitSessionContext, ydoc } = await createTestSetup(); + const { wrapper, emitSessionContext, ydoc } = await createTestSetup({ + triggerSync: true, + }); const { container } = render(
From 1a718f7f26d853f873ec947209938722cb7c98a4 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 09:05:18 +0000 Subject: [PATCH 14/31] chore: update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76332c01a2f..f5cac7d44d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ and this project adheres to ### Added +- Add Unsaved Changes Indicator + [#3682](https://github.com/OpenFn/lightning/issues/3682) + ### Changed ### Fixed From 3497712c784cd567382934af12fe399e86afd151 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 09:32:18 +0000 Subject: [PATCH 15/31] feat: trim all user inputs --- .../hooks/useUnsavedChanges.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 7aef0cc6a4f..8d9ac437942 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -14,7 +14,6 @@ export function useUnsavedChanges() { })); if (!workflow || !storeWorkflow) return { hasChanges: false }; - return { hasChanges: isDiffWorkflow( transformWorkflow(workflow), @@ -30,8 +29,8 @@ function transformWorkflow(workflow: Workflow) { jobs: (workflow.jobs || []) .map(job => ({ id: job.id, - name: job.name, - body: job.body, + name: job.name.trim(), + body: job.body.trim(), adaptor: job.adaptor, project_credential_id: job.project_credential_id, keychain_credential_id: job.keychain_credential_id, @@ -45,17 +44,17 @@ function transformWorkflow(workflow: Workflow) { target_job_id: edge.target_job_id, enabled: edge.enabled || false, condition_type: edge.condition_type, - condition_label: edge.condition_label, - condition_expression: edge.condition_expression, + condition_label: edge.condition_label?.trim(), + condition_expression: edge.condition_expression?.trim(), })) .sort((a, b) => a.id.localeCompare(b.id)), triggers: (workflow.triggers || []).map(trigger => ({ id: trigger.id, type: trigger.type, enabled: trigger.enabled, - cron_expression: trigger.cron_expression, + cron_expression: trigger.cron_expression?.trim(), })), - positions: workflow.positions || {}, + // positions: workflow.positions || {}, }; } @@ -80,9 +79,10 @@ function isDiffWorkflow(base: unknown, target: unknown): boolean { ) { const baseObj = base as Record; const targetObj = target as Record; - return Object.keys(baseObj).some(k => - isDiffWorkflow(baseObj[k], targetObj[k]) - ); + const keys = [ + ...new Set(Object.keys(baseObj).concat(Object.keys(targetObj))), + ]; + return keys.some(k => isDiffWorkflow(baseObj[k], targetObj[k])); } return base !== target; From 4d4826a425fea23b2779f1185f4daa5d8e33c856 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 10:14:30 +0000 Subject: [PATCH 16/31] feat: support positions in changes --- .../hooks/useUnsavedChanges.ts | 31 ++++++++++++++----- lib/lightning/workflows/workflow.ex | 1 + 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 8d9ac437942..1874472308a 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -1,3 +1,4 @@ +import type { Trigger } from '../types/trigger'; import type { Workflow } from '../types/workflow'; import { useSessionContext } from './useSessionContext'; @@ -48,16 +49,32 @@ function transformWorkflow(workflow: Workflow) { condition_expression: edge.condition_expression?.trim(), })) .sort((a, b) => a.id.localeCompare(b.id)), - triggers: (workflow.triggers || []).map(trigger => ({ - id: trigger.id, - type: trigger.type, - enabled: trigger.enabled, - cron_expression: trigger.cron_expression?.trim(), - })), - // positions: workflow.positions || {}, + triggers: (workflow.triggers || []).map(trigger => + transformTrigger(trigger) + ), + positions: workflow.positions || {}, }; } +function transformTrigger(trigger: Trigger) { + const output: Partial = { + id: trigger.id, + type: trigger.type, + enabled: trigger.enabled, + }; + switch (trigger.type) { + case 'cron': + output.cron_expression = trigger.cron_expression; + break; + case 'kafka': + output.kafka_configuration = trigger.kafka_configuration; + break; + case 'webhook': + break; + } + return output; +} + // deep comparison to detect workflow changes function isDiffWorkflow(base: unknown, target: unknown): boolean { const isNullish = (v: unknown) => v === undefined || v === null || v === ''; diff --git a/lib/lightning/workflows/workflow.ex b/lib/lightning/workflows/workflow.ex index 1bc3cc02c40..3bde3c493f0 100644 --- a/lib/lightning/workflows/workflow.ex +++ b/lib/lightning/workflows/workflow.ex @@ -32,6 +32,7 @@ defmodule Lightning.Workflows.Workflow do :edges, :jobs, :triggers, + :positions, :inserted_at, :updated_at ]} From 1ba573d17377a0983c00a7ce594b613bc1ac9026 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 10:26:26 +0000 Subject: [PATCH 17/31] fix: set default cron expression --- assets/js/collaborative-editor/hooks/useUnsavedChanges.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 1874472308a..82005e3f65b 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -64,7 +64,7 @@ function transformTrigger(trigger: Trigger) { }; switch (trigger.type) { case 'cron': - output.cron_expression = trigger.cron_expression; + output.cron_expression = trigger.cron_expression ?? '00 00 * * *'; // default cron expression break; case 'kafka': output.kafka_configuration = trigger.kafka_configuration; From 8316dcdeab66f36d97b10b438bd620382ebe7566 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 10:33:26 +0000 Subject: [PATCH 18/31] fix: default cron value --- .../collaborative-editor/hooks/useUnsavedChanges.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 82005e3f65b..040e80d2092 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -15,6 +15,11 @@ export function useUnsavedChanges() { })); if (!workflow || !storeWorkflow) return { hasChanges: false }; + console.log( + 'han:diff', + transformWorkflow(workflow), + transformWorkflow(storeWorkflow as Workflow) + ); return { hasChanges: isDiffWorkflow( transformWorkflow(workflow), @@ -64,7 +69,7 @@ function transformTrigger(trigger: Trigger) { }; switch (trigger.type) { case 'cron': - output.cron_expression = trigger.cron_expression ?? '00 00 * * *'; // default cron expression + output.cron_expression = trigger.cron_expression ?? '0 0 * * *'; // default cron expression break; case 'kafka': output.kafka_configuration = trigger.kafka_configuration; @@ -99,7 +104,11 @@ function isDiffWorkflow(base: unknown, target: unknown): boolean { const keys = [ ...new Set(Object.keys(baseObj).concat(Object.keys(targetObj))), ]; - return keys.some(k => isDiffWorkflow(baseObj[k], targetObj[k])); + const resp = keys.some(k => isDiffWorkflow(baseObj[k], targetObj[k])); + if (resp) { + console.log('diff:', baseObj, targetObj); + } + return resp; } return base !== target; From f48b3e1e12eba6933271c84af9b9cae47dcc129e Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 10:35:53 +0000 Subject: [PATCH 19/31] chore: remove log --- assets/js/collaborative-editor/hooks/useUnsavedChanges.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 040e80d2092..19481478c33 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -15,11 +15,6 @@ export function useUnsavedChanges() { })); if (!workflow || !storeWorkflow) return { hasChanges: false }; - console.log( - 'han:diff', - transformWorkflow(workflow), - transformWorkflow(storeWorkflow as Workflow) - ); return { hasChanges: isDiffWorkflow( transformWorkflow(workflow), From 991118f95874eb69a68ea8ce7f5ce879444e546c Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 8 Jan 2026 12:11:01 +0000 Subject: [PATCH 20/31] fix: allow job credentials through encoding --- lib/lightning/workflows/job.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/lightning/workflows/job.ex b/lib/lightning/workflows/job.ex index 38ce055ae44..8470cdf7fc8 100644 --- a/lib/lightning/workflows/job.ex +++ b/lib/lightning/workflows/job.ex @@ -40,7 +40,9 @@ defmodule Lightning.Workflows.Job do :id, :body, :name, - :adaptor + :adaptor, + :project_credential_id, + :keychain_credential_id ]} schema "jobs" do field :body, :string From dbc429c344fd8ed8d214238949cfbe5cc1507090 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 15 Jan 2026 14:26:44 +0000 Subject: [PATCH 21/31] feat: consider concurrency & jobs log toggle --- .../js/collaborative-editor/hooks/useUnsavedChanges.ts | 10 +++++----- lib/lightning/workflows/workflow.ex | 4 +++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 19481478c33..168a0cfd97c 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -12,6 +12,8 @@ export function useUnsavedChanges() { edges: state.edges, positions: state.positions || {}, name: state.workflow?.name, + concurrency: state.workflow?.concurrency, + enable_job_logs: state.workflow?.enable_job_logs, })); if (!workflow || !storeWorkflow) return { hasChanges: false }; @@ -53,6 +55,8 @@ function transformWorkflow(workflow: Workflow) { transformTrigger(trigger) ), positions: workflow.positions || {}, + concurrency: workflow.concurrency, + enable_job_logs: workflow.enable_job_logs, }; } @@ -99,11 +103,7 @@ function isDiffWorkflow(base: unknown, target: unknown): boolean { const keys = [ ...new Set(Object.keys(baseObj).concat(Object.keys(targetObj))), ]; - const resp = keys.some(k => isDiffWorkflow(baseObj[k], targetObj[k])); - if (resp) { - console.log('diff:', baseObj, targetObj); - } - return resp; + return keys.some(k => isDiffWorkflow(baseObj[k], targetObj[k])); } return base !== target; diff --git a/lib/lightning/workflows/workflow.ex b/lib/lightning/workflows/workflow.ex index 3bde3c493f0..973faa21905 100644 --- a/lib/lightning/workflows/workflow.ex +++ b/lib/lightning/workflows/workflow.ex @@ -34,7 +34,9 @@ defmodule Lightning.Workflows.Workflow do :triggers, :positions, :inserted_at, - :updated_at + :updated_at, + :concurrency, + :enable_job_logs ]} schema "workflows" do field :name, :string From b16cf24d2937ee19a371e735e7fe3e695a027e3b Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 15 Jan 2026 14:31:01 +0000 Subject: [PATCH 22/31] chore: resolve linting issue --- assets/js/collaborative-editor/components/Header.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/js/collaborative-editor/components/Header.tsx b/assets/js/collaborative-editor/components/Header.tsx index ced96645357..71c09340327 100644 --- a/assets/js/collaborative-editor/components/Header.tsx +++ b/assets/js/collaborative-editor/components/Header.tsx @@ -17,6 +17,7 @@ import { useTemplatePanel, useUICommands, } from '../hooks/useUI'; +import { useUnsavedChanges } from '../hooks/useUnsavedChanges'; import { useCanRun, useCanSave, @@ -39,7 +40,6 @@ import { NewRunButton } from './NewRunButton'; import { ReadOnlyWarning } from './ReadOnlyWarning'; import { ShortcutKeys } from './ShortcutKeys'; import { Tooltip } from './Tooltip'; -import { useUnsavedChanges } from '../hooks/useUnsavedChanges'; /** * Save button component - visible in React DevTools From 9cf53138b13d58328ec13dae8a23b4bb8bf8fe03 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Thu, 15 Jan 2026 19:02:38 +0000 Subject: [PATCH 23/31] feat: switch back to broadcast_from! and pass workflow in resp --- assets/js/collaborative-editor/hooks/useWorkflow.tsx | 10 ++++++++++ .../stores/createSessionContextStore.ts | 1 + .../stores/createWorkflowStore.ts | 4 ++++ .../js/collaborative-editor/types/sessionContext.ts | 1 + lib/lightning_web/channels/workflow_channel.ex | 11 +++++++---- 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useWorkflow.tsx b/assets/js/collaborative-editor/hooks/useWorkflow.tsx index d98222ded0b..456bc90800d 100644 --- a/assets/js/collaborative-editor/hooks/useWorkflow.tsx +++ b/assets/js/collaborative-editor/hooks/useWorkflow.tsx @@ -404,6 +404,11 @@ export const useWorkflowActions = () => { ); } + // set base workflow after save + if (response.workflow) { + sessionContextStore.setBaseWorkflow(response.workflow); + } + // Check if this is a new workflow and update URL const currentState = sessionContextStore.getSnapshot(); if (currentState.isNewWorkflow) { @@ -536,6 +541,11 @@ export const useWorkflowActions = () => { ); } + // set workflow base + if (response.workflow) { + sessionContextStore.setBaseWorkflow(response.workflow); + } + // Check if this is a new workflow and update URL const currentState = sessionContextStore.getSnapshot(); if (currentState.isNewWorkflow) { diff --git a/assets/js/collaborative-editor/stores/createSessionContextStore.ts b/assets/js/collaborative-editor/stores/createSessionContextStore.ts index aed8dd0d47b..2044bf3f1f0 100644 --- a/assets/js/collaborative-editor/stores/createSessionContextStore.ts +++ b/assets/js/collaborative-editor/stores/createSessionContextStore.ts @@ -641,6 +641,7 @@ export const createSessionContextStore = ( setHasReadAIDisclaimer, markAIDisclaimerRead, getLimits, + setBaseWorkflow, // Internal methods (not part of public SessionContextStore interface) _connectChannel, diff --git a/assets/js/collaborative-editor/stores/createWorkflowStore.ts b/assets/js/collaborative-editor/stores/createWorkflowStore.ts index 9225a8f482a..7c84bb82bd2 100644 --- a/assets/js/collaborative-editor/stores/createWorkflowStore.ts +++ b/assets/js/collaborative-editor/stores/createWorkflowStore.ts @@ -1376,6 +1376,7 @@ export const createWorkflowStore = () => { const saveWorkflow = async (): Promise<{ saved_at?: string; lock_version?: number; + workflow?: unknown; } | null> => { const { ydoc, provider } = ensureConnected(); @@ -1400,6 +1401,7 @@ export const createWorkflowStore = () => { const response = await channelRequest<{ saved_at: string; lock_version: number; + workflow: unknown; }>(provider.channel, 'save_workflow', payload); logger.debug('Saved workflow', response); @@ -1417,6 +1419,7 @@ export const createWorkflowStore = () => { saved_at?: string; lock_version?: number; repo?: string; + workflow?: unknown; } | null> => { const { ydoc, provider } = ensureConnected(); @@ -1443,6 +1446,7 @@ export const createWorkflowStore = () => { saved_at: string; lock_version: number; repo: string; + workflow: unknown; }>(provider.channel, 'save_and_sync', payload); logger.debug('Saved and synced workflow to GitHub', response); diff --git a/assets/js/collaborative-editor/types/sessionContext.ts b/assets/js/collaborative-editor/types/sessionContext.ts index b26805a518d..6d76ba4f43f 100644 --- a/assets/js/collaborative-editor/types/sessionContext.ts +++ b/assets/js/collaborative-editor/types/sessionContext.ts @@ -144,6 +144,7 @@ interface SessionContextCommands { clearError: () => void; setLatestSnapshotLockVersion: (lockVersion: number) => void; clearIsNewWorkflow: () => void; + setBaseWorkflow: (workflow: unknown) => void; setHasReadAIDisclaimer: (hasRead: boolean) => void; markAIDisclaimerRead: () => Promise; getLimits: ( diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index be20136a2f0..3507711f838 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -327,7 +327,7 @@ defmodule LightningWeb.WorkflowChannel do {:ok, workflow} <- Session.save_workflow(session_pid, user) do # Broadcast the new lock_version to all users in the channel # so they can update their latestSnapshotLockVersion in SessionContextStore - broadcast!(socket, "workflow_saved", %{ + broadcast_from!(socket, "workflow_saved", %{ latest_snapshot_lock_version: workflow.lock_version, workflow: workflow }) @@ -336,7 +336,8 @@ defmodule LightningWeb.WorkflowChannel do {:ok, %{ saved_at: workflow.updated_at, - lock_version: workflow.lock_version + lock_version: workflow.lock_version, + workflow: workflow }}, socket} else error -> workflow_error_reply(socket, error) @@ -366,7 +367,8 @@ defmodule LightningWeb.WorkflowChannel do VersionControl.get_repo_connection_for_project(project.id), :ok <- VersionControl.initiate_sync(repo_connection, commit_message) do broadcast_from!(socket, "workflow_saved", %{ - latest_snapshot_lock_version: workflow.lock_version + latest_snapshot_lock_version: workflow.lock_version, + workflow: workflow }) {:reply, @@ -374,7 +376,8 @@ defmodule LightningWeb.WorkflowChannel do %{ saved_at: workflow.updated_at, lock_version: workflow.lock_version, - repo: repo_connection.repo + repo: repo_connection.repo, + workflow: workflow }}, socket} else nil -> From 43117f3119727a4838eb7fff15e2840c2508fca7 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 02:33:22 +0000 Subject: [PATCH 24/31] chore: add notify call to setBaseWorkflow --- .../js/collaborative-editor/stores/createSessionContextStore.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/assets/js/collaborative-editor/stores/createSessionContextStore.ts b/assets/js/collaborative-editor/stores/createSessionContextStore.ts index 2044bf3f1f0..04a42b4deaf 100644 --- a/assets/js/collaborative-editor/stores/createSessionContextStore.ts +++ b/assets/js/collaborative-editor/stores/createSessionContextStore.ts @@ -299,6 +299,7 @@ export const createSessionContextStore = ( state = produce(state, draft => { draft.workflow = workflow as any; }); + notify('setBaseWorkflow'); }; /** From 38a91aecd8e39f58aee1474d1da1fd8774d3820c Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 02:41:42 +0000 Subject: [PATCH 25/31] chore: use memoization --- .../hooks/useUnsavedChanges.ts | 62 ++++++++++++++----- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 168a0cfd97c..4768857ff5f 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -1,3 +1,5 @@ +import { useMemo } from 'react'; + import type { Trigger } from '../types/trigger'; import type { Workflow } from '../types/workflow'; @@ -6,23 +8,51 @@ import { useWorkflowState } from './useWorkflow'; export function useUnsavedChanges() { const { workflow } = useSessionContext(); - const storeWorkflow = useWorkflowState(state => ({ - jobs: state.jobs, - triggers: state.triggers, - edges: state.edges, - positions: state.positions || {}, - name: state.workflow?.name, - concurrency: state.workflow?.concurrency, - enable_job_logs: state.workflow?.enable_job_logs, - })); - if (!workflow || !storeWorkflow) return { hasChanges: false }; - return { - hasChanges: isDiffWorkflow( - transformWorkflow(workflow), - transformWorkflow(storeWorkflow as Workflow) - ), - }; + // Individual selectors - stable references + const jobs = useWorkflowState(state => state.jobs); + const triggers = useWorkflowState(state => state.triggers); + const edges = useWorkflowState(state => state.edges); + const positions = useWorkflowState(state => state.positions); + const name = useWorkflowState(state => state.workflow?.name); + const concurrency = useWorkflowState(state => state.workflow?.concurrency); + const enable_job_logs = useWorkflowState( + state => state.workflow?.enable_job_logs + ); + + // Memoize store workflow object to prevent recreating on every render + const storeWorkflow = useMemo( + () => ({ + jobs, + triggers, + edges, + positions: positions || {}, + name, + concurrency, + enable_job_logs, + }), + [jobs, triggers, edges, positions, name, concurrency, enable_job_logs] + ); + + // Memoize transformed base workflow (from session context) + const transformedBaseWorkflow = useMemo( + () => (workflow ? transformWorkflow(workflow) : null), + [workflow] + ); + + // Memoize transformed store workflow + const transformedStoreWorkflow = useMemo( + () => transformWorkflow(storeWorkflow as Workflow), + [storeWorkflow] + ); + + // Memoize comparison + const hasChanges = useMemo(() => { + if (!transformedBaseWorkflow) return false; + return isDiffWorkflow(transformedBaseWorkflow, transformedStoreWorkflow); + }, [transformedBaseWorkflow, transformedStoreWorkflow]); + + return { hasChanges }; } // transform workflow to normalized structure for comparison From 5d3cab8e1a562d77e708de022b32c7f81ca5871f Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 08:09:15 +0000 Subject: [PATCH 26/31] tests: resolve --- assets/test/collaborative-editor/components/Header.test.tsx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/assets/test/collaborative-editor/components/Header.test.tsx b/assets/test/collaborative-editor/components/Header.test.tsx index 6f97d0cb4f2..02785f7a882 100644 --- a/assets/test/collaborative-editor/components/Header.test.tsx +++ b/assets/test/collaborative-editor/components/Header.test.tsx @@ -92,6 +92,8 @@ async function createTestSetup(options: WrapperOptions = {}) { triggers: [], edges: [], positions: {}, + concurrency: workflowLockVersion !== null ? null : undefined, + enable_job_logs: workflowLockVersion !== null ? false : undefined, }, }; From c8d7d087ce970af2123b44f50f4b25e96c114f41 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 08:17:06 +0000 Subject: [PATCH 27/31] test: resolve unsaved changes test --- .../test/collaborative-editor/hooks/useUnsavedChanges.test.tsx | 1 + 1 file changed, 1 insertion(+) diff --git a/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx b/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx index 1427c551e9b..92541c3c8e4 100644 --- a/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx +++ b/assets/test/collaborative-editor/hooks/useUnsavedChanges.test.tsx @@ -313,6 +313,7 @@ describe('useUnsavedChanges - Edge Cases', () => { triggers: [], edges: [], positions: {}, + enable_job_logs: false, }, }) ); From d32599e4f1800bde106c7218800ba1ea366d9bda Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 08:54:41 +0000 Subject: [PATCH 28/31] fix: schema validation for base workflow --- .../stores/createSessionContextStore.ts | 9 +++++---- .../stores/createWorkflowStore.ts | 11 +++++------ .../types/sessionContext.ts | 11 ++++------- assets/js/collaborative-editor/types/trigger.ts | 4 ++-- .../js/collaborative-editor/types/workflow.ts | 17 +++++++++++++++-- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/assets/js/collaborative-editor/stores/createSessionContextStore.ts b/assets/js/collaborative-editor/stores/createSessionContextStore.ts index 04a42b4deaf..414640f9fdc 100644 --- a/assets/js/collaborative-editor/stores/createSessionContextStore.ts +++ b/assets/js/collaborative-editor/stores/createSessionContextStore.ts @@ -94,6 +94,7 @@ import { WebhookAuthMethodSchema, WorkflowTemplateSchema, } from '../types/sessionContext'; +import type { BaseWorkflow } from '../types/workflow'; import { createWithSelector } from './common'; import { wrapStoreWithDevTools } from './devtools'; @@ -295,9 +296,9 @@ export const createSessionContextStore = ( notify('setLatestSnapshotLockVersion'); }; - const setBaseWorkflow = (workflow: unknown) => { + const setBaseWorkflow = (workflow: BaseWorkflow) => { state = produce(state, draft => { - draft.workflow = workflow as any; + draft.workflow = workflow; }); notify('setBaseWorkflow'); }; @@ -454,8 +455,8 @@ export const createSessionContextStore = ( ).latest_snapshot_lock_version; logger.debug('Workflow saved - updating lock version', lockVersion); setLatestSnapshotLockVersion(lockVersion); - if ('workflow' in message) { - setBaseWorkflow(message.workflow); + if ('workflow' in message && typeof message.workflow === 'object') { + setBaseWorkflow(message.workflow as BaseWorkflow); } } }; diff --git a/assets/js/collaborative-editor/stores/createWorkflowStore.ts b/assets/js/collaborative-editor/stores/createWorkflowStore.ts index 7c84bb82bd2..2c82ae2ffc5 100644 --- a/assets/js/collaborative-editor/stores/createWorkflowStore.ts +++ b/assets/js/collaborative-editor/stores/createWorkflowStore.ts @@ -143,8 +143,7 @@ import { notifications } from '../lib/notifications'; import { EdgeSchema } from '../types/edge'; import { JobSchema } from '../types/job'; import type { Session } from '../types/session'; -import type { Workflow } from '../types/workflow'; -import { WorkflowSchema } from '../types/workflow'; +import type { BaseWorkflow, Workflow } from '../types/workflow'; import { getIncomingEdgeIndices } from '../utils/workflowGraph'; import { createWithSelector } from './common'; @@ -1376,7 +1375,7 @@ export const createWorkflowStore = () => { const saveWorkflow = async (): Promise<{ saved_at?: string; lock_version?: number; - workflow?: unknown; + workflow?: BaseWorkflow; } | null> => { const { ydoc, provider } = ensureConnected(); @@ -1401,7 +1400,7 @@ export const createWorkflowStore = () => { const response = await channelRequest<{ saved_at: string; lock_version: number; - workflow: unknown; + workflow: BaseWorkflow; }>(provider.channel, 'save_workflow', payload); logger.debug('Saved workflow', response); @@ -1419,7 +1418,7 @@ export const createWorkflowStore = () => { saved_at?: string; lock_version?: number; repo?: string; - workflow?: unknown; + workflow?: BaseWorkflow; } | null> => { const { ydoc, provider } = ensureConnected(); @@ -1446,7 +1445,7 @@ export const createWorkflowStore = () => { saved_at: string; lock_version: number; repo: string; - workflow: unknown; + workflow: BaseWorkflow; }>(provider.channel, 'save_and_sync', payload); logger.debug('Saved and synced workflow to GitHub', response); diff --git a/assets/js/collaborative-editor/types/sessionContext.ts b/assets/js/collaborative-editor/types/sessionContext.ts index 6d76ba4f43f..13065dc69b9 100644 --- a/assets/js/collaborative-editor/types/sessionContext.ts +++ b/assets/js/collaborative-editor/types/sessionContext.ts @@ -2,10 +2,7 @@ import type { PhoenixChannelProvider } from 'y-phoenix-channel'; import * as z from 'zod'; import { isoDateTimeSchema, uuidSchema } from './common'; -import { type Workflow } from './workflow'; -import { JobSchema } from './job'; -import { TriggerSchema } from './trigger'; -import { EdgeSchema } from './edge'; +import { BaseWorkflowSchema, type BaseWorkflow } from './workflow'; export const UserContextSchema = z.object({ id: uuidSchema, @@ -106,7 +103,7 @@ export const SessionContextResponseSchema = z.object({ workflow_template: WorkflowTemplateSchema.nullable(), has_read_ai_disclaimer: z.boolean(), limits: LimitsSchema.optional(), - workflow: z.any(), // to be fixed + workflow: BaseWorkflowSchema, }); export type UserContext = z.infer; @@ -117,7 +114,7 @@ export type AppConfig = z.infer; export interface SessionContextState { user: UserContext | null; project: ProjectContext | null; - workflow: Workflow | null; + workflow: BaseWorkflow | null; config: AppConfig | null; permissions: Permissions | null; latestSnapshotLockVersion: number | null; @@ -144,7 +141,7 @@ interface SessionContextCommands { clearError: () => void; setLatestSnapshotLockVersion: (lockVersion: number) => void; clearIsNewWorkflow: () => void; - setBaseWorkflow: (workflow: unknown) => void; + setBaseWorkflow: (workflow: BaseWorkflow) => void; setHasReadAIDisclaimer: (hasRead: boolean) => void; markAIDisclaimerRead: () => Promise; getLimits: ( diff --git a/assets/js/collaborative-editor/types/trigger.ts b/assets/js/collaborative-editor/types/trigger.ts index 8c85b323893..8f2628d38d9 100644 --- a/assets/js/collaborative-editor/types/trigger.ts +++ b/assets/js/collaborative-editor/types/trigger.ts @@ -42,7 +42,7 @@ const cronTriggerSchema = baseTriggerSchema.extend({ 'Invalid cron expression. Use format: minute hour day month weekday', } ), - kafka_configuration: z.null(), + kafka_configuration: z.null().default(null), }); // Kafka configuration sub-schema @@ -92,7 +92,7 @@ const kafkaConfigSchema = z // Kafka trigger schema const kafkaTriggerSchema = baseTriggerSchema.extend({ type: z.literal('kafka'), - cron_expression: z.null(), + cron_expression: z.null().default(null), kafka_configuration: kafkaConfigSchema, }); diff --git a/assets/js/collaborative-editor/types/workflow.ts b/assets/js/collaborative-editor/types/workflow.ts index 061c4b0fcaa..9659a08b815 100644 --- a/assets/js/collaborative-editor/types/workflow.ts +++ b/assets/js/collaborative-editor/types/workflow.ts @@ -10,9 +10,10 @@ import type * as Y from 'yjs'; import { z } from 'zod'; -import type { Job as JobType } from './job'; +import { EdgeSchema } from './edge'; +import { JobSchema, type Job as JobType } from './job'; import type { Session } from './session'; -import type { Trigger as TriggerType } from './trigger'; +import { TriggerSchema, type Trigger as TriggerType } from './trigger'; /** * Zod schema for workflow validation @@ -41,6 +42,18 @@ export const WorkflowSchema = z.object({ export type WorkflowFormValues = z.infer; +export const BaseWorkflowSchema = z.object({ + jobs: z.array(JobSchema), + triggers: z.array(TriggerSchema), + edges: z.array(EdgeSchema), + positions: z.record(z.string(), z.object({}).loose()).nullable(), + name: z.string().min(1).max(255), + concurrency: z.number().nullable(), + enable_job_logs: z.boolean(), +}); + +export type BaseWorkflow = z.infer; + /** * Creates a workflow schema with dynamic project concurrency validation * From 6f8b964626156082f0426cd35bcf43fe31135193 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 08:59:54 +0000 Subject: [PATCH 29/31] tests: undefined base --- assets/js/collaborative-editor/types/sessionContext.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/js/collaborative-editor/types/sessionContext.ts b/assets/js/collaborative-editor/types/sessionContext.ts index 13065dc69b9..6bec0caa732 100644 --- a/assets/js/collaborative-editor/types/sessionContext.ts +++ b/assets/js/collaborative-editor/types/sessionContext.ts @@ -103,7 +103,7 @@ export const SessionContextResponseSchema = z.object({ workflow_template: WorkflowTemplateSchema.nullable(), has_read_ai_disclaimer: z.boolean(), limits: LimitsSchema.optional(), - workflow: BaseWorkflowSchema, + workflow: BaseWorkflowSchema.optional(), }); export type UserContext = z.infer; From 7efb05c322d242dfaccbf8fc4124bf5ea4f2663c Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 09:16:49 +0000 Subject: [PATCH 30/31] tests: fix job logs validation --- assets/js/collaborative-editor/types/workflow.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/assets/js/collaborative-editor/types/workflow.ts b/assets/js/collaborative-editor/types/workflow.ts index 9659a08b815..0b51dc43568 100644 --- a/assets/js/collaborative-editor/types/workflow.ts +++ b/assets/js/collaborative-editor/types/workflow.ts @@ -48,8 +48,8 @@ export const BaseWorkflowSchema = z.object({ edges: z.array(EdgeSchema), positions: z.record(z.string(), z.object({}).loose()).nullable(), name: z.string().min(1).max(255), - concurrency: z.number().nullable(), - enable_job_logs: z.boolean(), + concurrency: z.number().nullable().optional(), + enable_job_logs: z.boolean().default(false), }); export type BaseWorkflow = z.infer; From 99c0c8d0f626745382acac28ca6bc27f03271425 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Fri, 16 Jan 2026 11:06:38 +0000 Subject: [PATCH 31/31] tests: broadcasts --- .../workflow_channel_broadcast_test.exs | 483 ++++++++++++++++++ 1 file changed, 483 insertions(+) create mode 100644 test/lightning_web/channels/workflow_channel_broadcast_test.exs diff --git a/test/lightning_web/channels/workflow_channel_broadcast_test.exs b/test/lightning_web/channels/workflow_channel_broadcast_test.exs new file mode 100644 index 00000000000..b69ab3db358 --- /dev/null +++ b/test/lightning_web/channels/workflow_channel_broadcast_test.exs @@ -0,0 +1,483 @@ +defmodule LightningWeb.WorkflowChannelBroadcastTest do + @moduledoc """ + Tests for workflow_saved broadcast behavior after save and save_and_sync operations. + + These tests verify that when a user saves a workflow, all other connected users + receive a broadcast with the updated workflow state, allowing them to update their + base workflow state and show they have no unsaved changes. + """ + use LightningWeb.ChannelCase + + import Lightning.CollaborationHelpers + import Lightning.Factories + import Mox + + setup :verify_on_exit! + + setup do + Mox.stub(Lightning.MockConfig, :check_flag?, fn + :require_email_verification -> true + _flag -> nil + end) + + # Set global mode for the mock to allow cross-process calls + Mox.set_mox_global(LightningMock) + # Stub the broadcast calls that save_workflow makes + Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) + + user = insert(:user) + project = insert(:project, project_users: [%{user: user, role: :owner}]) + workflow = insert(:workflow, project: project) + + {:ok, _, socket} = + LightningWeb.UserSocket + |> socket("user_#{user.id}", %{current_user: user}) + |> subscribe_and_join( + LightningWeb.WorkflowChannel, + "workflow:collaborate:#{workflow.id}", + %{"project_id" => project.id, "action" => "edit"} + ) + + on_exit(fn -> + ensure_doc_supervisor_stopped(socket.assigns.workflow.id) + end) + + %{socket: socket, user: user, project: project, workflow: workflow} + end + + describe "save_workflow broadcasts workflow_saved to other users" do + setup %{socket: socket, user: user, project: project, workflow: workflow} do + # Create a second user and socket to simulate another collaborator + user2 = insert(:user) + insert(:project_user, project: project, user: user2, role: :editor) + + {:ok, _, socket2} = + LightningWeb.UserSocket + |> socket("user_#{user2.id}", %{current_user: user2}) + |> subscribe_and_join( + LightningWeb.WorkflowChannel, + "workflow:collaborate:#{workflow.id}", + %{"project_id" => project.id, "action" => "edit"} + ) + + %{socket: socket, socket2: socket2, user: user, user2: user2} + end + + test "broadcasts workflow_saved with lock_version and workflow to other users", + %{socket: socket, socket2: _socket2, workflow: workflow} do + # User 1 modifies and saves the workflow + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_update", fn -> + Yex.Map.set(workflow_map, "name", "Updated Workflow Name") + end) + + # User 1 saves + ref = push(socket, "save_workflow", %{}) + + # User 1 gets reply + assert_reply ref, :ok, %{ + saved_at: saved_at, + lock_version: new_lock_version, + workflow: reply_workflow + } + + assert %DateTime{} = saved_at + assert new_lock_version == workflow.lock_version + 1 + assert reply_workflow.id == workflow.id + assert reply_workflow.name == "Updated Workflow Name" + assert reply_workflow.lock_version == new_lock_version + + # User 2 (socket2) receives broadcast with the same workflow + assert_broadcast "workflow_saved", %{ + latest_snapshot_lock_version: broadcast_lock_version, + workflow: broadcast_workflow + } + + assert broadcast_lock_version == new_lock_version + assert broadcast_workflow.id == workflow.id + assert broadcast_workflow.name == "Updated Workflow Name" + assert broadcast_workflow.lock_version == new_lock_version + assert broadcast_workflow.jobs == reply_workflow.jobs + assert broadcast_workflow.edges == reply_workflow.edges + assert broadcast_workflow.triggers == reply_workflow.triggers + end + + test "broadcast includes all workflow associations (jobs, edges, triggers)", + %{socket: socket, socket2: _socket2} do + # Add a job to the workflow via Y.Doc + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + jobs_array = Yex.Doc.get_array(doc, "jobs") + job_id = Ecto.UUID.generate() + + job_map = + Yex.MapPrelim.from(%{ + "id" => job_id, + "name" => "Test Job", + "body" => Yex.TextPrelim.from("fn(state => state)"), + "adaptor" => "@openfn/language-common@1.0.0", + "project_credential_id" => nil, + "keychain_credential_id" => nil + }) + + Yex.Doc.transaction(doc, "test_add_job", fn -> + Yex.Array.push(jobs_array, job_map) + end) + + # Save the workflow + ref = push(socket, "save_workflow", %{}) + assert_reply ref, :ok, %{workflow: reply_workflow} + + # Verify reply contains the job + assert length(reply_workflow.jobs) == 1 + assert Enum.any?(reply_workflow.jobs, fn job -> job.id == job_id end) + + # User 2 receives broadcast with the job included + assert_broadcast "workflow_saved", %{ + workflow: broadcast_workflow + } + + assert length(broadcast_workflow.jobs) == 1 + assert Enum.any?(broadcast_workflow.jobs, fn job -> job.id == job_id end) + job = Enum.find(broadcast_workflow.jobs, &(&1.id == job_id)) + assert job.name == "Test Job" + assert job.adaptor == "@openfn/language-common@1.0.0" + end + + test "broadcast allows other users to reset their unsaved changes indicator", + %{socket: socket, socket2: socket2, workflow: workflow} do + # Scenario: User 2 has local unsaved changes, User 1 saves different changes + # User 2 should receive the broadcast and be able to update their base state + + # User 2 makes local changes (not saved) + session_pid2 = socket2.assigns.session_pid + doc2 = Lightning.Collaboration.Session.get_doc(session_pid2) + workflow_map2 = Yex.Doc.get_map(doc2, "workflow") + + Yex.Doc.transaction(doc2, "user2_local_change", fn -> + Yex.Map.set(workflow_map2, "name", "User 2 Local Change") + end) + + # User 1 makes and saves changes + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "user1_save", fn -> + Yex.Map.set(workflow_map, "name", "User 1 Saved Change") + end) + + ref = push(socket, "save_workflow", %{}) + assert_reply ref, :ok, %{lock_version: new_lock_version} + + # User 2 receives the broadcast + assert_broadcast "workflow_saved", %{ + latest_snapshot_lock_version: broadcast_lock_version, + workflow: broadcast_workflow + } + + assert broadcast_lock_version == new_lock_version + assert broadcast_workflow.name == "User 1 Saved Change" + + # User 2 can now update their latestSnapshotLockVersion and compare + # with their current Y.Doc state to determine if they still have unsaved changes + assert broadcast_lock_version == workflow.lock_version + 1 + end + + test "multiple users receive the same broadcast when one user saves", + %{ + socket: socket, + socket2: _socket2, + project: project, + workflow: workflow + } do + # Add a third user + user3 = insert(:user) + insert(:project_user, project: project, user: user3, role: :editor) + + {:ok, _, _socket3} = + LightningWeb.UserSocket + |> socket("user_#{user3.id}", %{current_user: user3}) + |> subscribe_and_join( + LightningWeb.WorkflowChannel, + "workflow:collaborate:#{workflow.id}", + %{"project_id" => project.id, "action" => "edit"} + ) + + # User 1 saves + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_update", fn -> + Yex.Map.set(workflow_map, "name", "Multi-User Save") + end) + + ref = push(socket, "save_workflow", %{}) + assert_reply ref, :ok, %{lock_version: new_lock_version} + + # Both user 2 and user 3 should receive the broadcast + # The same message should be broadcast twice (once for each subscriber) + assert_broadcast "workflow_saved", %{ + latest_snapshot_lock_version: lock_v1, + workflow: wf1 + } + + assert_broadcast "workflow_saved", %{ + latest_snapshot_lock_version: lock_v2, + workflow: wf2 + } + + assert lock_v1 == new_lock_version + assert lock_v2 == new_lock_version + assert wf1.name == "Multi-User Save" + assert wf2.name == "Multi-User Save" + end + + test "broadcast is not sent when save fails due to validation error", + %{socket: socket, socket2: _socket2} do + # Set invalid data + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_invalid", fn -> + Yex.Map.set(workflow_map, "name", "") + end) + + ref = push(socket, "save_workflow", %{}) + + assert_reply ref, :error, %{ + errors: _errors, + type: "validation_error" + } + + # No broadcast should be sent on error + refute_broadcast "workflow_saved", _ + end + + test "broadcast is not sent when save fails due to optimistic lock error", + %{socket: socket, socket2: _socket2, workflow: workflow, user: user} do + # Simulate another user saving first (causing lock version mismatch) + {:ok, _updated_workflow} = + Lightning.Workflows.save_workflow( + Lightning.Workflows.change_workflow(workflow, %{ + name: "Concurrent Save" + }), + user + ) + + # Now try to save with stale lock version + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_stale", fn -> + Yex.Map.set(workflow_map, "name", "My Stale Change") + end) + + ref = push(socket, "save_workflow", %{}) + + # May succeed or fail depending on Y.Doc merge, but if it fails, no broadcast + assert_reply ref, reply_type, _response + + if reply_type == :error do + refute_broadcast "workflow_saved", _ + end + end + + test "broadcast uses broadcast_from! to exclude the saving user", + %{socket: socket, socket2: _socket2} do + # The saving user should NOT receive their own broadcast + # (they already have the data in the reply) + + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_update", fn -> + Yex.Map.set(workflow_map, "name", "Broadcast Exclusion Test") + end) + + ref = push(socket, "save_workflow", %{}) + assert_reply ref, :ok, %{lock_version: _} + + # User 1 (socket) should NOT receive the broadcast they triggered + # (broadcast_from! excludes the sender) + # User 2 (socket2) should receive it + + # We can't directly test broadcast_from! exclusion in this test setup + # because both sockets are in the same test process, but we can verify + # the broadcast happens + assert_broadcast "workflow_saved", %{ + workflow: broadcast_workflow + } + + assert broadcast_workflow.name == "Broadcast Exclusion Test" + end + end + + describe "save_and_sync broadcasts workflow_saved to other users" do + @tag :skip + test "broadcasts workflow_saved after successful save (without actual GitHub sync)", + %{socket: socket, user: _user, project: project, workflow: workflow} do + # Create repo connection for GitHub sync + insert(:project_repo_connection, + project: project, + repo: "openfn/demo", + branch: "main" + ) + + # Create a second user + user2 = insert(:user) + insert(:project_user, project: project, user: user2, role: :editor) + + {:ok, _, _socket2} = + LightningWeb.UserSocket + |> socket("user_#{user2.id}", %{current_user: user2}) + |> subscribe_and_join( + LightningWeb.WorkflowChannel, + "workflow:collaborate:#{workflow.id}", + %{"project_id" => project.id, "action" => "edit"} + ) + + # User 1 modifies workflow + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_update", fn -> + Yex.Map.set(workflow_map, "name", "Sync Test Workflow") + end) + + # Note: This will fail at the GitHub sync step, but should still + # broadcast if the save succeeds. In real tests with proper mocking, + # this would verify the broadcast happens after successful sync. + ref = + push(socket, "save_and_sync", %{ + "commit_message" => "test: sync workflow" + }) + + # This will get an error reply in the test environment + # (no real GitHub connection), but in production with proper setup + # it would succeed and broadcast + assert_reply ref, reply_type, _response + + # If save succeeded, broadcast should have been sent + # If save failed, no broadcast + if reply_type == :ok do + assert_broadcast "workflow_saved", %{ + latest_snapshot_lock_version: _, + workflow: broadcast_workflow + } + + assert broadcast_workflow.name == "Sync Test Workflow" + else + # Expected in test environment without GitHub setup + :ok + end + end + + test "no broadcast when save fails before sync attempt (validation error)", + %{socket: socket, user: _user, project: project, workflow: workflow} do + # Create repo connection + insert(:project_repo_connection, + project: project, + repo: "openfn/demo", + branch: "main" + ) + + # Create a second user + user2 = insert(:user) + insert(:project_user, project: project, user: user2, role: :editor) + + {:ok, _, _socket2} = + LightningWeb.UserSocket + |> socket("user_#{user2.id}", %{current_user: user2}) + |> subscribe_and_join( + LightningWeb.WorkflowChannel, + "workflow:collaborate:#{workflow.id}", + %{"project_id" => project.id, "action" => "edit"} + ) + + # Set invalid workflow state + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_invalid", fn -> + Yex.Map.set(workflow_map, "name", "") + end) + + ref = + push(socket, "save_and_sync", %{ + "commit_message" => "test: invalid save" + }) + + assert_reply ref, :error, %{ + errors: _errors, + type: "validation_error" + } + + # No broadcast when save fails before sync + refute_broadcast "workflow_saved", _ + end + end + + describe "workflow_saved broadcast content structure" do + test "broadcast includes correct workflow structure matching reply", + %{socket: socket, workflow: workflow} do + session_pid = socket.assigns.session_pid + doc = Lightning.Collaboration.Session.get_doc(session_pid) + workflow_map = Yex.Doc.get_map(doc, "workflow") + + Yex.Doc.transaction(doc, "test_structure", fn -> + Yex.Map.set(workflow_map, "name", "Structure Test") + end) + + ref = push(socket, "save_workflow", %{}) + + assert_reply ref, :ok, %{ + saved_at: _, + lock_version: reply_lock_version, + workflow: reply_workflow + } + + # Create second socket to receive broadcast + user2 = insert(:user) + + insert(:project_user, + project: workflow.project, + user: user2, + role: :editor + ) + + {:ok, _, _socket2} = + LightningWeb.UserSocket + |> socket("user_#{user2.id}", %{current_user: user2}) + |> subscribe_and_join( + LightningWeb.WorkflowChannel, + "workflow:collaborate:#{workflow.id}", + %{"project_id" => workflow.project_id, "action" => "edit"} + ) + + # Verify broadcast matches reply structure + assert_broadcast "workflow_saved", %{ + latest_snapshot_lock_version: broadcast_lock_version, + workflow: broadcast_workflow + } + + assert broadcast_lock_version == reply_lock_version + assert broadcast_workflow.id == reply_workflow.id + assert broadcast_workflow.name == reply_workflow.name + assert broadcast_workflow.lock_version == reply_workflow.lock_version + assert broadcast_workflow.project_id == reply_workflow.project_id + assert broadcast_workflow.jobs == reply_workflow.jobs + assert broadcast_workflow.edges == reply_workflow.edges + assert broadcast_workflow.triggers == reply_workflow.triggers + end + end +end