Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2cfa09c
Generate connector.yaml
hariso Dec 24, 2024
bb04c67
Add connector.yaml
hariso Dec 24, 2024
54b3f16
generate connector.yaml
hariso Dec 24, 2024
0f306c0
Set version
hariso Dec 24, 2024
a485549
more updates
hariso Dec 24, 2024
511bc21
make build
hariso Jan 16, 2025
b96f73e
update sdk
hariso Jan 16, 2025
32e84b2
Migrate to specgen
hariso Jan 17, 2025
3407604
Merge branch 'main' into haris/specgen
hariso Jan 17, 2025
b5bfdfe
go.sum fix
hariso Jan 17, 2025
6e1088e
lint
hariso Jan 17, 2025
91914bf
Merge branch 'migrate-to-specgen' into haris/specgen
hariso Jan 17, 2025
24f5c86
Upgrade SDK
hariso Jan 29, 2025
f44a634
Update connector.yaml
hariso Jan 29, 2025
91a198c
validate generated files
hariso Jan 29, 2025
b9872ab
Merge branch 'haris/specgen' of github.com:ConduitIO/conduit-connecto…
hariso Jan 29, 2025
1e6ae79
rename
hariso Jan 29, 2025
179ac45
update sdk, use conn-sdk-cli
lovromazgon Jan 31, 2025
0aee455
use readmegen
lovromazgon Jan 31, 2025
79a6ecb
remove local replacement
lovromazgon Jan 31, 2025
5c380d4
add phony generate
lovromazgon Jan 31, 2025
3ee4187
respect logrepl.withAvroSchema
lovromazgon Jan 31, 2025
646634d
use latest sdk from main
lovromazgon Jan 31, 2025
62c69a7
inline errors
hariso Jan 31, 2025
42fdf69
Merge branch 'haris/specgen' of github.com:ConduitIO/conduit-connecto…
hariso Jan 31, 2025
3c67968
receivers
hariso Jan 31, 2025
88bf253
upgrade conduit-connector-sdk to v0.13.0
lovromazgon Jan 31, 2025
650007e
inline errors
lovromazgon Jan 31, 2025
337cf49
Merge branch 'main' into haris/specgen
lovromazgon Jan 31, 2025
b09e561
make generate
lovromazgon Jan 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ VERSION=$(shell git describe --tags --dirty --always)

.PHONY: build
build:
go build -ldflags "-X 'github.com/conduitio/conduit-connector-postgres.version=${VERSION}'" -o conduit-connector-postgres cmd/connector/main.go
sed -i '/specification:/,/version:/ s/version: .*/version: '"${VERSION}"'/' connector.yaml
go build -o conduit-connector-postgres cmd/connector/main.go

.PHONY: test
test:
Expand Down
9 changes: 8 additions & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate specgen

package postgres

import (
_ "embed"

sdk "github.com/conduitio/conduit-connector-sdk"
)

//go:embed connector.yaml
var specs string

