@@ -9,9 +9,11 @@ import (
99	"time" 
1010
1111	"github.com/google/uuid" 
12+ 	"github.com/jackc/pgerrcode" 
1213	"github.com/jackc/pgx/v5" 
1314	"github.com/jackc/pgx/v5/pgtype" 
1415	tEnums "go.temporal.io/api/enums/v1" 
16+ 	"go.temporal.io/api/serviceerror" 
1517	"go.temporal.io/api/workflowservice/v1" 
1618	"go.temporal.io/sdk/client" 
1719	"google.golang.org/protobuf/proto" 
@@ -57,8 +59,14 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
5759	return  id .Int32 , nil 
5860}
5961
62+ func  (h  * FlowRequestHandler ) cdcJobEntryExists (ctx  context.Context , flowJobName  string ) (bool , error ) {
63+ 	var  exists  bool 
64+ 	err  :=  h .pool .QueryRow (ctx , `SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)` , flowJobName ).Scan (& exists )
65+ 	return  exists , err 
66+ }
67+ 
6068func  (h  * FlowRequestHandler ) createCdcJobEntry (ctx  context.Context ,
61- 	req  * protos.CreateCDCFlowRequest , workflowID  string ,
69+ 	req  * protos.CreateCDCFlowRequest , workflowID  string ,  idempotent   bool , 
6270) error  {
6371	sourcePeerID , srcErr  :=  h .getPeerID (ctx , req .ConnectionConfigs .SourceName )
6472	if  srcErr  !=  nil  {
@@ -77,11 +85,11 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
7785		return  fmt .Errorf ("unable to marshal flow config: %w" , err )
7886	}
7987
80- 	if  _ , err  : =  h .pool .Exec (ctx ,
81- 		`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, 
82- 		description, source_table_identifier, destination_table_identifier)  VALUES ($1,$2,$3,$4,$5,$6,'gRPC','',' ')` ,
88+ 	if  _ , err  =  h .pool .Exec (ctx ,
89+ 		`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,	description)  
90+ 		VALUES ($1,$2,$3,$4,$5,$6,'gRPC')` ,
8391		workflowID , req .ConnectionConfigs .FlowJobName , sourcePeerID , destinationPeerID , cfgBytes , protos .FlowStatus_STATUS_SETUP ,
84- 	); err  !=  nil  {
92+ 	); err  !=  nil  &&   ! ( idempotent   &&   shared . IsSQLStateError ( err ,  pgerrcode . UniqueViolation ))  {
8593		return  fmt .Errorf ("unable to insert into flows table for flow %s: %w" ,
8694			req .ConnectionConfigs .FlowJobName , err )
8795	}
@@ -113,9 +121,8 @@ func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
113121
114122	flowName  :=  req .QrepConfig .FlowJobName 
115123	if  _ , err  :=  h .pool .Exec (ctx , `INSERT INTO flows(workflow_id,name,source_peer,destination_peer,config_proto,status, 
116- 		description, destination_table_identifier,  query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7,$8 ) 
124+ 		description, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7) 
117125	` , workflowID , flowName , sourcePeerID , destinationPeerID , cfgBytes , protos .FlowStatus_STATUS_RUNNING ,
118- 		req .QrepConfig .DestinationTableIdentifier ,
119126		req .QrepConfig .Query ,
120127	); err  !=  nil  {
121128		return  fmt .Errorf ("unable to insert into flows table for flow %s with source table %s: %w" ,
@@ -135,30 +142,74 @@ func (h *FlowRequestHandler) CreateCDCFlow(
135142	}
136143	cfg .Version  =  internalVersion 
137144
138- 	// For resync, we validate the mirror before dropping it and getting to this step. 
139- 	// There is no point validating again here if it's a resync - the mirror is dropped already 
140- 	if  ! cfg .Resync  {
141- 		if  _ , err  :=  h .ValidateCDCMirror (ctx , req ); err  !=  nil  {
142- 			slog .ErrorContext (ctx , "validate mirror error" , slog .Any ("error" , err ))
143- 			return  nil , err 
145+ 	if  ! req .AttachToExisting  {
146+ 		if  exists , err  :=  h .cdcJobEntryExists (ctx , cfg .FlowJobName ); err  !=  nil  {
147+ 			return  nil , NewInternalApiError (fmt .Errorf ("unable to check flow job entry: %w" , err ))
148+ 		} else  if  exists  {
149+ 			return  nil , NewAlreadyExistsApiError (fmt .Errorf ("flow already exists: %s" , cfg .FlowJobName ))
144150		}
145151	}
146152
147- 	workflowID  :=  fmt .Sprintf ("%s-peerflow-%s" , cfg .FlowJobName , uuid .New ())
153+ 	workflowID  :=  getWorkflowID (cfg .FlowJobName )
154+ 	var  errNotFound  * serviceerror.NotFound 
155+ 	desc , err  :=  h .temporalClient .DescribeWorkflow (ctx , workflowID , "" )
156+ 	if  err  !=  nil  &&  ! errors .As (err , & errNotFound ) {
157+ 		return  nil , NewInternalApiError (fmt .Errorf ("failed to query the workflow execution: %w" , err ))
158+ 	} else  if  err  ==  nil  {
159+ 		// If workflow is actively running, handle based on AttachToExisting 
160+ 		// Workflows in terminal states are fine 
161+ 		if  desc .WorkflowExecutionMetadata .Status  ==  tEnums .WORKFLOW_EXECUTION_STATUS_RUNNING  || 
162+ 			desc .WorkflowExecutionMetadata .Status  ==  tEnums .WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW  {
163+ 			if  req .AttachToExisting  {
164+ 				// Idempotent attach to running workflow 
165+ 				return  & protos.CreateCDCFlowResponse {
166+ 					WorkflowId : workflowID ,
167+ 				}, nil 
168+ 			} else  {
169+ 				// Can't create duplicate of running workflow 
170+ 				return  nil , NewAlreadyExistsApiError (fmt .Errorf ("workflow already exists for flow: %s" , cfg .FlowJobName ))
171+ 			}
172+ 		}
173+ 	}
174+ 	// No running workflow, do the validations and start a new one 
175+ 
176+ 	// Use idempotent validation that skips mirror existence check 
177+ 	if  _ , err  :=  h .validateCDCMirrorImpl (ctx , req , true ); err  !=  nil  {
178+ 		slog .ErrorContext (ctx , "validate mirror error" , slog .Any ("error" , err ))
179+ 		return  nil , NewInternalApiError (fmt .Errorf ("invalid mirror: %w" , err ))
180+ 	}
181+ 
182+ 	if  resp , err  :=  h .createCDCFlow (ctx , req , workflowID ); err  !=  nil  {
183+ 		return  nil , NewInternalApiError (err )
184+ 	} else  {
185+ 		return  resp , nil 
186+ 	}
187+ }
188+ 
189+ func  getWorkflowID (flowName  string ) string  {
190+ 	return  flowName  +  "-peerflow" 
191+ }
192+ 
193+ func  (h  * FlowRequestHandler ) createCDCFlow (
194+ 	ctx  context.Context , req  * protos.CreateCDCFlowRequest , workflowID  string ,
195+ ) (* protos.CreateCDCFlowResponse , error ) {
196+ 	cfg  :=  req .ConnectionConfigs 
148197	workflowOptions  :=  client.StartWorkflowOptions {
149- 		ID :                    workflowID ,
150- 		TaskQueue :             h .peerflowTaskQueueID ,
151- 		TypedSearchAttributes : shared .NewSearchAttributes (cfg .FlowJobName ),
198+ 		ID :                       workflowID ,
199+ 		TaskQueue :                h .peerflowTaskQueueID ,
200+ 		TypedSearchAttributes :    shared .NewSearchAttributes (cfg .FlowJobName ),
201+ 		WorkflowIDConflictPolicy : tEnums .WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING , // two racing requests end up with the same workflow 
202+ 		WorkflowIDReusePolicy :    tEnums .WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE , // but creating the same id as a completed one is allowed 
152203	}
153204
154- 	if  err  :=  h .createCdcJobEntry (ctx , req , workflowID ); err  !=  nil  {
205+ 	if  err  :=  h .createCdcJobEntry (ctx , req , workflowID ,  true ); err  !=  nil  {
155206		slog .ErrorContext (ctx , "unable to create flow job entry" , slog .Any ("error" , err ))
156- 		return  nil , NewInternalApiError ( fmt .Errorf ("unable to create flow job entry: %w" , err ) )
207+ 		return  nil , fmt .Errorf ("unable to create flow job entry: %w" , err )
157208	}
158209
159210	if  _ , err  :=  h .temporalClient .ExecuteWorkflow (ctx , workflowOptions , peerflow .CDCFlowWorkflow , cfg , nil ); err  !=  nil  {
160211		slog .ErrorContext (ctx , "unable to start PeerFlow workflow" , slog .Any ("error" , err ))
161- 		return  nil , NewInternalApiError ( fmt .Errorf ("unable to start PeerFlow workflow: %w" , err ) )
212+ 		return  nil , fmt .Errorf ("unable to start PeerFlow workflow: %w" , err )
162213	}
163214
164215	return  & protos.CreateCDCFlowResponse {
@@ -257,6 +308,7 @@ func (h *FlowRequestHandler) shutdownFlow(
257308		DropFlowStats :         deleteStats ,
258309		FlowConnectionConfigs : cdcConfig ,
259310		SkipDestinationDrop :   skipDestinationDrop ,
311+ 		// NOTE: Resync is false here during snapshot-only resync 
260312	})
261313	if  err  !=  nil  {
262314		slog .ErrorContext (ctx , "unable to start DropFlow workflow" , logs , slog .Any ("error" , err ))
@@ -344,7 +396,7 @@ func (h *FlowRequestHandler) FlowStateChange(
344396			}
345397		case  protos .FlowStatus_STATUS_RESYNC :
346398			if  currState  ==  protos .FlowStatus_STATUS_COMPLETED  {
347- 				changeErr  =  h .resyncMirror (ctx , req .FlowJobName , req .DropMirrorStats )
399+ 				changeErr  =  h .resyncCompletedSnapshot (ctx , req .FlowJobName , req .DropMirrorStats )
348400			} else  if  isCDC , err  :=  h .isCDCFlow (ctx , req .FlowJobName ); err  !=  nil  {
349401				return  nil , NewInternalApiError (fmt .Errorf ("unable to determine if mirror is cdc: %w" , err ))
350402			} else  if  ! isCDC  {
@@ -489,8 +541,7 @@ func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName stri
489541	return  workflowID , nil 
490542}
491543
492- // only supports CDC resync for now 
493- func  (h  * FlowRequestHandler ) resyncMirror (
544+ func  (h  * FlowRequestHandler ) resyncCompletedSnapshot (
494545	ctx  context.Context ,
495546	flowName  string ,
496547	dropStats  bool ,
@@ -527,9 +578,11 @@ func (h *FlowRequestHandler) resyncMirror(
527578		return  err 
528579	}
529580
530- 	if  _ , err  :=  h .CreateCDCFlow (ctx , & protos.CreateCDCFlowRequest {
531- 		ConnectionConfigs : config ,
532- 	}); err  !=  nil  {
581+ 	workflowID  :=  getWorkflowID (config .FlowJobName )
582+ 	if  _ , err  :=  h .createCDCFlow (ctx ,
583+ 		& protos.CreateCDCFlowRequest {ConnectionConfigs : config },
584+ 		workflowID ,
585+ 	); err  !=  nil  {
533586		return  err 
534587	}
535588	return  nil 
0 commit comments