Skip to content

Commit c526551

Browse files
Last offset fetch: prevent destination connection by returning early (#3396)
Implementation in #3367 misses the fact that GetConnector which we call under the hood opens a destination connection, and only after we successfully get a connector do we assert that the connector supports T. This PR clones GetByNameAs and passes in an `expectedType` parameter which is used to error out with unsupported early
1 parent 3e220c5 commit c526551

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

flow/activities/flowable_core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
143143

144144
lastOffset, err := func() (model.CdcCheckpoint, error) {
145145
// special case pg-pg replication, where offsets are stored on destination instead of catalog
146-
if _, isPg := any(srcConn).(*connpostgres.PostgresConnector); isPg {
147-
dstPgConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, config.Env, a.CatalogPool, config.DestinationName)
146+
if _, isSourcePg := any(srcConn).(*connpostgres.PostgresConnector); isSourcePg {
147+
dstPgConn, err := connectors.GetPostgresConnectorByName(ctx, config.Env, a.CatalogPool, config.DestinationName)
148148
if err != nil {
149149
if !errors.Is(err, errors.ErrUnsupported) {
150150
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector to get last offset: %w", err)

flow/connectors/core.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,22 @@ func GetByNameAs[T Connector](ctx context.Context, env map[string]string, catalo
485485
return GetAs[T](ctx, env, peer)
486486
}
487487

488+
func GetPostgresConnectorByName(
489+
ctx context.Context,
490+
env map[string]string,
491+
catalogPool shared.CatalogPool,
492+
name string,
493+
) (*connpostgres.PostgresConnector, error) {
494+
peer, err := LoadPeer(ctx, catalogPool, name)
495+
if err != nil {
496+
return nil, err
497+
}
498+
if peer.Type != protos.DBType_POSTGRES {
499+
return nil, errors.ErrUnsupported
500+
}
501+
return GetAs[*connpostgres.PostgresConnector](ctx, env, peer)
502+
}
503+
488504
func CloseConnector(ctx context.Context, conn Connector) {
489505
if err := conn.Close(); err != nil {
490506
internal.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err))

0 commit comments

Comments
 (0)