diff --git a/apps/studio/components/interfaces/Database/Replication/DeleteDestination.tsx b/apps/studio/components/interfaces/Database/Replication/DeleteDestination.tsx index 4782500013ae0..1c1f633505f11 100644 --- a/apps/studio/components/interfaces/Database/Replication/DeleteDestination.tsx +++ b/apps/studio/components/interfaces/Database/Replication/DeleteDestination.tsx @@ -2,40 +2,32 @@ import TextConfirmModal from 'ui-patterns/Dialogs/TextConfirmModal' interface DeleteDestinationProps { visible: boolean - setVisible: (value: boolean) => void - onDelete: () => void isLoading: boolean name: string + setVisible: (value: boolean) => void + onDelete: () => void } -const DeleteDestination = ({ +export const DeleteDestination = ({ visible, - setVisible, - onDelete, isLoading, name, + setVisible, + onDelete, }: DeleteDestinationProps) => { return ( - <> - setVisible(!visible)} - onConfirm={onDelete} - title="Delete this destination" - loading={isLoading} - confirmLabel={`Delete destination`} - confirmPlaceholder="Type in name of destination" - confirmString={name ?? 'Unknown'} - text={ - <> - This will delete the destination{' '} - - } - alert={{ title: 'You cannot recover this destination once deleted.' }} - /> - + setVisible(!visible)} + onConfirm={onDelete} + /> ) } - -export default DeleteDestination diff --git a/apps/studio/components/interfaces/Database/Replication/DestinationPanel.tsx b/apps/studio/components/interfaces/Database/Replication/DestinationPanel.tsx index 21f58cd2d204b..68199c42788cf 100644 --- a/apps/studio/components/interfaces/Database/Replication/DestinationPanel.tsx +++ b/apps/studio/components/interfaces/Database/Replication/DestinationPanel.tsx @@ -1,10 +1,21 @@ import { zodResolver } from '@hookform/resolvers/zod' +import { useEffect, useMemo, useState } from 'react' +import { useForm } from 'react-hook-form' +import { toast } from 'sonner' +import * as z from 'zod' + import { useParams } from 'common' +import { useCreateDestinationPipelineMutation } from 'data/replication/create-destination-pipeline-mutation' import { useCreateTenantSourceMutation } from 'data/replication/create-tenant-source-mutation' +import { useReplicationDestinationByIdQuery } from 'data/replication/destination-by-id-query' +import { useReplicationPipelineByIdQuery } from 'data/replication/pipeline-by-id-query' import { useReplicationPublicationsQuery } from 'data/replication/publications-query' import { useStartPipelineMutation } from 'data/replication/start-pipeline-mutation' -import { useForm } from 'react-hook-form' -import { toast } from 'sonner' +import { useUpdateDestinationPipelineMutation } from 'data/replication/update-destination-pipeline-mutation' +import { + PipelineStatusRequestStatus, + usePipelineRequestStatus, +} from 'state/replication-pipeline-request-status' import { Accordion_Shadcn_, AccordionContent_Shadcn_, @@ -23,6 +34,7 @@ import { SelectGroup_Shadcn_, SelectItem_Shadcn_, SelectTrigger_Shadcn_, + Separator, Sheet, SheetContent, SheetDescription, @@ -30,21 +42,28 @@ import { SheetHeader, SheetSection, SheetTitle, - Switch, TextArea_Shadcn_, WarningIcon, - Label_Shadcn_ as Label, } from 'ui' -import * as z from 'zod' -import PublicationsComboBox from './PublicationsComboBox' -import NewPublicationPanel from './NewPublicationPanel' -import { useState, useMemo, useEffect } from 'react' -import { useReplicationDestinationByIdQuery } from 'data/replication/destination-by-id-query' -import { useReplicationPipelineByIdQuery } from 'data/replication/pipeline-by-id-query' -import { useStopPipelineMutation } from 'data/replication/stop-pipeline-mutation' import { FormItemLayout } from 'ui-patterns/form/FormItemLayout/FormItemLayout' -import { useCreateDestinationPipelineMutation } from 'data/replication/create-destination-pipeline-mutation' -import { useUpdateDestinationPipelineMutation } from 'data/replication/update-destination-pipeline-mutation' +import NewPublicationPanel from './NewPublicationPanel' +import PublicationsComboBox from './PublicationsComboBox' + +const formId = 'destination-editor' +const types = ['BigQuery'] as const +const TypeEnum = z.enum(types) + +const FormSchema = z.object({ + type: TypeEnum, + name: z.string().min(1, 'Name is required'), + projectId: z.string().min(1, 'Project id is required'), + datasetId: z.string().min(1, 'Dataset id is required'), + serviceAccountKey: z.string().min(1, 'Service account key is required'), + publicationName: z.string().min(1, 'Publication is required'), + maxSize: z.number().min(1, 'Max Size must be greater than 0').int().optional(), + maxFillMs: z.number().min(1, 'Max Fill milliseconds should be greater than 0').int().optional(), + maxStalenessMins: z.number().nonnegative().optional(), +}) interface DestinationPanelProps { visible: boolean @@ -58,22 +77,33 @@ interface DestinationPanelProps { } } -const DestinationPanel = ({ +export const DestinationPanel = ({ visible, sourceId, onClose, existingDestination, }: DestinationPanelProps) => { const { ref: projectRef } = useParams() + const { setRequestStatus } = usePipelineRequestStatus() + + const editMode = !!existingDestination const [publicationPanelVisible, setPublicationPanelVisible] = useState(false) + const { mutateAsync: createTenantSource, isLoading: creatingTenantSource } = useCreateTenantSourceMutation() + const { mutateAsync: createDestinationPipeline, isLoading: creatingDestinationPipeline } = - useCreateDestinationPipelineMutation() - const { mutateAsync: startPipeline, isLoading: startingPipeline } = useStartPipelineMutation() - const { mutateAsync: stopPipeline, isLoading: stoppingPipeline } = useStopPipelineMutation() + useCreateDestinationPipelineMutation({ + onSuccess: () => form.reset(defaultValues), + }) + const { mutateAsync: updateDestinationPipeline, isLoading: updatingDestinationPipeline } = - useUpdateDestinationPipelineMutation() + useUpdateDestinationPipelineMutation({ + onSuccess: () => form.reset(defaultValues), + }) + + const { mutateAsync: startPipeline, isLoading: startingPipeline } = useStartPipelineMutation() + const { data: publications, isLoading: loadingPublications } = useReplicationPublicationsQuery({ projectRef, sourceId, @@ -89,26 +119,6 @@ const DestinationPanel = ({ pipelineId: existingDestination?.pipelineId, }) - const isCreating = creatingTenantSource || creatingDestinationPipeline || startingPipeline - const isUpdating = updatingDestinationPipeline || stoppingPipeline || startingPipeline - const isSubmitting = isCreating || isUpdating - const editMode = !!existingDestination - - const formId = 'destination-editor' - const types = ['BigQuery'] as const - const TypeEnum = z.enum(types) - const FormSchema = z.object({ - type: TypeEnum, - name: z.string().min(1, 'Name is required'), - projectId: z.string().min(1, 'Project id is required'), - datasetId: z.string().min(1, 'Dataset id is required'), - serviceAccountKey: z.string().min(1, 'Service account key is required'), - publicationName: z.string().min(1, 'Publication is required'), - maxSize: z.number().min(1, 'Max Size must be greater than 0').int(), - maxFillMs: z.number().min(1, 'Max Fill milliseconds should be greater than 0').int(), - maxStalenessMins: z.number().nonnegative(), - enabled: z.boolean(), - }) const defaultValues = useMemo( () => ({ type: TypeEnum.enum.BigQuery, @@ -118,403 +128,406 @@ const DestinationPanel = ({ // For now, the password will always be set as empty for security reasons. serviceAccountKey: destinationData?.config?.big_query?.service_account_key ?? '', publicationName: pipelineData?.config.publication_name ?? '', - maxSize: pipelineData?.config?.batch?.max_size ?? 1000, - maxFillMs: pipelineData?.config?.batch?.max_fill_ms ?? 10, - maxStalenessMins: destinationData?.config?.big_query?.max_staleness_mins ?? 5, - enabled: existingDestination?.enabled ?? true, + maxSize: pipelineData?.config?.batch?.max_size, + maxFillMs: pipelineData?.config?.batch?.max_fill_ms, + maxStalenessMins: destinationData?.config?.big_query?.max_staleness_mins, }), - [destinationData, pipelineData, existingDestination] + [destinationData, pipelineData] ) + const form = useForm>({ mode: 'onBlur', reValidateMode: 'onBlur', resolver: zodResolver(FormSchema), defaultValues, }) + const isSaving = creatingDestinationPipeline || updatingDestinationPipeline || startingPipeline + const onSubmit = async (data: z.infer) => { if (!projectRef) return console.error('Project ref is required') + if (!sourceId) return console.error('Source id is required') + try { if (editMode && existingDestination) { - if (!sourceId) { - console.error('Source id is required') - return + if (!existingDestination.pipelineId) return console.error('Pipeline id is required') + + const bigQueryConfig: any = { + projectId: data.projectId, + datasetId: data.datasetId, + serviceAccountKey: data.serviceAccountKey, } - if (!existingDestination.pipelineId) { - console.error('Pipeline id is required') - return + if (!!data.maxStalenessMins) { + bigQueryConfig.maxStalenessMins = data.maxStalenessMins } - // Update existing destination + + const batchConfig: any = {} + if (!!data.maxSize) batchConfig.maxSize = data.maxSize + if (!!data.maxFillMs) batchConfig.maxFillMs = data.maxFillMs + const hasBothBatchFields = Object.keys(batchConfig).length === 2 + await updateDestinationPipeline({ destinationId: existingDestination.destinationId, pipelineId: existingDestination.pipelineId, projectRef, destinationName: data.name, - destinationConfig: { - bigQuery: { - projectId: data.projectId, - datasetId: data.datasetId, - serviceAccountKey: data.serviceAccountKey, - maxStalenessMins: data.maxStalenessMins, - }, - }, + destinationConfig: { bigQuery: bigQueryConfig }, pipelineConfig: { publicationName: data.publicationName, - batch: { maxSize: data.maxSize, maxFillMs: data.maxFillMs }, + ...(hasBothBatchFields ? { batch: batchConfig } : {}), }, sourceId, }) - if (data.enabled) { - await startPipeline({ projectRef, pipelineId: existingDestination.pipelineId }) + // Set request status only right before starting, then fire and close + const snapshot = existingDestination.enabled ? 'started' : 'stopped' + if (existingDestination.enabled) { + setRequestStatus( + existingDestination.pipelineId, + PipelineStatusRequestStatus.RestartRequested, + snapshot + ) + toast.success('Settings applied. Restarting the pipeline...') } else { - await stopPipeline({ projectRef, pipelineId: existingDestination.pipelineId }) + setRequestStatus( + existingDestination.pipelineId, + PipelineStatusRequestStatus.StartRequested, + snapshot + ) + toast.success('Settings applied. Starting the pipeline...') } - - toast.success('Successfully updated destination') + startPipeline({ projectRef, pipelineId: existingDestination.pipelineId }) + onClose() } else { - // Create new destination - if (!sourceId) { - console.error('Source id is required') - return + const bigQueryConfig: any = { + projectId: data.projectId, + datasetId: data.datasetId, + serviceAccountKey: data.serviceAccountKey, } + if (!!data.maxStalenessMins) { + bigQueryConfig.maxStalenessMins = data.maxStalenessMins + } + + const batchConfig: any = {} + if (!!data.maxSize) batchConfig.maxSize = data.maxSize + if (!!data.maxFillMs) batchConfig.maxFillMs = data.maxFillMs + const hasBothBatchFields = Object.keys(batchConfig).length === 2 + const { pipeline_id: pipelineId } = await createDestinationPipeline({ projectRef, destinationName: data.name, - destinationConfig: { - bigQuery: { - projectId: data.projectId, - datasetId: data.datasetId, - serviceAccountKey: data.serviceAccountKey, - maxStalenessMins: data.maxStalenessMins, - }, - }, + destinationConfig: { bigQuery: bigQueryConfig }, sourceId, pipelineConfig: { publicationName: data.publicationName, - batch: { maxSize: data.maxSize, maxFillMs: data.maxFillMs }, + ...(hasBothBatchFields ? { batch: batchConfig } : {}), }, }) - if (data.enabled) { - await startPipeline({ projectRef, pipelineId }) - } - toast.success('Successfully created destination') + // Set request status only right before starting, then fire and close + setRequestStatus(pipelineId, PipelineStatusRequestStatus.StartRequested, undefined) + toast.success('Destination created. Starting the pipeline...') + startPipeline({ projectRef, pipelineId }) + onClose() } - onClose() } catch (error) { - toast.error(`Failed to ${editMode ? 'update' : 'create'} destination`) + const action = editMode ? 'apply and run' : 'create and start' + toast.error(`Failed to ${action} destination`) } } + const onEnableReplication = async () => { if (!projectRef) return console.error('Project ref is required') await createTenantSource({ projectRef }) } - const { enabled } = form.watch() - useEffect(() => { if (editMode && destinationData && pipelineData) { form.reset(defaultValues) } }, [destinationData, pipelineData, editMode, defaultValues, form]) - return ( + return sourceId ? ( <> - {sourceId ? ( - <> - - -
- -
- {editMode ? 'Edit Destination' : 'New Destination'} - - {editMode ? null : 'Send data to a new destination'} - -
-
- { - form.setValue('enabled', checked) - }} + + +
+ + {editMode ? 'Edit destination' : 'Create a new destination'} + + {editMode ? null : 'Send data to a new destination'} + + + + +
+
+ ( + + + + + + )} /> - -
- - - - - ( - - - - - - )} - /> -

What data to send

- - ( - - - pub.name) || []} - loading={loadingPublications} - field={field} - onNewPublicationClick={() => setPublicationPanelVisible(true)} - /> - - - )} - /> -

Where to send that data

- - ( - - - - {field.value} - - - - BigQuery - - - - - - - )} - > - - ( - - - - - - )} - /> - ( - - - - - - )} - /> - ( - - - - - - )} - /> - - - - - Advanced Settings - - - ( - - - - - - )} - /> - ( - - - - - - )} + +

