diff --git a/README.md b/README.md index 4409f19..fbe2513 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,8 @@ pipelines: logrepl.publicationName: "conduitpub" # LogreplSlotName determines the replication slot name in case the # connector uses logical replication to listen to changes (see - # CDCMode). + # CDCMode). Can only contain lower-case letters, numbers, and the + # underscore character. # Type: string # Required: no logrepl.slotName: "conduitslot" diff --git a/connector.yaml b/connector.yaml index a4bfbb8..14ae7b6 100644 --- a/connector.yaml +++ b/connector.yaml @@ -136,9 +136,12 @@ specification: description: |- LogreplSlotName determines the replication slot name in case the connector uses logical replication to listen to changes (see CDCMode). + Can only contain lower-case letters, numbers, and the underscore character. type: string default: conduitslot - validations: [] + validations: + - type: regex + value: ^[a-z0-9_]+$ - name: logrepl.withAvroSchema description: |- WithAvroSchema determines whether the connector should attach an avro schema on each diff --git a/source/config.go b/source/config.go index ae531e1..4d71cf9 100644 --- a/source/config.go +++ b/source/config.go @@ -69,7 +69,8 @@ type Config struct { LogreplPublicationName string `json:"logrepl.publicationName" default:"conduitpub"` // LogreplSlotName determines the replication slot name in case the // connector uses logical replication to listen to changes (see CDCMode). - LogreplSlotName string `json:"logrepl.slotName" default:"conduitslot"` + // Can only contain lower-case letters, numbers, and the underscore character. + LogreplSlotName string `json:"logrepl.slotName" validate:"regex=^[a-z0-9_]+$" default:"conduitslot"` // LogreplAutoCleanup determines if the replication slot and publication should be // removed when the connector is deleted. diff --git a/source/logrepl/internal/publication_test.go b/source/logrepl/internal/publication_test.go index 081689e..f04a707 100644 --- a/source/logrepl/internal/publication_test.go +++ b/source/logrepl/internal/publication_test.go @@ -27,7 +27,7 @@ func TestCreatePublication(t *testing.T) { ctx := test.Context(t) pool := test.ConnectPool(ctx, t, test.RegularConnString) - pubNames := []string{"testpub", "123", "test-hyphen", "test=equal"} + pubNames := []string{"testpub", "123", "test-hyphen", "test:semicolon", "test.dot", "test=equal"} pubParams := [][]string{ nil, {"publish = 'insert'"}, diff --git a/source_integration_test.go b/source_integration_test.go index 044b079..ac044cb 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -19,6 +19,8 @@ import ( "strings" "testing" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-connector-postgres/source" "github.com/conduitio/conduit-connector-postgres/source/logrepl" "github.com/conduitio/conduit-connector-postgres/test" sdk "github.com/conduitio/conduit-connector-sdk" @@ -66,3 +68,46 @@ func TestSource_Open(t *testing.T) { is.NoErr(s.Teardown(ctx)) }() } + +func TestSource_ParseConfig(t *testing.T) { + testCases := []struct { + name string + cfg config.Config + wantErr bool + }{ + { + name: "valid postgres replication slot name", + cfg: config.Config{ + "url": "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", + "tables": "table1,table2", + "cdcMode": "logrepl", + "logrepl.slotName": "valid_slot_name", + }, + wantErr: false, + }, { + name: "invalid postgres replication slot name", + cfg: config.Config{ + "url": "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", + "tables": "table1,table2", + "cdcMode": "logrepl", + "logrepl.slotName": "invalid:slot.name", + }, + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + + var cfg source.Config + err := sdk.Util.ParseConfig(context.Background(), tc.cfg, cfg, Connector.NewSpecification().SourceParams) + + if tc.wantErr { + is.True(err != nil) + return + } + is.NoErr(err) + }) + } +}