Skip to content

Commit 5d8a4bc

Browse files
authored
move src/dst getPeerType out of cloneTable loop (#3559)
1 parent 2f272a8 commit 5d8a4bc

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

flow/connectors/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,9 @@ func LoadPeerType(ctx context.Context, catalogPool shared.CatalogPool, peerName
317317
err := row.Scan(&dbtype)
318318
if err != nil {
319319
if errors.Is(err, pgx.ErrNoRows) {
320-
return 0, fmt.Errorf("peer not found: %s", peerName)
320+
return 0, exceptions.NewNotFoundError(fmt.Errorf("peer not found: %s", peerName))
321321
}
322-
return 0, fmt.Errorf("failed to load peer type for %s: %w", peerName, err)
322+
return 0, exceptions.NewCatalogError(fmt.Errorf("failed to load peer type for %s: %w", peerName, err))
323323
}
324324
return dbtype, nil
325325
}

flow/workflows/snapshot_flow.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ func (s *SnapshotFlowExecution) cloneTable(
106106
boundSelector *shared.BoundSelector,
107107
snapshotName string,
108108
mapping *protos.TableMapping,
109+
sourcePeerType protos.DBType,
110+
destinationPeerType protos.DBType,
109111
) error {
110112
flowName := s.config.FlowJobName
111113
cloneLog := slog.Group("clone-log",
@@ -173,9 +175,7 @@ func (s *SnapshotFlowExecution) cloneTable(
173175
// usually MySQL supports double quotes with ANSI_QUOTES, but Vitess doesn't
174176
// Vitess currently only supports initial load so change here is enough
175177
srcTableEscaped := parsedSrcTable.String()
176-
if dbtype, err := getPeerType(ctx, s.config.SourceName); err != nil {
177-
return err
178-
} else if dbtype == protos.DBType_MYSQL {
178+
if sourcePeerType == protos.DBType_MYSQL {
179179
srcTableEscaped = parsedSrcTable.MySQL()
180180
}
181181

@@ -208,9 +208,7 @@ func (s *SnapshotFlowExecution) cloneTable(
208208

209209
// ensure document IDs are synchronized across initial load and CDC
210210
// for the same document
211-
if dbtype, err := getPeerType(ctx, s.config.DestinationName); err != nil {
212-
return err
213-
} else if dbtype == protos.DBType_ELASTICSEARCH {
211+
if destinationPeerType == protos.DBType_ELASTICSEARCH {
214212
if err := initTableSchema(); err != nil {
215213
return err
216214
}
@@ -277,6 +275,15 @@ func (s *SnapshotFlowExecution) cloneTables(
277275

278276
boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)
279277

278+
sourcePeerType, err := getPeerType(ctx, s.config.SourceName)
279+
if err != nil {
280+
return err
281+
}
282+
destinationPeerType, err := getPeerType(ctx, s.config.DestinationName)
283+
if err != nil {
284+
return err
285+
}
286+
280287
for _, v := range s.config.TableMappings {
281288
source := v.SourceTableIdentifier
282289
destination := v.DestinationTableIdentifier
@@ -287,7 +294,7 @@ func (s *SnapshotFlowExecution) cloneTables(
287294
if v.PartitionKey == "" {
288295
v.PartitionKey = res.TableDefaultPartitionKeyMapping[source]
289296
}
290-
if err := s.cloneTable(ctx, boundSelector, snapshotName, v); err != nil {
297+
if err := s.cloneTable(ctx, boundSelector, snapshotName, v, sourcePeerType, destinationPeerType); err != nil {
291298
s.logger.Error("failed to start clone child workflow", slog.Any("error", err))
292299
return err
293300
}

0 commit comments

Comments
 (0)