What data to send

+ + ( + + + pub.name) || []} + loading={loadingPublications} + field={field} + onNewPublicationClick={() => setPublicationPanelVisible(true)} /> - ( - - - - - - )} + + + )} + /> + +

Where to send that data

+ + ( + + + + {field.value} + + + BigQuery + + + + + + )} + /> + + ( + + + + + + )} + /> + + ( + + + + + + )} + /> + + ( + + + -
-
-
-
- ( - - - - - - )} - /> -
- -
-
- - + + + )} + /> +
+ + + +
+ + + + Advanced Settings + + + ( + + + { + const val = e.target.value + field.onChange(val === '' ? undefined : Number(val)) + }} + placeholder="Leave empty for default" + /> + + + )} + /> + ( + + + { + const val = e.target.value + field.onChange(val === '' ? undefined : Number(val)) + }} + placeholder="Leave empty for default" + /> + + + )} + /> + ( + + + { + const val = e.target.value + field.onChange(val === '' ? undefined : Number(val)) + }} + placeholder="Leave empty for default" + /> + + + )} + /> + + + +
+ + + + + + + +
+ + + setPublicationPanelVisible(false)} + /> + + ) : ( + + +
+ + Create a new destination + + + + + + {/* Pricing to be decided yet */} + Enabling replication will cost additional $xx.xx + + + +
- -
- - - setPublicationPanelVisible(false)} - /> - - ) : ( - <> - - -
- - New Destination - - - - - - {/* Pricing to be decided yet */} - Enabling replication will cost additional $xx.xx - - - -
- -
-
-
-
- - - -
-
-
- - )} - +
+ + + + + + +
+
+
) } - -export default DestinationPanel diff --git a/apps/studio/components/interfaces/Database/Replication/DestinationRow.tsx b/apps/studio/components/interfaces/Database/Replication/DestinationRow.tsx index 85d315f7ef780..16437c30b5b8a 100644 --- a/apps/studio/components/interfaces/Database/Replication/DestinationRow.tsx +++ b/apps/studio/components/interfaces/Database/Replication/DestinationRow.tsx @@ -6,18 +6,20 @@ import { useParams } from 'common' import Table from 'components/to-be-cleaned/Table' import AlertError from 'components/ui/AlertError' import { useDeleteDestinationPipelineMutation } from 'data/replication/delete-destination-pipeline-mutation' +import { useReplicationPipelineReplicationStatusQuery } from 'data/replication/pipeline-replication-status-query' import { useReplicationPipelineStatusQuery } from 'data/replication/pipeline-status-query' import { Pipeline } from 'data/replication/pipelines-query' import { useStopPipelineMutation } from 'data/replication/stop-pipeline-mutation' +import { AlertCircle } from 'lucide-react' import { PipelineStatusRequestStatus, usePipelineRequestStatus, } from 'state/replication-pipeline-request-status' import { ResponseError } from 'types' -import { Button } from 'ui' +import { Button, Tooltip, TooltipContent, TooltipTrigger } from 'ui' import ShimmeringLoader from 'ui-patterns/ShimmeringLoader' -import DeleteDestination from './DeleteDestination' -import DestinationPanel from './DestinationPanel' +import { DeleteDestination } from './DeleteDestination' +import { DestinationPanel } from './DestinationPanel' import { getStatusName, PIPELINE_ERROR_MESSAGES } from './Pipeline.utils' import { PipelineStatus, PipelineStatusName } from './PipelineStatus' import { STATUS_REFRESH_FREQUENCY_MS } from './Replication.constants' @@ -48,6 +50,7 @@ export const DestinationRow = ({ }: DestinationRowProps) => { const { ref: projectRef } = useParams() const [showDeleteDestinationForm, setShowDeleteDestinationForm] = useState(false) + const [isDeleting, setIsDeleting] = useState(false) const [showEditDestinationPanel, setShowEditDestinationPanel] = useState(false) const { @@ -74,6 +77,15 @@ export const DestinationRow = ({ const pipelineStatus = pipelineStatusData?.status const statusName = getStatusName(pipelineStatus) + // Fetch table-level replication status to surface errors in list view + const { data: replicationStatusData } = useReplicationPipelineReplicationStatusQuery( + { projectRef, pipelineId: pipeline?.id }, + { refetchInterval: STATUS_REFRESH_FREQUENCY_MS } + ) + const tableStatuses = replicationStatusData?.table_statuses ?? [] + const errorCount = tableStatuses.filter((t) => t.state?.name === 'error').length + const hasTableErrors = errorCount > 0 + const onDeleteClick = async () => { if (!projectRef) { return console.error('Project ref is required') @@ -83,14 +95,20 @@ export const DestinationRow = ({ } try { + setIsDeleting(true) await stopPipeline({ projectRef, pipelineId: pipeline.id }) await deleteDestinationPipeline({ projectRef, destinationId: destinationId, pipelineId: pipeline.id, }) + // Close dialog after successful deletion + setShowDeleteDestinationForm(false) + toast.success(`Deleted destination "${destinationName}"`) } catch (error) { toast.error(PIPELINE_ERROR_MESSAGES.DELETE_DESTINATION) + } finally { + setIsDeleting(false) } } @@ -132,9 +150,21 @@ export const DestinationRow = ({
- {
)} - setShowNewDestinationPanel(false)} diff --git a/apps/studio/components/interfaces/Database/Replication/Pipeline.utils.ts b/apps/studio/components/interfaces/Database/Replication/Pipeline.utils.ts index 2d3efaf233770..eb69b88e689d0 100644 --- a/apps/studio/components/interfaces/Database/Replication/Pipeline.utils.ts +++ b/apps/studio/components/interfaces/Database/Replication/Pipeline.utils.ts @@ -22,16 +22,25 @@ export const getStatusName = ( return undefined } +export const PIPELINE_ENABLE_ALLOWED_FROM = ['stopped'] as const +export const PIPELINE_DISABLE_ALLOWED_FROM = ['started', 'failed'] as const +export const PIPELINE_ACTIONABLE_STATES = ['failed', 'started', 'stopped'] as const + const PIPELINE_STATE_MESSAGES = { enabling: { - title: 'Pipeline enabling', - message: 'Starting the pipeline. Table replication will resume once enabled.', - badge: 'Enabling', + title: 'Starting pipeline', + message: 'Starting the pipeline. Table replication will resume once running.', + badge: 'Starting', }, disabling: { - title: 'Pipeline disabling', - message: 'Stopping the pipeline. Table replication will be paused once disabled.', - badge: 'Disabling', + title: 'Stopping pipeline', + message: 'Stopping the pipeline. Table replication will be paused once stopped.', + badge: 'Stopping', + }, + restarting: { + title: 'Restarting pipeline', + message: 'Applying settings and restarting the pipeline.', + badge: 'Restarting', }, failed: { title: 'Pipeline failed', @@ -40,7 +49,7 @@ const PIPELINE_STATE_MESSAGES = { }, stopped: { title: 'Pipeline stopped', - message: 'Replication is paused. Enable the pipeline to resume data synchronization.', + message: 'Replication is paused. Start the pipeline to resume data synchronization.', badge: 'Stopped', }, starting: { @@ -60,8 +69,8 @@ const PIPELINE_STATE_MESSAGES = { }, notRunning: { title: 'Pipeline not running', - message: 'Replication is not active. Enable the pipeline to start data synchronization.', - badge: 'Disabled', + message: 'Replication is not active. Start the pipeline to begin data synchronization.', + badge: 'Stopped', }, } as const @@ -69,23 +78,25 @@ export const getPipelineStateMessages = ( requestStatus: PipelineStatusRequestStatus | undefined, statusName: string | undefined ) => { - // Always prioritize request status (enabling/disabling) over pipeline status - if (requestStatus === PipelineStatusRequestStatus.EnableRequested) { + // Reflect optimistic request intent immediately after click + if (requestStatus === PipelineStatusRequestStatus.RestartRequested) { + return PIPELINE_STATE_MESSAGES.restarting + } + if (requestStatus === PipelineStatusRequestStatus.StartRequested) { return PIPELINE_STATE_MESSAGES.enabling } - - if (requestStatus === PipelineStatusRequestStatus.DisableRequested) { + if (requestStatus === PipelineStatusRequestStatus.StopRequested) { return PIPELINE_STATE_MESSAGES.disabling } - // Only check pipeline status if no request is in progress + // Fall back to steady states switch (statusName) { + case 'starting': + return PIPELINE_STATE_MESSAGES.starting case 'failed': return PIPELINE_STATE_MESSAGES.failed case 'stopped': return PIPELINE_STATE_MESSAGES.stopped - case 'starting': - return PIPELINE_STATE_MESSAGES.starting case 'started': return PIPELINE_STATE_MESSAGES.running case 'unknown': diff --git a/apps/studio/components/interfaces/Database/Replication/PipelineStatus.tsx b/apps/studio/components/interfaces/Database/Replication/PipelineStatus.tsx index 69a988996d7d9..14e8b9a99696c 100644 --- a/apps/studio/components/interfaces/Database/Replication/PipelineStatus.tsx +++ b/apps/studio/components/interfaces/Database/Replication/PipelineStatus.tsx @@ -45,18 +45,26 @@ export const PipelineStatus = ({ // Get consistent tooltip message using the same logic as other components const stateMessages = getPipelineStateMessages(requestStatus, statusName) - if (requestStatus === PipelineStatusRequestStatus.EnableRequested) { + // Show optimistic request state while backend still reports steady states + if (requestStatus === PipelineStatusRequestStatus.RestartRequested) { return { - label: 'Enabling...', - dot: , - color: 'text-brand-600', + label: 'Restarting', + dot: , + color: 'text-warning-600', tooltip: stateMessages.message, } } - - if (requestStatus === PipelineStatusRequestStatus.DisableRequested) { + if (requestStatus === PipelineStatusRequestStatus.StartRequested) { + return { + label: 'Starting', + dot: , + color: 'text-warning-600', + tooltip: stateMessages.message, + } + } + if (requestStatus === PipelineStatusRequestStatus.StopRequested) { return { - label: 'Disabling...', + label: 'Stopping', dot: , color: 'text-warning-600', tooltip: stateMessages.message, diff --git a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx index dd098652c769a..3d62d5f710a2c 100644 --- a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx +++ b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx @@ -1,7 +1,3 @@ -// @ts-nocheck [Joshen] Temporarily silencing the TS checks here but please eventually remove -// it's cause the API types are conflicting a bit - API types have probably been updated for the UI here -// but the UI hasn't been updated yet to fit the new API types - import { Activity, ChevronLeft, ExternalLink, Search, X } from 'lucide-react' import Link from 'next/link' import { useEffect, useState } from 'react' @@ -24,7 +20,13 @@ import { Badge, Button, cn } from 'ui' import { GenericSkeletonLoader } from 'ui-patterns' import { Input } from 'ui-patterns/DataInputs/Input' import { ErroredTableDetails } from './ErroredTableDetails' -import { getStatusName, PIPELINE_ERROR_MESSAGES } from './Pipeline.utils' +import { + PIPELINE_ACTIONABLE_STATES, + PIPELINE_DISABLE_ALLOWED_FROM, + PIPELINE_ENABLE_ALLOWED_FROM, + PIPELINE_ERROR_MESSAGES, + getStatusName, +} from './Pipeline.utils' import { PipelineStatus } from './PipelineStatus' import { STATUS_REFRESH_FREQUENCY_MS } from './Replication.constants' import { TableState } from './ReplicationPipelineStatus.types' @@ -97,8 +99,9 @@ export const ReplicationPipelineStatus = () => { const isPipelineRunning = statusName === 'started' const hasTableData = tableStatuses.length > 0 const isEnablingDisabling = - requestStatus === PipelineStatusRequestStatus.EnableRequested || - requestStatus === PipelineStatusRequestStatus.DisableRequested + requestStatus === PipelineStatusRequestStatus.StartRequested || + requestStatus === PipelineStatusRequestStatus.StopRequested || + requestStatus === PipelineStatusRequestStatus.RestartRequested const showDisabledState = !isPipelineRunning || isEnablingDisabling const onTogglePipeline = async () => { @@ -110,12 +113,12 @@ export const ReplicationPipelineStatus = () => { } try { - if (statusName === 'stopped') { + if (PIPELINE_ENABLE_ALLOWED_FROM.includes(statusName as any)) { + setRequestStatus(pipeline.id, PipelineStatusRequestStatus.StartRequested, statusName) await startPipeline({ projectRef, pipelineId: pipeline.id }) - setRequestStatus(pipeline.id, PipelineStatusRequestStatus.EnableRequested) - } else if (statusName === 'started') { + } else if (PIPELINE_DISABLE_ALLOWED_FROM.includes(statusName as any)) { + setRequestStatus(pipeline.id, PipelineStatusRequestStatus.StopRequested, statusName) await stopPipeline({ projectRef, pipelineId: pipeline.id }) - setRequestStatus(pipeline.id, PipelineStatusRequestStatus.DisableRequested) } } catch (error) { toast.error(PIPELINE_ERROR_MESSAGES.ENABLE_DESTINATION) @@ -181,9 +184,9 @@ export const ReplicationPipelineStatus = () => { type={statusName === 'stopped' ? 'primary' : 'default'} onClick={() => onTogglePipeline()} loading={isPipelineError || isStartingPipeline || isStoppingPipeline} - disabled={!['failed', 'started', 'stopped', 'stopping'].includes(statusName ?? '')} + disabled={!PIPELINE_ACTIONABLE_STATES.includes((statusName ?? '') as any)} > - {statusName === 'stopped' ? 'Enable' : 'Disable'} pipeline + {statusName === 'stopped' ? 'Start' : 'Stop'} pipeline @@ -286,7 +289,7 @@ export const ReplicationPipelineStatus = () => { Status unavailable while pipeline is {config.badge.toLowerCase()}

) : ( -
+
{statusConfig.description}
diff --git a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx index af4207cc5c545..cb2370426151e 100644 --- a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx +++ b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx @@ -56,9 +56,10 @@ export const getDisabledStateConfig = ({ const { title, message, badge } = getPipelineStateMessages(requestStatus, statusName) // Get icon and colors based on current state - const isEnabling = requestStatus === PipelineStatusRequestStatus.EnableRequested - const isDisabling = requestStatus === PipelineStatusRequestStatus.DisableRequested - const isTransitioning = isEnabling || isDisabling + const isEnabling = requestStatus === PipelineStatusRequestStatus.StartRequested + const isDisabling = requestStatus === PipelineStatusRequestStatus.StopRequested + const isRestarting = requestStatus === PipelineStatusRequestStatus.RestartRequested + const isTransitioning = isEnabling || isDisabling || isRestarting const icon = isTransitioning ? ( @@ -72,37 +73,38 @@ export const getDisabledStateConfig = ({ ) - const colors = isEnabling - ? { - bg: 'bg-brand-50', - text: 'text-brand-900', - subtext: 'text-brand-700', - iconBg: 'bg-brand-600', - icon: 'text-white dark:text-black', - } - : isDisabling || statusName === 'starting' || statusName === 'unknown' + const colors = + isEnabling || isRestarting ? { - bg: 'bg-warning-50', - text: 'text-warning-900', - subtext: 'text-warning-700', - iconBg: 'bg-warning-600', + bg: 'bg-brand-50', + text: 'text-brand-900', + subtext: 'text-brand-700', + iconBg: 'bg-brand-600', icon: 'text-white dark:text-black', } - : statusName === 'failed' + : isDisabling || statusName === 'starting' || statusName === 'unknown' ? { - bg: 'bg-destructive-50', - text: 'text-destructive-900', - subtext: 'text-destructive-700', - iconBg: 'bg-destructive-600', - icon: 'text-white dark:text-black', - } - : { - bg: 'bg-surface-100', - text: 'text-foreground', - subtext: 'text-foreground-light', - iconBg: 'bg-foreground-lighter', + bg: 'bg-warning-50', + text: 'text-warning-900', + subtext: 'text-warning-700', + iconBg: 'bg-warning-600', icon: 'text-white dark:text-black', } + : statusName === 'failed' + ? { + bg: 'bg-destructive-50', + text: 'text-destructive-900', + subtext: 'text-destructive-700', + iconBg: 'bg-destructive-600', + icon: 'text-white dark:text-black', + } + : { + bg: 'bg-surface-100', + text: 'text-foreground', + subtext: 'text-foreground-light', + iconBg: 'bg-foreground-lighter', + icon: 'text-white dark:text-black', + } return { title, message, badge, icon, colors } } diff --git a/apps/studio/components/interfaces/Database/Replication/RetryOptionsDropdown.tsx b/apps/studio/components/interfaces/Database/Replication/RetryOptionsDropdown.tsx index 350bd910bb449..147f8cc5befb3 100644 --- a/apps/studio/components/interfaces/Database/Replication/RetryOptionsDropdown.tsx +++ b/apps/studio/components/interfaces/Database/Replication/RetryOptionsDropdown.tsx @@ -40,7 +40,7 @@ export const RetryOptionsDropdown = ({ tableId, tableName }: RetryOptionsDropdow const { mutate: rollbackTable, isLoading: isRollingBack } = useRollbackTableMutation({ onSuccess: (_, vars) => { const { projectRef, pipelineId } = vars - toast.success(`Table "${tableName}" rolled back successfully`) + toast.success(`Table "${tableName}" rolled back successfully and pipeline is being restarted`) startPipeline({ projectRef, pipelineId }) }, onError: (error, vars) => { diff --git a/apps/studio/components/interfaces/Database/Replication/RowMenu.tsx b/apps/studio/components/interfaces/Database/Replication/RowMenu.tsx index ae5734b2e9c1c..7e083058ed5c0 100644 --- a/apps/studio/components/interfaces/Database/Replication/RowMenu.tsx +++ b/apps/studio/components/interfaces/Database/Replication/RowMenu.tsx @@ -20,7 +20,12 @@ import { DropdownMenuTrigger, } from 'ui' import ShimmeringLoader from 'ui-patterns/ShimmeringLoader' -import { PIPELINE_ERROR_MESSAGES } from './Pipeline.utils' +import { + PIPELINE_DISABLE_ALLOWED_FROM, + PIPELINE_ENABLE_ALLOWED_FROM, + PIPELINE_ERROR_MESSAGES, + getStatusName, +} from './Pipeline.utils' import { PipelineStatusName } from './PipelineStatus' interface RowMenuProps { @@ -44,13 +49,6 @@ export const RowMenu = ({ }: RowMenuProps) => { const { ref: projectRef } = useParams() - const getStatusName = (status: any) => { - if (status && typeof status === 'object' && 'name' in status) { - return status.name - } - return status - } - const statusName = getStatusName(pipelineStatus) const pipelineEnabled = statusName !== PipelineStatusName.STOPPED @@ -67,10 +65,13 @@ export const RowMenu = ({ } try { + // Only show 'enabling' when transitioning from allowed states + if (PIPELINE_ENABLE_ALLOWED_FROM.includes(statusName as any)) { + setGlobalRequestStatus(pipeline.id, PipelineStatusRequestStatus.StartRequested, statusName) + } await startPipeline({ projectRef, pipelineId: pipeline.id }) - toast(`Enabling pipeline ${pipeline.destination_name}`) - setGlobalRequestStatus(pipeline.id, PipelineStatusRequestStatus.EnableRequested) } catch (error) { + setGlobalRequestStatus(pipeline.id, PipelineStatusRequestStatus.None) toast.error(PIPELINE_ERROR_MESSAGES.ENABLE_DESTINATION) } } @@ -86,10 +87,13 @@ export const RowMenu = ({ } try { + // Only show 'disabling' when transitioning from allowed states + if (PIPELINE_DISABLE_ALLOWED_FROM.includes(statusName as any)) { + setGlobalRequestStatus(pipeline.id, PipelineStatusRequestStatus.StopRequested, statusName) + } await stopPipeline({ projectRef, pipelineId: pipeline.id }) - toast(`Disabling pipeline ${pipeline.destination_name}`) - setGlobalRequestStatus(pipeline.id, PipelineStatusRequestStatus.DisableRequested) } catch (error) { + setGlobalRequestStatus(pipeline.id, PipelineStatusRequestStatus.None) toast.error(PIPELINE_ERROR_MESSAGES.DISABLE_DESTINATION) } } @@ -109,12 +113,12 @@ export const RowMenu = ({ {pipelineEnabled ? ( -

Disable pipeline

+

Stop pipeline

) : ( -

Enable pipeline

+

Start pipeline

)} diff --git a/apps/studio/data/analytics/functions-req-stats-query.ts b/apps/studio/data/analytics/functions-req-stats-query.ts index d386a07b465f4..5a765b4148f43 100644 --- a/apps/studio/data/analytics/functions-req-stats-query.ts +++ b/apps/studio/data/analytics/functions-req-stats-query.ts @@ -6,7 +6,7 @@ import { analyticsKeys } from './keys' export type FunctionsReqStatsVariables = { projectRef?: string functionId?: string - interval?: operations['FunctionRequestLogsController_getStatus']['parameters']['query']['interval'] + interval?: operations['FunctionsLogsController_getRequestStats']['parameters']['query']['interval'] } export type FunctionsReqStatsResponse = any diff --git a/apps/studio/data/analytics/functions-resource-usage-query.ts b/apps/studio/data/analytics/functions-resource-usage-query.ts index cbb71f1d78e99..ae767d52ebbd0 100644 --- a/apps/studio/data/analytics/functions-resource-usage-query.ts +++ b/apps/studio/data/analytics/functions-resource-usage-query.ts @@ -6,7 +6,7 @@ import { analyticsKeys } from './keys' export type FunctionsResourceUsageVariables = { projectRef?: string functionId?: string - interval?: operations['FunctionResourceLogsController_getStatus']['parameters']['query']['interval'] + interval?: operations['FunctionsLogsController_getRequestStats']['parameters']['query']['interval'] } export type FunctionsResourceUsageResponse = any diff --git a/apps/studio/data/content/content-delete-mutation.ts b/apps/studio/data/content/content-delete-mutation.ts index d5fd129fa3f1b..a8110e1e8f668 100644 --- a/apps/studio/data/content/content-delete-mutation.ts +++ b/apps/studio/data/content/content-delete-mutation.ts @@ -15,7 +15,7 @@ export async function deleteContents( headers: { Version: '2' }, params: { path: { ref: projectRef }, - query: { ids }, + query: { ids: ids.join(',') }, }, signal, }) diff --git a/apps/studio/data/lint/delete-lint-rule-mutation.ts b/apps/studio/data/lint/delete-lint-rule-mutation.ts index 70dca1fdd73f8..cf6538fdff5f3 100644 --- a/apps/studio/data/lint/delete-lint-rule-mutation.ts +++ b/apps/studio/data/lint/delete-lint-rule-mutation.ts @@ -12,7 +12,7 @@ export type LintRuleDeleteVariables = { export async function deleteLintRule({ projectRef, ids }: LintRuleDeleteVariables) { const { data, error } = await del('/platform/projects/{ref}/notifications/advisor/exceptions', { - params: { path: { ref: projectRef }, query: { ids } }, + params: { path: { ref: projectRef }, query: { ids: ids.join(',') } }, }) if (error) handleError(error) diff --git a/apps/studio/data/replication/create-destination-pipeline-mutation.ts b/apps/studio/data/replication/create-destination-pipeline-mutation.ts index 67866a6f18b0a..ce9e285b09258 100644 --- a/apps/studio/data/replication/create-destination-pipeline-mutation.ts +++ b/apps/studio/data/replication/create-destination-pipeline-mutation.ts @@ -1,15 +1,15 @@ import { useMutation, UseMutationOptions, useQueryClient } from '@tanstack/react-query' import { toast } from 'sonner' +import { handleError, post } from 'data/fetchers' import type { ResponseError } from 'types' import { replicationKeys } from './keys' -import { handleError, post } from 'data/fetchers' export type BigQueryDestinationConfig = { projectId: string datasetId: string serviceAccountKey: string - maxStalenessMins: number + maxStalenessMins?: number } export type CreateDestinationPipelineParams = { @@ -21,7 +21,7 @@ export type CreateDestinationPipelineParams = { sourceId: number pipelineConfig: { publicationName: string - batch: { + batch?: { maxSize: number maxFillMs: number } @@ -35,10 +35,7 @@ async function createDestinationPipeline( destinationConfig: { bigQuery: { projectId, datasetId, serviceAccountKey, maxStalenessMins }, }, - pipelineConfig: { - publicationName, - batch: { maxSize, maxFillMs }, - }, + pipelineConfig: { publicationName, batch }, sourceId, }: CreateDestinationPipelineParams, signal?: AbortSignal @@ -48,23 +45,27 @@ async function createDestinationPipeline( const { data, error } = await post('/platform/replication/{ref}/destinations-pipelines', { params: { path: { ref: projectRef } }, body: { + source_id: sourceId, destination_name: destinationName, destination_config: { big_query: { project_id: projectId, dataset_id: datasetId, service_account_key: serviceAccountKey, - max_staleness_mins: maxStalenessMins, + ...(maxStalenessMins != null && { max_staleness_mins: maxStalenessMins }), }, }, pipeline_config: { publication_name: publicationName, - batch: { - max_size: maxSize, - max_fill_ms: maxFillMs, - }, + ...(batch + ? { + batch: { + max_size: batch.maxSize, + max_fill_ms: batch.maxFillMs, + }, + } + : {}), }, - source_id: sourceId, }, signal, }) @@ -92,8 +93,12 @@ export const useCreateDestinationPipelineMutation = ({ { async onSuccess(data, variables, context) { const { projectRef } = variables - await queryClient.invalidateQueries(replicationKeys.destinations(projectRef)) - await queryClient.invalidateQueries(replicationKeys.pipelines(projectRef)) + + await Promise.all([ + queryClient.invalidateQueries(replicationKeys.destinations(projectRef)), + queryClient.invalidateQueries(replicationKeys.pipelines(projectRef)), + ]) + await onSuccess?.(data, variables, context) }, async onError(data, variables, context) { diff --git a/apps/studio/data/replication/delete-destination-pipeline-mutation.ts b/apps/studio/data/replication/delete-destination-pipeline-mutation.ts index 2ccb247342311..91766e5a80db1 100644 --- a/apps/studio/data/replication/delete-destination-pipeline-mutation.ts +++ b/apps/studio/data/replication/delete-destination-pipeline-mutation.ts @@ -1,9 +1,9 @@ import { useMutation, UseMutationOptions, useQueryClient } from '@tanstack/react-query' import { toast } from 'sonner' +import { del, handleError } from 'data/fetchers' import type { ResponseError } from 'types' import { replicationKeys } from './keys' -import { handleError, del } from 'data/fetchers' export type DeleteDestinationPipelineParams = { projectRef: string @@ -47,8 +47,19 @@ export const useDeleteDestinationPipelineMutation = ({ (vars) => deleteDestinationPipeline(vars), { async onSuccess(data, variables, context) { - const { projectRef } = variables - await queryClient.invalidateQueries(replicationKeys.destinations(projectRef)) + const { projectRef, destinationId, pipelineId } = variables + + await Promise.all([ + queryClient.invalidateQueries(replicationKeys.destinations(projectRef)), + queryClient.invalidateQueries(replicationKeys.pipelines(projectRef)), + queryClient.invalidateQueries(replicationKeys.pipelineById(projectRef, pipelineId)), + queryClient.invalidateQueries(replicationKeys.pipelinesStatus(projectRef, pipelineId)), + queryClient.invalidateQueries( + replicationKeys.pipelinesReplicationStatus(projectRef, pipelineId) + ), + queryClient.invalidateQueries(replicationKeys.destinationById(projectRef, destinationId)), + ]) + await onSuccess?.(data, variables, context) }, async onError(data, variables, context) { diff --git a/apps/studio/data/replication/update-destination-pipeline-mutation.ts b/apps/studio/data/replication/update-destination-pipeline-mutation.ts index ee0cd6539ced2..2c04b48db4cdf 100644 --- a/apps/studio/data/replication/update-destination-pipeline-mutation.ts +++ b/apps/studio/data/replication/update-destination-pipeline-mutation.ts @@ -1,15 +1,15 @@ import { useMutation, UseMutationOptions, useQueryClient } from '@tanstack/react-query' import { toast } from 'sonner' +import { handleError, post } from 'data/fetchers' import type { ResponseError } from 'types' import { replicationKeys } from './keys' -import { handleError, post } from 'data/fetchers' export type BigQueryDestinationConfig = { projectId: string datasetId: string serviceAccountKey: string - maxStalenessMins: number + maxStalenessMins?: number } export type UpdateDestinationPipelineParams = { @@ -23,7 +23,7 @@ export type UpdateDestinationPipelineParams = { sourceId: number pipelineConfig: { publicationName: string - batch: { + batch?: { maxSize: number maxFillMs: number } @@ -39,10 +39,7 @@ async function updateDestinationPipeline( destinationConfig: { bigQuery: { projectId, datasetId, serviceAccountKey, maxStalenessMins }, }, - pipelineConfig: { - publicationName, - batch: { maxSize, maxFillMs }, - }, + pipelineConfig: { publicationName, batch }, sourceId, }: UpdateDestinationPipelineParams, signal?: AbortSignal @@ -60,15 +57,17 @@ async function updateDestinationPipeline( project_id: projectId, dataset_id: datasetId, service_account_key: serviceAccountKey, - max_staleness_mins: maxStalenessMins, + ...(maxStalenessMins != null && { max_staleness_mins: maxStalenessMins }), }, }, pipeline_config: { publication_name: publicationName, - batch: { - max_size: maxSize, - max_fill_ms: maxFillMs, - }, + ...(batch && { + batch: { + max_size: batch.maxSize, + max_fill_ms: batch.maxFillMs, + }, + }), }, source_id: sourceId, }, @@ -99,8 +98,12 @@ export const useUpdateDestinationPipelineMutation = ({ { async onSuccess(data, variables, context) { const { projectRef } = variables - await queryClient.invalidateQueries(replicationKeys.destinations(projectRef)) - await queryClient.invalidateQueries(replicationKeys.pipelines(projectRef)) + + await Promise.all([ + queryClient.invalidateQueries(replicationKeys.destinations(projectRef)), + queryClient.invalidateQueries(replicationKeys.pipelines(projectRef)), + ]) + await onSuccess?.(data, variables, context) }, async onError(data, variables, context) { diff --git a/apps/studio/state/replication-pipeline-request-status.tsx b/apps/studio/state/replication-pipeline-request-status.tsx index 722c4de9120b5..36561ab544440 100644 --- a/apps/studio/state/replication-pipeline-request-status.tsx +++ b/apps/studio/state/replication-pipeline-request-status.tsx @@ -1,14 +1,28 @@ -import { createContext, useContext, useState, ReactNode, useCallback } from 'react' +import { + createContext, + useContext, + useState, + ReactNode, + useCallback, + useRef, + useEffect, +} from 'react' export enum PipelineStatusRequestStatus { None = 'None', - EnableRequested = 'EnableRequested', - DisableRequested = 'DisableRequested', + StartRequested = 'StartRequested', + StopRequested = 'StopRequested', + RestartRequested = 'RestartRequested', } interface PipelineRequestStatusContextType { requestStatus: Record - setRequestStatus: (pipelineId: number, status: PipelineStatusRequestStatus) => void + pipelineStatusSnapshot: Record + setRequestStatus: ( + pipelineId: number, + status: PipelineStatusRequestStatus, + snapshotStatus?: string + ) => void getRequestStatus: (pipelineId: number) => PipelineStatusRequestStatus updatePipelineStatus: (pipelineId: number, backendStatus: string | undefined) => void } @@ -25,12 +39,58 @@ export const PipelineRequestStatusProvider = ({ children }: PipelineRequestStatu const [requestStatus, setRequestStatusState] = useState< Record >({}) + const [pipelineStatusSnapshot, setPipelineStatusSnapshot] = useState< + Record + >({}) + const timeoutsRef = useRef>({}) + const REQUEST_TIMEOUT_MS = 10_000 - const setRequestStatus = (pipelineId: number, status: PipelineStatusRequestStatus) => { + const setRequestStatus = ( + pipelineId: number, + status: PipelineStatusRequestStatus, + snapshotStatus?: string + ) => { setRequestStatusState((prev) => ({ ...prev, [pipelineId]: status, })) + setPipelineStatusSnapshot((prev) => { + if (status === PipelineStatusRequestStatus.None) { + const { [pipelineId]: _omit, ...rest } = prev + return rest + } + // Only set snapshot when provided to avoid undefined entries + if (snapshotStatus !== undefined) { + return { ...prev, [pipelineId]: snapshotStatus } + } + return prev + }) + + // Clear existing timeout for this pipeline + const existing = timeoutsRef.current[pipelineId] + if (existing !== undefined) { + clearTimeout(existing) + delete timeoutsRef.current[pipelineId] + } + + // Start auto-reset timer for non-None states + if (status !== PipelineStatusRequestStatus.None) { + const id = window.setTimeout(() => { + // If still pending, clear to None to show backend state + setRequestStatusState((prev) => { + if (prev[pipelineId] && prev[pipelineId] !== PipelineStatusRequestStatus.None) { + return { ...prev, [pipelineId]: PipelineStatusRequestStatus.None } + } + return prev + }) + setPipelineStatusSnapshot((prev) => { + const { [pipelineId]: _omit, ...rest } = prev + return rest + }) + delete timeoutsRef.current[pipelineId] + }, REQUEST_TIMEOUT_MS) + timeoutsRef.current[pipelineId] = id + } } const getRequestStatus = (pipelineId: number): PipelineStatusRequestStatus => { @@ -38,25 +98,32 @@ export const PipelineRequestStatusProvider = ({ children }: PipelineRequestStatu } const updatePipelineStatus = useCallback( - (pipelineId: number, backendStatus: string | undefined) => { + (pipelineId: number, newStatus: string | undefined) => { const currentRequestStatus = requestStatus[pipelineId] || PipelineStatusRequestStatus.None + if (currentRequestStatus === PipelineStatusRequestStatus.None) return - if ( - (currentRequestStatus === PipelineStatusRequestStatus.EnableRequested && - (backendStatus === 'started' || backendStatus === 'failed')) || - (currentRequestStatus === PipelineStatusRequestStatus.DisableRequested && - (backendStatus === 'stopped' || backendStatus === 'failed')) - ) { + // Only remove when backend status differs from snapshot + const snapshotStatus = pipelineStatusSnapshot[pipelineId] + if (newStatus !== snapshotStatus) { setRequestStatus(pipelineId, PipelineStatusRequestStatus.None) } }, - [requestStatus, setRequestStatus] + [requestStatus, pipelineStatusSnapshot] ) + // Cleanup all timers on unmount + useEffect(() => { + return () => { + Object.values(timeoutsRef.current).forEach((id) => clearTimeout(id)) + timeoutsRef.current = {} + } + }, []) + return (