var Connector = sdk.Connector{
NewSpecification: Specification,
NewSpecification: sdk.YAMLSpecification(specs),
NewSource: NewSource,
NewDestination: NewDestination,
}
211 changes: 211 additions & 0 deletions connector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
version: "1.0"
specification:
name: postgres
summary: A PostgreSQL source and destination plugin for Conduit.
description: ""
version: v0.10.1-11-g0f306c0-dirty
author: Meroxa, Inc.
source:
parameters:
- name: url
description: URL is the connection string for the Postgres database.
type: string
default: ""
validations:
- type: required
value: ""
- name: cdcMode
description: CDCMode determines how the connector should listen to changes.
type: string
default: auto
validations:
- type: inclusion
value: auto,logrepl
- name: logrepl.autoCleanup
description: |-
LogreplAutoCleanup determines if the replication slot and publication should be
removed when the connector is deleted.
type: bool
default: "true"
validations: []
- name: logrepl.publicationName
description: |-
LogreplPublicationName determines the publication name in case the
connector uses logical replication to listen to changes (see CDCMode).
type: string
default: conduitpub
validations: []
- name: logrepl.slotName
description: |-
LogreplSlotName determines the replication slot name in case the
connector uses logical replication to listen to changes (see CDCMode).
type: string
default: conduitslot
validations: []
- name: logrepl.withAvroSchema
description: |-
WithAvroSchema determines whether the connector should attach an avro schema on each
record.
type: bool
default: "false"
validations: []
- name: sdk.batch.delay
description: Maximum delay before an incomplete batch is read from the source.
type: duration
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.batch.size
description: Maximum size of batch before it gets read from the source.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.schema.context.enabled
description: |-
Specifies whether to use a schema context name. If set to false, no schema context name will
be used, and schemas will be saved with the subject name specified in the connector
(not safe because of name conflicts).
type: bool
default: "true"
validations: []
- name: sdk.schema.context.name
description: |-
Schema context name to be used. Used as a prefix for all schema subject names.
If empty, defaults to the connector ID.
type: string
default: ""
validations: []
- name: sdk.schema.extract.key.enabled
description: Whether to extract and encode the record key with a schema.
type: bool
default: "false"
validations: []
- name: sdk.schema.extract.key.subject
description: |-
The subject of the key schema. If the record metadata contains the field
"opencdc.collection" it is prepended to the subject name and separated
with a dot.
type: string
default: key
validations: []
- name: sdk.schema.extract.payload.enabled
description: Whether to extract and encode the record payload with a schema.
type: bool
default: "false"
validations: []
- name: sdk.schema.extract.payload.subject
description: |-
The subject of the payload schema. If the record metadata contains the
field "opencdc.collection" it is prepended to the subject name and
separated with a dot.
type: string
default: payload
validations: []
- name: sdk.schema.extract.type
description: The type of the payload schema.
type: string
default: avro
validations:
- type: inclusion
value: avro
- name: snapshot.fetchSize
description: Snapshot fetcher size determines the number of rows to retrieve at a time.
type: int
default: "50000"
validations: []
- name: snapshotMode
description: SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode.
type: string
default: initial
validations:
- type: inclusion
value: initial,never
- name: table
description: 'Deprecated: use `tables` instead.'
type: string
default: ""
validations: []
- name: tables
description: |-
Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2".
Use "*" if you'd like to listen to all tables.
type: string
default: ""
validations: []
destination:
parameters:
- name: url
description: URL is the connection string for the Postgres database.
type: string
default: ""
validations:
- type: required
value: ""
- name: key
description: Key represents the column name for the key used to identify and update existing rows.
type: string
default: ""
validations: []
- name: sdk.batch.delay
description: Maximum delay before an incomplete batch is written to the destination.
type: duration
default: "0"
validations: []
- name: sdk.batch.size
description: Maximum size of batch before it gets written to the destination.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.rate.burst
description: |-
Allow bursts of at most X records (0 or less means that bursts are not
limited). Only takes effect if a rate limit per second is set. Note that
if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch
size will be equal to `sdk.rate.burst`.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.rate.perSecond
description: Maximum number of records written per second (0 means no rate limit).
type: float
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.record.format
description: |-
The format of the output record. See the Conduit documentation for a full
list of supported formats (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
type: string
default: opencdc/json
validations: []
- name: sdk.record.format.options
description: |-
Options to configure the chosen output record format. Options are normally
key=value pairs separated with comma (e.g. opt1=val2,opt2=val2), except
for the `template` record format, where options are a Go template.
type: string
default: ""
validations: []
- name: sdk.schema.extract.key.enabled
description: Whether to extract and decode the record key with a schema.
type: bool
default: "true"
validations: []
- name: sdk.schema.extract.payload.enabled
description: Whether to extract and decode the record payload with a schema.
type: bool
default: "true"
validations: []
- name: table
description: Table is used as the target table into which records are inserted.
type: string
default: '{{ index .Metadata "opencdc.collection" }}'
validations: []
34 changes: 10 additions & 24 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-postgres/destination"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand All @@ -38,45 +37,32 @@ type Destination struct {
stmtBuilder sq.StatementBuilderType
}

func (d *Destination) Config() sdk.DestinationConfig {
return &d.config
}

func NewDestination() sdk.Destination {
d := &Destination{
stmtBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar),
}
return sdk.DestinationWithMiddleware(d, sdk.DefaultDestinationMiddleware()...)
return sdk.DestinationWithMiddleware(d)
}

func (d *Destination) Parameters() config.Parameters {
return d.config.Parameters()
}

func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
// try parsing the url
_, err = pgx.ParseConfig(d.config.URL)
func (d *Destination) Open(ctx context.Context) error {
conn, err := pgx.Connect(ctx, d.config.URL)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
return fmt.Errorf("failed to open connection: %w", err)
}
d.conn = conn

d.getTableName, err = d.config.TableFunction()
if err != nil {
return fmt.Errorf("invalid table name or table function: %w", err)
return fmt.Errorf("invalid table name or table name function: %w", err)
}

return nil
}

func (d *Destination) Open(ctx context.Context) error {
conn, err := pgx.Connect(ctx, d.config.URL)
if err != nil {
return fmt.Errorf("failed to open connection: %w", err)
}
d.conn = conn
return nil
}

// Write routes incoming records to their appropriate handler based on the
// operation.
func (d *Destination) Write(ctx context.Context, recs []opencdc.Record) (int, error) {
Expand Down
23 changes: 21 additions & 2 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate paramgen Config

package destination

import (
"bytes"
"context"
"fmt"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pgx/v5"
)

type TableFn func(opencdc.Record) (string, error)

type Config struct {
sdk.DefaultDestinationMiddleware

// URL is the connection string for the Postgres database.
URL string `json:"url" validate:"required"`
// Table is used as the target table into which records are inserted.
Expand All @@ -37,6 +40,22 @@ type Config struct {
Key string `json:"key"`
}

// todo pointer receiver, others are value receivers
func (c *Config) Validate(context.Context) error {
// try parsing the url
_, err := pgx.ParseConfig(c.URL)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}

_, err = c.TableFunction()
if err != nil {
return fmt.Errorf("invalid table name or table function: %w", err)
}

return nil
}

// TableFunction returns a function that determines the table for each record individually.
// The function might be returning a static table name.
// If the table is neither static nor a template, an error is returned.
Expand Down
Loading
Loading