diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 076fa1f3ff..fb75924e28 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -423,50 +423,63 @@ func PullCdcRecords[Items model.Items](
 		}
 	}
 
-	var standByLastLogged time.Time
-	cdcRecordsStorage, err := utils.NewCDCStore[Items](ctx, req.Env, p.flowJobName)
+	var cdcRecordsStorage *utils.CDCStore[Items]
+	var totalRecords int64
+	cdcStoreEnabled, err := internal.PeerDBCDCStoreEnabled(ctx, req.Env)
 	if err != nil {
 		return err
 	}
+	if cdcStoreEnabled {
+		cdcRecordsStorage, err = utils.NewCDCStore[Items](ctx, req.Env, p.flowJobName)
+		if err != nil {
+			return err
+		}
+	}
 	defer func() {
-		if cdcRecordsStorage.IsEmpty() {
+		if totalRecords == 0 {
 			records.SignalAsEmpty()
 		}
-		logger.Info("[finished] PullRecords", slog.Int("records", cdcRecordsStorage.Len()))
-		if err := cdcRecordsStorage.Close(); err != nil {
-			logger.Warn("failed to clean up records storage", slog.Any("error", err))
+		logger.Info("[finished] PullRecords", slog.Int64("records", totalRecords))
+		if cdcRecordsStorage != nil {
+			if err := cdcRecordsStorage.Close(); err != nil {
+				logger.Warn("failed to clean up records storage", slog.Any("error", err))
+			}
 		}
 	}()
 
 	logger.Info("pulling records start")
 
 	shutdown := shared.Interval(ctx, time.Minute, func() {
-		logger.Info("pulling records", slog.Int("records", cdcRecordsStorage.Len()))
+		logger.Info("pulling records", slog.Int64("records", totalRecords))
 	})
 	defer shutdown()
 
 	nextStandbyMessageDeadline := time.Now().Add(req.IdleTimeout)
 
+	var standByLastLogged time.Time
 	pkmRequiresResponse := false
 	waitingForCommit := false
 	fetchedBytes := 0
 
 	addRecordWithKey := func(key model.TableWithPkey, rec model.Record[Items]) error {
-		if err := cdcRecordsStorage.Set(logger, key, rec); err != nil {
-			return err
+		if cdcRecordsStorage != nil {
+			if err := cdcRecordsStorage.Set(logger, key, rec); err != nil {
+				return err
+			}
 		}
 		if err := records.AddRecord(ctx, rec); err != nil {
 			return err
 		}
+		totalRecords++
 
-		if cdcRecordsStorage.Len() == 1 {
+		if totalRecords == 1 {
 			records.SignalAsNotEmpty()
 			nextStandbyMessageDeadline = time.Now().Add(req.IdleTimeout)
 			logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
 		}
-		if cdcRecordsStorage.Len()%50000 == 0 {
+		if totalRecords%50000 == 0 {
 			logger.Info("pulling records",
-				slog.Int("records", cdcRecordsStorage.Len()),
+				slog.Int64("records", totalRecords),
 				slog.Int("bytes", fetchedBytes),
 				slog.Int("channelLen", records.ChannelLen()),
 				slog.Bool("waitingForCommit", waitingForCommit))
@@ -481,7 +494,7 @@ func PullCdcRecords[Items model.Items](
 	lastEmptyBatchPkmSentTime := time.Now()
 	for {
 		if pkmRequiresResponse {
-			if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() {
+			if totalRecords == 0 && int64(clientXLogPos) > req.ConsumedOffset.Load() {
 				err := p.updateConsumedOffset(ctx, logger, req.FlowJobName, req.ConsumedOffset, clientXLogPos)
 				if err != nil {
 					return err
@@ -496,7 +509,7 @@ func PullCdcRecords[Items model.Items](
 			if time.Since(standByLastLogged) > 10*time.Second {
 				logger.Info("Sent Standby status message",
-					slog.Int("records", cdcRecordsStorage.Len()),
+					slog.Int64("records", totalRecords),
 					slog.Int("bytes", fetchedBytes),
 					slog.Int("channelLen", records.ChannelLen()),
 					slog.Bool("waitingForCommit", waitingForCommit))
 				standByLastLogged = time.Now()
@@ -505,9 +518,9 @@ func PullCdcRecords[Items model.Items](
 			}
 		}
 
 		if p.commitLock == nil {
-			if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
+			if totalRecords >= int64(req.MaxBatchSize) {
 				logger.Info("batch filled, returning currently accumulated records",
-					slog.Int("records", cdcRecordsStorage.Len()),
+					slog.Int64("records", totalRecords),
 					slog.Int("bytes", fetchedBytes),
 					slog.Int("channelLen", records.ChannelLen()))
 				return nil
@@ -515,7 +528,7 @@ func PullCdcRecords[Items model.Items](
 
 		if waitingForCommit {
 			logger.Info("commit received, returning currently accumulated records",
-				slog.Int("records", cdcRecordsStorage.Len()),
+				slog.Int64("records", totalRecords),
 				slog.Int("bytes", fetchedBytes),
 				slog.Int("channelLen", records.ChannelLen()))
 			return nil
@@ -524,18 +537,18 @@ func PullCdcRecords[Items model.Items](
 
 		// if we are past the next standby deadline (?)
 		if time.Now().After(nextStandbyMessageDeadline) {
-			if !cdcRecordsStorage.IsEmpty() {
-				logger.Info("standby deadline reached", slog.Int("records", cdcRecordsStorage.Len()))
+			if totalRecords > 0 {
+				logger.Info("standby deadline reached", slog.Int64("records", totalRecords))
 				if p.commitLock == nil {
 					logger.Info("no commit lock, returning currently accumulated records",
-						slog.Int("records", cdcRecordsStorage.Len()),
+						slog.Int64("records", totalRecords),
 						slog.Int("bytes", fetchedBytes),
 						slog.Int("channelLen", records.ChannelLen()))
 					return nil
 				} else {
 					logger.Info("commit lock, waiting for commit to return records",
-						slog.Int("records", cdcRecordsStorage.Len()),
+						slog.Int64("records", totalRecords),
 						slog.Int("bytes", fetchedBytes),
 						slog.Int("channelLen", records.ChannelLen()))
 					waitingForCommit = true
@@ -548,7 +561,7 @@ func PullCdcRecords[Items model.Items](
 
 		var receiveCtx context.Context
 		var cancel context.CancelFunc
-		if cdcRecordsStorage.IsEmpty() {
+		if totalRecords == 0 {
 			receiveCtx, cancel = context.WithCancel(ctx)
 		} else {
 			receiveCtx, cancel = context.WithDeadline(ctx, nextStandbyMessageDeadline)
@@ -567,7 +580,7 @@ func PullCdcRecords[Items model.Items](
 		if err != nil && p.commitLock == nil {
 			if pgconn.Timeout(err) {
 				logger.Info("Stand-by deadline reached, returning currently accumulated records",
-					slog.Int("records", cdcRecordsStorage.Len()),
+					slog.Int64("records", totalRecords),
 					slog.Int("bytes", fetchedBytes),
 					slog.Int("channelLen", records.ChannelLen()))
 				return nil
@@ -635,19 +648,19 @@ func PullCdcRecords[Items model.Items](
 					return err
 				}
 
-				latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
-				if err != nil {
-					return err
-				}
-				if ok {
-					// iterate through unchanged toast cols and set them in new record
-					updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
-					for _, col := range updatedCols {
-						delete(r.UnchangedToastColumns, col)
+				if cdcRecordsStorage != nil {
+					if latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal); err != nil {
+						return err
+					} else if ok {
+						// iterate through unchanged toast cols and set them in new record
+						updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
+						for _, col := range updatedCols {
+							delete(r.UnchangedToastColumns, col)
+						}
+						p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(updatedCols)),
+							metric.WithAttributeSet(attribute.NewSet(
+								attribute.Bool("backfilled", true))))
 					}
-					p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(updatedCols)),
-						metric.WithAttributeSet(attribute.NewSet(
-							attribute.Bool("backfilled", true))))
 				}
 				p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(r.UnchangedToastColumns)),
 					metric.WithAttributeSet(attribute.NewSet(
@@ -686,16 +699,19 @@ func PullCdcRecords[Items model.Items](
 					return err
 				}
 
-				latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
-				if err != nil {
-					return err
-				}
-				if ok {
-					r.Items = latestRecord.GetItems()
-					if updateRecord, ok := latestRecord.(*model.UpdateRecord[Items]); ok {
-						r.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+				backfilled := false
+				if cdcRecordsStorage != nil {
+					if latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal); err != nil {
+						return err
+					} else if ok {
+						r.Items = latestRecord.GetItems()
+						if updateRecord, ok := latestRecord.(*model.UpdateRecord[Items]); ok {
+							r.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+							backfilled = true
+						}
 					}
-				} else {
+				}
+				if !backfilled {
 					// there is nothing to backfill the items in the delete record with,
 					// so don't update the row with this record
 					// add sentinel value to prevent update statements from selecting
@@ -720,9 +736,9 @@ func PullCdcRecords[Items model.Items](
 			}
 		case *model.MessageRecord[Items]:
-			// if cdc store empty, we can move lsn,
+			// if there were no records, we can move lsn,
 			// otherwise push to records so destination can ack once all previous messages processed
-			if cdcRecordsStorage.IsEmpty() {
+			if totalRecords == 0 {
 				if int64(clientXLogPos) > req.ConsumedOffset.Load() {
 					if err := p.updateConsumedOffset(ctx, logger, req.FlowJobName, req.ConsumedOffset, clientXLogPos); err != nil {
 						return err
diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go
index e9bdfaddaf..4dc6ce8219 100644
--- a/flow/connectors/utils/cdc_store.go
+++ b/flow/connectors/utils/cdc_store.go
@@ -32,7 +32,7 @@ func encVal(val any) ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-type cdcStore[Items model.Items] struct {
+type CDCStore[Items model.Items] struct {
 	inMemoryRecords map[model.TableWithPkey]model.Record[Items]
 	pebbleDB        *pebble.DB
 	flowJobName     string
@@ -44,7 +44,7 @@ type cdcStore[Items model.Items] struct {
 	numRecordsSwitchThreshold int
 }
 
-func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string, flowJobName string) (*cdcStore[Items], error) {
+func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string, flowJobName string) (*CDCStore[Items], error) {
 	numRecordsSwitchThreshold, err := internal.PeerDBCDCDiskSpillRecordsThreshold(ctx, env)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get CDC disk spill records threshold: %w", err)
@@ -54,7 +54,7 @@ func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string,
 		return nil, fmt.Errorf("failed to get CDC disk spill memory percent threshold: %w", err)
 	}
 
-	return &cdcStore[Items]{
+	return &CDCStore[Items]{
 		inMemoryRecords: make(map[model.TableWithPkey]model.Record[Items]),
 		pebbleDB:        nil,
 		numRecords:      atomic.Int32{},
@@ -128,7 +128,7 @@ func init() {
 	gob.Register(types.QValueArrayNumeric{})
 }
 
-func (c *cdcStore[T]) initPebbleDB() error {
+func (c *CDCStore[T]) initPebbleDB() error {
 	if c.pebbleDB != nil {
 		return nil
 	}
@@ -152,7 +152,7 @@ func (c *cdcStore[T]) initPebbleDB() error {
 	return nil
 }
 
-func (c *cdcStore[T]) diskSpillThresholdsExceeded() bool {
+func (c *CDCStore[T]) diskSpillThresholdsExceeded() bool {
 	if c.numRecordsSwitchThreshold >= 0 &&
 		len(c.inMemoryRecords) >= c.numRecordsSwitchThreshold {
 		c.thresholdReason = fmt.Sprintf("more than %d primary keys read, spilling to disk",
 			c.numRecordsSwitchThreshold)
@@ -170,7 +170,7 @@ func (c *cdcStore[T]) diskSpillThresholdsExceeded() bool {
 	return false
 }
 
-func (c *cdcStore[T]) Set(logger log.Logger, key model.TableWithPkey, rec model.Record[T]) error {
+func (c *CDCStore[T]) Set(logger log.Logger, key model.TableWithPkey, rec model.Record[T]) error {
 	if key.TableName != "" {
 		_, ok := c.inMemoryRecords[key]
 		if ok || !c.diskSpillThresholdsExceeded() {
@@ -208,7 +208,7 @@ func (c *cdcStore[T]) Set(logger log.Logger, key model.TableWithPkey, rec model.
 }
 
 // bool is to indicate if a record is found or not [similar to ok]
-func (c *cdcStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error) {
+func (c *CDCStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error) {
 	rec, ok := c.inMemoryRecords[key]
 	if ok {
 		return rec, true, nil
@@ -244,15 +244,7 @@ func (c *cdcStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error
 	return nil, false, nil
 }
 
-func (c *cdcStore[T]) Len() int {
-	return int(c.numRecords.Load())
-}
-
-func (c *cdcStore[T]) IsEmpty() bool {
-	return c.Len() == 0
-}
-
-func (c *cdcStore[T]) Close() error {
+func (c *CDCStore[T]) Close() error {
 	c.inMemoryRecords = nil
 	if c.pebbleDB != nil {
 		if err := c.pebbleDB.Close(); err != nil {
diff --git a/flow/connectors/utils/cdc_store_test.go b/flow/connectors/utils/cdc_store_test.go
index f12ca9c2b7..329c533471 100644
--- a/flow/connectors/utils/cdc_store_test.go
+++ b/flow/connectors/utils/cdc_store_test.go
@@ -137,7 +137,7 @@ func TestNullKeyDoesntStore(t *testing.T) {
 	require.NoError(t, err)
 	require.False(t, ok)
 
-	require.Equal(t, 1, cdcRecordsStore.Len())
+	require.Equal(t, 1, int(cdcRecordsStore.numRecords.Load()))
 	require.NoError(t, cdcRecordsStore.Close())
 }
diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go
index 1af2188ba9..acb2e7f91f 100644
--- a/flow/internal/dynamicconf.go
+++ b/flow/internal/dynamicconf.go
@@ -64,6 +64,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
 		TargetForSetting: protos.DynconfTarget_QUEUES,
 	},
+	{
+		Name:             "PEERDB_CDC_STORE_ENABLED",
+		Description:      "Controls whether to enable the store for recovering unchanged Postgres TOAST values within a CDC batch",
+		DefaultValue:     "true",
+		ValueType:        protos.DynconfValueType_BOOL,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
+		TargetForSetting: protos.DynconfTarget_ALL,
+	},
 	{
 		Name:        "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD",
 		Description: "CDC: number of records beyond which records are written to disk instead",
@@ -544,6 +552,10 @@ func PeerDBQueueParallelism(ctx context.Context, env map[string]string) (int64,
 	return dynamicConfSigned[int64](ctx, env, "PEERDB_QUEUE_PARALLELISM")
 }
 
+func PeerDBCDCStoreEnabled(ctx context.Context, env map[string]string) (bool, error) {
+	return dynamicConfBool(ctx, env, "PEERDB_CDC_STORE_ENABLED")
+}
+
 func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context, env map[string]string) (int64, error) {
 	return dynamicConfSigned[int64](ctx, env, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD")
 }
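
Reviewer note: a minimal, self-contained Go sketch of the pattern this diff applies across `PullCdcRecords` — the store becomes optional behind a feature flag, every call site nil-checks it, and a plain `int64` counter replaces the removed `Len()`/`IsEmpty()` methods so the batching and signaling logic keeps working when the store is nil. This is illustrative only, not PeerDB code; `toyStore`, `storeEnabled`, and `addRecord` are hypothetical stand-ins for `utils.CDCStore`, the new `PEERDB_CDC_STORE_ENABLED` setting, and `addRecordWithKey`.

```go
package main

import "fmt"

// toyStore stands in for utils.CDCStore; the real store can spill to disk,
// but a map is enough to show the control flow.
type toyStore struct {
	records map[string]string
}

func (s *toyStore) Set(key, val string) {
	s.records[key] = val
}

func main() {
	storeEnabled := false // in the diff, read via internal.PeerDBCDCStoreEnabled

	// The store is only constructed when the flag is on; otherwise it stays nil.
	var store *toyStore
	if storeEnabled {
		store = &toyStore{records: make(map[string]string)}
	}

	// The counter advances whether or not the store exists, so emptiness and
	// batch-size checks no longer depend on the store being present.
	var totalRecords int64
	addRecord := func(key, val string) {
		if store != nil { // nil-guard before every store access, as in the diff
			store.Set(key, val)
		}
		totalRecords++
	}

	addRecord("pk1", "row1")
	addRecord("pk2", "row2")

	maxBatchSize := int64(2)
	if totalRecords >= maxBatchSize { // was: store.Len() >= int(maxBatchSize)
		fmt.Println("batch filled at", totalRecords, "records; store present:", store != nil)
	}
}
```

The trade-off mirrored here is the one the diff makes: with the store disabled, earlier records in the batch are not cached, so the `cdcRecordsStorage.Get` lookups are skipped and unchanged TOAST columns are no longer backfilled from prior records within the batch; only the counting and signaling paths remain.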