Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
Tables: s.config.Tables,
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
WithAvroSchema: s.config.WithAvroSchema,
SnapshotFetchSize: s.config.SnapshotFetchSize,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

records := make(chan opencdc.Record)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records, c.WithAvroSchema)

sub, err := internal.CreateSubscription(
ctx,
Expand Down
1 change: 1 addition & 0 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
TableKeys: map[string]string{table: "id"},
PublicationName: table, // table is random, reuse for publication name
SlotName: table, // table is random, reuse for slot name
WithAvroSchema: true,
}

i, err := NewCDCIterator(ctx, pool, config)
Expand Down
13 changes: 8 additions & 5 deletions source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
Tables []string
TableKeys map[string]string
WithSnapshot bool
WithAvroSchema bool
SnapshotFetchSize int
}

Expand Down Expand Up @@ -178,6 +179,7 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
PublicationName: c.conf.PublicationName,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create CDC iterator: %w", err)
Expand All @@ -199,11 +201,12 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio
}

snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create snapshot iterator: %w", err)
Expand Down
10 changes: 9 additions & 1 deletion source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ type CDCHandler struct {
out chan<- opencdc.Record
lastTXLSN pglogrepl.LSN

withAvroSchema bool
keySchemas map[string]cschema.Schema
payloadSchemas map[string]cschema.Schema
}

func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out chan<- opencdc.Record) *CDCHandler {
func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out chan<- opencdc.Record, withAvroSchema bool) *CDCHandler {
return &CDCHandler{
tableKeys: tableKeys,
relationSet: rs,
out: out,
withAvroSchema: withAvroSchema,
keySchemas: make(map[string]cschema.Schema),
payloadSchemas: make(map[string]cschema.Schema),
}
Expand Down Expand Up @@ -245,6 +247,9 @@ func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) opencdc.Position {
// updateAvroSchema generates and stores avro schema based on the relation's row,
// when usage of avro schema is requested.
func (h *CDCHandler) updateAvroSchema(ctx context.Context, rel *pglogrepl.RelationMessage) error {
if !h.withAvroSchema {
return nil
}
// Payload schema
avroPayloadSch, err := schema.Avro.ExtractLogrepl(rel.RelationName+"_payload", rel)
if err != nil {
Expand Down Expand Up @@ -281,6 +286,9 @@ func (h *CDCHandler) updateAvroSchema(ctx context.Context, rel *pglogrepl.Relati
}

func (h *CDCHandler) attachSchemas(rec opencdc.Record, relationName string) {
if !h.withAvroSchema {
return
}
cschema.AttachPayloadSchemaToRecord(rec, h.payloadSchemas[relationName])
cschema.AttachKeySchemaToRecord(rec, h.keySchemas[relationName])
}
34 changes: 21 additions & 13 deletions source/snapshot/fetch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ var supportedKeyTypes = []string{
}

type FetchConfig struct {
Table string
Key string
TXSnapshotID string
FetchSize int
Position position.Position
Table string
Key string
TXSnapshotID string
FetchSize int
Position position.Position
WithAvroSchema bool
}

var (
Expand Down Expand Up @@ -337,14 +338,17 @@ func (f *FetchWorker) buildFetchData(fields []pgconn.FieldDescription, values []
return FetchData{}, fmt.Errorf("failed to encode record data: %w", err)
}

return FetchData{
Key: key,
Payload: payload,
Position: pos,
Table: f.conf.Table,
PayloadSchema: *f.payloadSchema,
KeySchema: *f.keySchema,
}, nil
fd := FetchData{
Key: key,
Payload: payload,
Position: pos,
Table: f.conf.Table,
}
if f.conf.WithAvroSchema {
fd.PayloadSchema = *f.payloadSchema
fd.KeySchema = *f.keySchema
}
return fd, nil
}

func (f *FetchWorker) buildSnapshotPosition(fields []pgconn.FieldDescription, values []any) (position.SnapshotPosition, error) {
Expand Down Expand Up @@ -467,6 +471,10 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx)
}

func (f *FetchWorker) extractSchemas(ctx context.Context, fields []pgconn.FieldDescription) error {
if !f.conf.WithAvroSchema {
return nil
}

if f.payloadSchema == nil {
sdk.Logger(ctx).Debug().
Msgf("extracting payload schema for %v fields in %v", len(fields), f.conf.Table)
Expand Down
28 changes: 16 additions & 12 deletions source/snapshot/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (
var ErrIteratorDone = errors.New("snapshot complete")

type Config struct {
Position opencdc.Position
Tables []string
TableKeys map[string]string
TXSnapshotID string
FetchSize int
Position opencdc.Position
Tables []string
TableKeys map[string]string
TXSnapshotID string
FetchSize int
WithAvroSchema bool
}

type Iterator struct {
Expand Down Expand Up @@ -122,8 +123,10 @@ func (i *Iterator) buildRecord(d FetchData) opencdc.Record {
metadata["postgres.table"] = d.Table

rec := sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload)
cschema.AttachKeySchemaToRecord(rec, d.KeySchema)
cschema.AttachPayloadSchemaToRecord(rec, d.PayloadSchema)
if i.conf.WithAvroSchema {
cschema.AttachKeySchemaToRecord(rec, d.KeySchema)
cschema.AttachPayloadSchemaToRecord(rec, d.PayloadSchema)
}

return rec
}
Expand All @@ -135,11 +138,12 @@ func (i *Iterator) initFetchers(ctx context.Context) error {

for j, t := range i.conf.Tables {
w := NewFetchWorker(i.db, i.data, FetchConfig{
Table: t,
Key: i.conf.TableKeys[t],
TXSnapshotID: i.conf.TXSnapshotID,
Position: i.lastPosition,
FetchSize: i.conf.FetchSize,
Table: t,
Key: i.conf.TableKeys[t],
TXSnapshotID: i.conf.TXSnapshotID,
Position: i.lastPosition,
FetchSize: i.conf.FetchSize,
WithAvroSchema: i.conf.WithAvroSchema,
})

if err := w.Validate(ctx); err != nil {
Expand Down
Loading