Skip to content

Commit 68b39ed

Browse files
Optimisation: Do not open destination connection for last offset fetch unless Postgres to Postgres (#3367)
Decoupling from #3206 and addressing the [comment](#3206 (comment)) there. This PR makes it so that we do not connect to the destination peer to get the last offset for sync flow. --------- Co-authored-by: Philip Dubé <[email protected]>
1 parent a8e2b4f commit 68b39ed

File tree

2 files changed

+12
-15
lines changed

2 files changed

+12
-15
lines changed

flow/activities/flowable_core.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"google.golang.org/protobuf/proto"
2222

2323
"github.com/PeerDB-io/peerdb/flow/connectors"
24-
connmysql "github.com/PeerDB-io/peerdb/flow/connectors/mysql"
24+
connmetadata "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata"
2525
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
2626
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
2727
"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -139,17 +139,20 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
139139
}
140140

141141
lastOffset, err := func() (model.CdcCheckpoint, error) {
142-
if myConn, isMy := any(srcConn).(*connmysql.MySqlConnector); isMy {
143-
return myConn.GetLastOffset(ctx, config.FlowJobName)
144-
} else {
145-
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
142+
// special case pg-pg replication, where offsets are stored on destination instead of catalog
143+
if _, isPg := any(srcConn).(*connpostgres.PostgresConnector); isPg {
144+
dstPgConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, config.Env, a.CatalogPool, config.DestinationName)
146145
if err != nil {
147-
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector: %w", err)
146+
if !errors.Is(err, errors.ErrUnsupported) {
147+
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector to get last offset: %w", err)
148+
}
149+
// else fallthrough to loading from catalog
150+
} else {
151+
return dstPgConn.GetLastOffset(ctx, config.FlowJobName)
148152
}
149-
defer connectors.CloseConnector(ctx, dstConn)
150-
151-
return dstConn.GetLastOffset(ctx, config.FlowJobName)
152153
}
154+
pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
155+
return pgMetadata.GetLastOffset(ctx, flowName)
153156
}()
154157
if err != nil {
155158
return nil, a.Alerter.LogFlowError(ctx, flowName, err)

flow/connectors/core.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,6 @@ type CDCSyncConnectorCore interface {
167167
// SetupMetadataTables creates the metadata table [PEERDB_MIRROR_JOBS] if necessary.
168168
SetupMetadataTables(ctx context.Context) error
169169

170-
// GetLastOffset gets the last offset from the metadata table on the destination
171-
GetLastOffset(ctx context.Context, jobName string) (model.CdcCheckpoint, error)
172-
173-
// SetLastOffset updates the last offset on the metadata table on the destination
174-
SetLastOffset(ctx context.Context, jobName string, lastOffset model.CdcCheckpoint) error
175-
176170
// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
177171
GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error)
178172

0 commit comments

Comments
 (0)