Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ package main

import (
postgres "github.com/conduitio/conduit-connector-postgres"
"github.com/conduitio/conduit-connector-postgres/source/types"
sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
// Running as standalone plugin
types.WithBuiltinPlugin = false

sdk.Serve(postgres.Connector)
}
2 changes: 0 additions & 2 deletions source/logrepl/internal/relationset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,9 @@ func TestRelationSetAllTypes(t *testing.T) {
t.Run("with standalone plugin", func(t *testing.T) {
is := is.New(t)

types.WithBuiltinPlugin = false
values, err := rs.Values(ins.RelationID, ins.Tuple)
is.NoErr(err)
isValuesAllTypesStandalone(is, values)
types.WithBuiltinPlugin = true
})
}

Expand Down
38 changes: 34 additions & 4 deletions source/snapshot/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (i *Iterator) Next(ctx context.Context) (opencdc.Record, error) {
}

i.acks.Add(1)
return i.buildRecord(d), nil
return i.buildRecord(d)
}
}

Expand All @@ -112,7 +112,7 @@ func (i *Iterator) Teardown(_ context.Context) error {
return nil
}

func (i *Iterator) buildRecord(d FetchData) opencdc.Record {
func (i *Iterator) buildRecord(d FetchData) (opencdc.Record, error) {
// merge this position with latest position
i.lastPosition.Type = position.TypeSnapshot
i.lastPosition.Snapshots[d.Table] = d.Position
Expand All @@ -121,11 +121,41 @@ func (i *Iterator) buildRecord(d FetchData) opencdc.Record {
metadata := make(opencdc.Metadata)
metadata["postgres.table"] = d.Table

rec := sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload)
encodedPayload, err := i.encodeWithSchema(d.PayloadSchema, d.Payload)
if err != nil {
return opencdc.Record{}, fmt.Errorf("failed to encode payload with schema: %w", err)
}

encodedKey, err := i.encodeWithSchema(d.KeySchema, d.Key)
if err != nil {
return opencdc.Record{}, fmt.Errorf("failed to encode key with schema: %w", err)
}

rec := sdk.Util.Source.NewRecordSnapshot(
pos,
metadata,
opencdc.RawData(encodedKey),
opencdc.RawData(encodedPayload),
)

cschema.AttachKeySchemaToRecord(rec, d.KeySchema)
cschema.AttachPayloadSchemaToRecord(rec, d.PayloadSchema)

return rec
return rec, nil
}

func (i *Iterator) encodeWithSchema(sch cschema.Schema, data any) ([]byte, error) {
srd, err := sch.Serde()
if err != nil {
return nil, fmt.Errorf("failed to get serde for schema: %w", err)
}

encoded, err := srd.Marshal(data)
if err != nil {
return nil, fmt.Errorf("failed to marshal data with schema: %w", err)
}

return encoded, nil
}

func (i *Iterator) initFetchers(ctx context.Context) error {
Expand Down
31 changes: 0 additions & 31 deletions source/types/time.go

This file was deleted.

8 changes: 0 additions & 8 deletions source/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package types

import (
"time"

"github.com/jackc/pgx/v5/pgtype"
)

Expand All @@ -26,8 +24,6 @@ var (
UUID = UUIDFormatter{}
)

var WithBuiltinPlugin = true

func Format(oid uint32, v any) (any, error) {
if oid == pgtype.UUIDOID {
return UUID.Format(v)
Expand All @@ -38,10 +34,6 @@ func Format(oid uint32, v any) (any, error) {
return Numeric.Format(t)
case *pgtype.Numeric:
return Numeric.Format(*t)
case time.Time:
return Time.Format(t)
case *time.Time:
return Time.Format(*t)
case []uint8:
if oid == pgtype.XMLOID {
return string(t), nil
Expand Down
28 changes: 0 additions & 28 deletions source/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func Test_Format(t *testing.T) {
input []any
inputOID []uint32
expect []any
withBuiltin bool
}{
{
name: "int float string bool",
Expand Down Expand Up @@ -58,25 +57,6 @@ func Test_Format(t *testing.T) {
},
{
name: "time.Time",
input: []any{
func() time.Time {
is := is.New(t)
is.Helper()
t, err := time.Parse(time.DateTime, "2009-11-10 23:00:00")
is.NoErr(err)
return t
}(),
nil,
},
inputOID: []uint32{
0, 0,
},
expect: []any{
"2009-11-10 23:00:00 +0000 UTC", nil,
},
},
{
name: "builtin time.Time",
input: []any{
now,
},
Expand All @@ -86,7 +66,6 @@ func Test_Format(t *testing.T) {
expect: []any{
now,
},
withBuiltin: true,
},
{
name: "uuid",
Expand All @@ -107,13 +86,6 @@ func Test_Format(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)

prevWithBuiltinPlugin := WithBuiltinPlugin
WithBuiltinPlugin = tc.withBuiltin

t.Cleanup(func() {
WithBuiltinPlugin = prevWithBuiltinPlugin
})

for i, in := range tc.input {
v, err := Format(tc.inputOID[i], in)
is.NoErr(err)
Expand Down