Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ pipelines:
logrepl.publicationName: "conduitpub"
# LogreplSlotName determines the replication slot name in case the
# connector uses logical replication to listen to changes (see
# CDCMode).
# CDCMode). Can only contain lower-case letters, numbers, and the
# underscore character.
# Type: string
# Required: no
logrepl.slotName: "conduitslot"
Expand Down
5 changes: 4 additions & 1 deletion connector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,12 @@ specification:
description: |-
LogreplSlotName determines the replication slot name in case the
connector uses logical replication to listen to changes (see CDCMode).
Can only contain lower-case letters, numbers, and the underscore character.
type: string
default: conduitslot
validations: []
validations:
- type: regex
value: ^[a-z0-9_]+$
- name: logrepl.withAvroSchema
description: |-
WithAvroSchema determines whether the connector should attach an avro schema on each
Expand Down
3 changes: 2 additions & 1 deletion source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type Config struct {
LogreplPublicationName string `json:"logrepl.publicationName" default:"conduitpub"`
// LogreplSlotName determines the replication slot name in case the
// connector uses logical replication to listen to changes (see CDCMode).
LogreplSlotName string `json:"logrepl.slotName" default:"conduitslot"`
// Can only contain lower-case letters, numbers, and the underscore character.
LogreplSlotName string `json:"logrepl.slotName" validate:"regex=^[a-z0-9_]+$" default:"conduitslot"`

// LogreplAutoCleanup determines if the replication slot and publication should be
// removed when the connector is deleted.
Expand Down
2 changes: 1 addition & 1 deletion source/logrepl/internal/publication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestCreatePublication(t *testing.T) {
ctx := test.Context(t)
pool := test.ConnectPool(ctx, t, test.RegularConnString)

pubNames := []string{"testpub", "123", "test-hyphen", "test=equal"}
pubNames := []string{"testpub", "123", "test-hyphen", "test:semicolon", "test.dot", "test=equal"}
pubParams := [][]string{
nil,
{"publish = 'insert'"},
Expand Down
45 changes: 45 additions & 0 deletions source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strings"
"testing"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-connector-postgres/source"
"github.com/conduitio/conduit-connector-postgres/source/logrepl"
"github.com/conduitio/conduit-connector-postgres/test"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand Down Expand Up @@ -66,3 +68,46 @@ func TestSource_Open(t *testing.T) {
is.NoErr(s.Teardown(ctx))
}()
}

func TestSource_ParseConfig(t *testing.T) {
testCases := []struct {
name string
cfg config.Config
wantErr bool
}{
{
name: "valid postgres replication slot name",
cfg: config.Config{
"url": "postgresql://meroxauser:[email protected]:5432/meroxadb",
"tables": "table1,table2",
"cdcMode": "logrepl",
"logrepl.slotName": "valid_slot_name",
},
wantErr: false,
}, {
name: "invalid postgres replication slot name",
cfg: config.Config{
"url": "postgresql://meroxauser:[email protected]:5432/meroxadb",
"tables": "table1,table2",
"cdcMode": "logrepl",
"logrepl.slotName": "invalid:slot.name",
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)

var cfg source.Config
err := sdk.Util.ParseConfig(context.Background(), tc.cfg, cfg, Connector.NewSpecification().SourceParams)

if tc.wantErr {
is.True(err != nil)
return
}
is.NoErr(err)
})
}
}