diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a097c428..f82fd342 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,7 +3,7 @@ name: release on: push: tags: - - v* + - '*' permissions: contents: write @@ -18,6 +18,50 @@ jobs: with: fetch-depth: 0 + - name: Validate Tag Format + run: | + TAG=${GITHUB_REF#refs/tags/} + + SV_REGEX="^v(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)(-((0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*))*))?(\+([0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*))?$" + + if ! [[ $TAG =~ $SV_REGEX ]]; then + echo "$TAG is NOT a valid tag (expected format: v)" + exit 1 + fi + + - name: Check Version Consistency + run: | + # Extract tag and remove 'v' prefix if exists + TAG=${GITHUB_REF#refs/tags/} + + # Read version from connector.yaml + YAML_VERSION=$(yq e '.specification.version' connector.yaml) + + # Compare versions + if [[ "$TAG" != "$YAML_VERSION" ]]; then + echo "Version mismatch detected:" + echo "Git Tag: $TAG" + echo "connector.yaml Version: $YAML_VERSION" + exit 1 + fi + + - name: Delete Invalid Tag + if: failure() + uses: actions/github-script@v7 + with: + github-token: ${{secrets.GITHUB_TOKEN}} + script: | + const tag = context.ref.replace('refs/tags/', '') + try { + await github.rest.git.deleteRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: `tags/${tag}` + }) + } catch (error) { + console.log('Error deleting tag:', error) + } + - name: Set up Go uses: actions/setup-go@v5 with: diff --git a/Makefile b/Makefile index 5752635e..6cec5f12 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ lint: .PHONY: generate generate: go generate ./... + conn-sdk-cli readmegen -w .PHONY: fmt fmt: diff --git a/README.md b/README.md index f5cb71d4..f7efe768 100644 --- a/README.md +++ b/README.md @@ -1,99 +1,254 @@ # Conduit Connector PostgreSQL -The PostgreSQL connector is a [Conduit](https://github.com/ConduitIO/conduit) plugin. It provides both, a source -and a destination PostgresSQL connectors. +The PostgreSQL connector is a [Conduit](https://github.com/ConduitIO/conduit) +plugin. It provides both, a source and a destination PostgresSQL connector. -# Source + +## Source -The Postgres Source Connector connects to a database with the provided `url` and starts creating records for each change -detected in the provided tables. +The Postgres Source Connector connects to a database with the provided `url` and +starts creating records for each change detected in the provided tables. -Upon starting, the source takes a snapshot of the provided tables in the database, then switches into CDC mode. In CDC mode, -the plugin reads from a buffer of CDC events. +Upon starting, the source takes a snapshot of the provided tables in the database, +then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events. -## Snapshot Capture +### Snapshot -When the connector first starts, snapshot mode is enabled. The connector acquires a read-only lock on the tables, and -then reads all rows of the tables into Conduit. Once all rows in that initial snapshot are read the connector releases -its lock and switches into CDC mode. +When the connector first starts, snapshot mode is enabled. The connector acquires +a read-only lock on the tables, and then reads all rows of the tables into Conduit. +Once all rows in that initial snapshot are read the connector releases its lock and +switches into CDC mode. -This behavior is enabled by default, but can be turned off by adding `"snapshotMode":"never"` to the Source -configuration. +This behavior is enabled by default, but can be turned off by adding +`"snapshotMode": "never"` to the Source configuration. -## Change Data Capture +### Change Data Capture -This connector implements CDC features for PostgreSQL by creating a logical replication slot and a publication that -listens to changes in the configured tables. Every detected change is converted into a record and returned in the call to -`Read`. If there is no record available at the moment `Read` is called, it blocks until a record is available or the -connector receives a stop signal. +This connector implements Change Data Capture (CDC) features for PostgreSQL by +creating a logical replication slot and a publication that listens to changes in the +configured tables. Every detected change is converted into a record. If there are no +records available, the connector blocks until a record is available or the connector +receives a stop signal. -### Logical Replication Configuration +#### Logical Replication Configuration -When the connector switches to CDC mode, it attempts to run the initial setup commands to create its logical replication -slot and publication. It will connect to an existing slot if one with the configured name exists. +When the connector switches to CDC mode, it attempts to run the initial setup commands +to create its logical replication slot and publication. It will connect to an existing +slot if one with the configured name exists. -The Postgres user specified in the connection URL must have sufficient privileges to run all of these setup commands, or -it will fail. +The Postgres user specified in the connection URL must have sufficient privileges to +run all of these setup commands, or it will fail. -Example configuration for CDC features: +Example pipeline configuration that's using logical replication: -```json -{ - "url": "url", - "tables": "records", - "cdcMode": "logrepl", - "logrepl.publicationName": "meroxademo", - "logrepl.slotName": "meroxademo" -} +```yaml +version: 2.2 +pipelines: + - id: pg-to-log + status: running + connectors: + - id: pg + type: source + plugin: builtin:postgres + settings: + url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable" + tables: "users" + cdcMode: "logrepl" + logrepl.publicationName: "examplepub" + logrepl.slotName": "exampleslot" + - id: log + type: destination + plugin: builtin:log + settings: + level: info ``` -:warning: When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication. This is the default behaviour and can be disabled by setting `logrepl.autoCleanup` to `false`. +:warning: When the connector or pipeline is deleted, the connector will automatically +attempt to delete the replication slot and publication. This is the default behaviour +and can be disabled by setting `logrepl.autoCleanup` to `false`. -## Key Handling +### Key Handling -The connector will automatically look up the primary key column for the specified tables. If that can't be determined, -the connector will return an error. +The connector will automatically look up the primary key column for the specified tables +and use them as the key value. If that can't be determined, the connector will return +an error. -## Configuration Options +## Destination -| name | description | required | default | -| ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | -------- | ------------- | -| `url` | Connection string for the Postgres database. | true | | -| `tables` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"`. Using `*` will read from all public tables. | true | | -| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | -| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl`). | false | `auto` | -| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | -| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | -| `logrepl.autoCleanup` | Whether or not to cleanup the replication slot and pub when connector is deleted | false | `true` | -| ~~`table`~~ | List of table names to read from, separated by comma. **Deprecated: use `tables` instead.** | false | | +The Postgres Destination takes a Conduit record and stores it using a SQL statement. +The Destination is designed to handle different payloads and keys. Because of this, +each record is individually parsed and upserted. -# Destination +### Handling record operations -The Postgres Destination takes a `record.Record` and parses it into a valid SQL query. The Destination is designed to -handle different payloads and keys. Because of this, each record is individually parsed and upserted. +Based on the `Operation` field in the record, the destination will either insert, +update or delete the record in the target table. Snapshot records are always inserted. -## Upsert Behavior +If the target table already contains a record with the same key as a record being +inserted, the record will be updated (upserted). This can overwrite and thus potentially +lose data, so keys should be assigned correctly from the Source. -If the target table already contains a record with the same key, the Destination will upsert with its current received -values. Because Keys must be unique, this can overwrite and thus potentially lose data, so keys should be assigned -correctly from the Source. +If the target table does not contain a record with the same key as a record being +deleted, the record will be ignored. If there is no key, the record will be simply appended. + -## Configuration Options +## Source Configuration Parameters -| name | description | required | default | -| ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -------------------------------------------- | -| `url` | Connection string for the Postgres database. | true | | -| `table` | Table name. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` | -| `key` | Key represents the column name for the key used to identify and update existing rows. | false | | + +```yaml +version: 2.2 +pipelines: + - id: example + status: running + connectors: + - id: example + plugin: "postgres" + settings: + # URL is the connection string for the Postgres database. + # Type: string + url: "" + # CDCMode determines how the connector should listen to changes. + # Type: string + cdcMode: "auto" + # LogreplAutoCleanup determines if the replication slot and + # publication should be removed when the connector is deleted. + # Type: bool + logrepl.autoCleanup: "true" + # LogreplPublicationName determines the publication name in case the + # connector uses logical replication to listen to changes (see + # CDCMode). + # Type: string + logrepl.publicationName: "conduitpub" + # LogreplSlotName determines the replication slot name in case the + # connector uses logical replication to listen to changes (see + # CDCMode). + # Type: string + logrepl.slotName: "conduitslot" + # WithAvroSchema determines whether the connector should attach an + # avro schema on each record. + # Type: bool + logrepl.withAvroSchema: "true" + # Maximum delay before an incomplete batch is read from the source. + # Type: duration + sdk.batch.delay: "0" + # Maximum size of batch before it gets read from the source. + # Type: int + sdk.batch.size: "0" + # Specifies whether to use a schema context name. If set to false, no + # schema context name will be used, and schemas will be saved with the + # subject name specified in the connector (not safe because of name + # conflicts). + # Type: bool + sdk.schema.context.enabled: "true" + # Schema context name to be used. Used as a prefix for all schema + # subject names. If empty, defaults to the connector ID. + # Type: string + sdk.schema.context.name: "" + # Whether to extract and encode the record key with a schema. + # Type: bool + sdk.schema.extract.key.enabled: "false" + # The subject of the key schema. If the record metadata contains the + # field "opencdc.collection" it is prepended to the subject name and + # separated with a dot. + # Type: string + sdk.schema.extract.key.subject: "key" + # Whether to extract and encode the record payload with a schema. + # Type: bool + sdk.schema.extract.payload.enabled: "false" + # The subject of the payload schema. If the record metadata contains + # the field "opencdc.collection" it is prepended to the subject name + # and separated with a dot. + # Type: string + sdk.schema.extract.payload.subject: "payload" + # The type of the payload schema. + # Type: string + sdk.schema.extract.type: "avro" + # Snapshot fetcher size determines the number of rows to retrieve at a + # time. + # Type: int + snapshot.fetchSize: "50000" + # SnapshotMode is whether the plugin will take a snapshot of the + # entire table before starting cdc mode. + # Type: string + snapshotMode: "initial" + # Deprecated: use `tables` instead. + # Type: string + table: "" + # Tables is a List of table names to read from, separated by a comma, + # e.g.:"table1,table2". Use "*" if you'd like to listen to all tables. + # Type: string + tables: "" +``` + + +## Destination Configuration Parameters + + +```yaml +version: 2.2 +pipelines: + - id: example + status: running + connectors: + - id: example + plugin: "postgres" + settings: + # URL is the connection string for the Postgres database. + # Type: string + url: "" + # Key represents the column name for the key used to identify and + # update existing rows. + # Type: string + key: "" + # Maximum delay before an incomplete batch is written to the + # destination. + # Type: duration + sdk.batch.delay: "0" + # Maximum size of batch before it gets written to the destination. + # Type: int + sdk.batch.size: "0" + # Allow bursts of at most X records (0 or less means that bursts are + # not limited). Only takes effect if a rate limit per second is set. + # Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the + # effective batch size will be equal to `sdk.rate.burst`. + # Type: int + sdk.rate.burst: "0" + # Maximum number of records written per second (0 means no rate + # limit). + # Type: float + sdk.rate.perSecond: "0" + # The format of the output record. See the Conduit documentation for a + # full list of supported formats + # (https://conduit.io/docs/using/connectors/configuration-parameters/output-format). + # Type: string + sdk.record.format: "opencdc/json" + # Options to configure the chosen output record format. Options are + # normally key=value pairs separated with comma (e.g. + # opt1=val2,opt2=val2), except for the `template` record format, where + # options are a Go template. + # Type: string + sdk.record.format.options: "" + # Whether to extract and decode the record key with a schema. + # Type: bool + sdk.schema.extract.key.enabled: "true" + # Whether to extract and decode the record payload with a schema. + # Type: bool + sdk.schema.extract.payload.enabled: "true" + # Table is used as the target table into which records are inserted. + # Type: string + table: "{{ index .Metadata "opencdc.collection" }}" +``` + -# Testing +## Testing -Run `make test` to run all the unit and integration tests, which require Docker to be installed and running. The command -will handle starting and stopping docker containers for you. +Run `make test` to run all the unit and integration tests, which require Docker +to be installed and running. The command will handle starting and stopping +docker containers for you. -# References +## References - https://github.com/bitnami/bitnami-docker-postgresql-repmgr - https://github.com/Masterminds/squirrel diff --git a/connector.go b/connector.go index cf218f95..7cf6e5cb 100644 --- a/connector.go +++ b/connector.go @@ -12,14 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate conn-sdk-cli specgen + package postgres import ( + _ "embed" + sdk "github.com/conduitio/conduit-connector-sdk" ) +//go:embed connector.yaml +var specs string + +var version = "(devel)" + var Connector = sdk.Connector{ - NewSpecification: Specification, + NewSpecification: sdk.YAMLSpecification(specs, version), NewSource: NewSource, NewDestination: NewDestination, } diff --git a/connector.yaml b/connector.yaml new file mode 100644 index 00000000..645ba408 --- /dev/null +++ b/connector.yaml @@ -0,0 +1,299 @@ +version: "1.0" +specification: + name: postgres + summary: Conduit connector for PostgreSQL + description: | + ## Source + + The Postgres Source Connector connects to a database with the provided `url` and + starts creating records for each change detected in the provided tables. + + Upon starting, the source takes a snapshot of the provided tables in the database, + then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events. + + ### Snapshot + + When the connector first starts, snapshot mode is enabled. The connector acquires + a read-only lock on the tables, and then reads all rows of the tables into Conduit. + Once all rows in that initial snapshot are read the connector releases its lock and + switches into CDC mode. + + This behavior is enabled by default, but can be turned off by adding + `"snapshotMode": "never"` to the Source configuration. + + ### Change Data Capture + + This connector implements Change Data Capture (CDC) features for PostgreSQL by + creating a logical replication slot and a publication that listens to changes in the + configured tables. Every detected change is converted into a record. If there are no + records available, the connector blocks until a record is available or the connector + receives a stop signal. + + #### Logical Replication Configuration + + When the connector switches to CDC mode, it attempts to run the initial setup commands + to create its logical replication slot and publication. It will connect to an existing + slot if one with the configured name exists. + + The Postgres user specified in the connection URL must have sufficient privileges to + run all of these setup commands, or it will fail. + + Example pipeline configuration that's using logical replication: + + ```yaml + version: 2.2 + pipelines: + - id: pg-to-log + status: running + connectors: + - id: pg + type: source + plugin: builtin:postgres + settings: + url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable" + tables: "users" + cdcMode: "logrepl" + logrepl.publicationName: "examplepub" + logrepl.slotName": "exampleslot" + - id: log + type: destination + plugin: builtin:log + settings: + level: info + ``` + + :warning: When the connector or pipeline is deleted, the connector will automatically + attempt to delete the replication slot and publication. This is the default behaviour + and can be disabled by setting `logrepl.autoCleanup` to `false`. + + ### Key Handling + + The connector will automatically look up the primary key column for the specified tables + and use them as the key value. If that can't be determined, the connector will return + an error. + + ## Destination + + The Postgres Destination takes a Conduit record and stores it using a SQL statement. + The Destination is designed to handle different payloads and keys. Because of this, + each record is individually parsed and upserted. + + ### Handling record operations + + Based on the `Operation` field in the record, the destination will either insert, + update or delete the record in the target table. Snapshot records are always inserted. + + If the target table already contains a record with the same key as a record being + inserted, the record will be updated (upserted). This can overwrite and thus potentially + lose data, so keys should be assigned correctly from the Source. + + If the target table does not contain a record with the same key as a record being + deleted, the record will be ignored. + + If there is no key, the record will be simply appended. + version: v0.11.0-dev + author: Meroxa, Inc. + source: + parameters: + - name: url + description: URL is the connection string for the Postgres database. + type: string + default: "" + validations: + - type: required + value: "" + - name: cdcMode + description: CDCMode determines how the connector should listen to changes. + type: string + default: auto + validations: + - type: inclusion + value: auto,logrepl + - name: logrepl.autoCleanup + description: |- + LogreplAutoCleanup determines if the replication slot and publication should be + removed when the connector is deleted. + type: bool + default: "true" + validations: [] + - name: logrepl.publicationName + description: |- + LogreplPublicationName determines the publication name in case the + connector uses logical replication to listen to changes (see CDCMode). + type: string + default: conduitpub + validations: [] + - name: logrepl.slotName + description: |- + LogreplSlotName determines the replication slot name in case the + connector uses logical replication to listen to changes (see CDCMode). + type: string + default: conduitslot + validations: [] + - name: logrepl.withAvroSchema + description: |- + WithAvroSchema determines whether the connector should attach an avro schema on each + record. + type: bool + default: "true" + validations: [] + - name: sdk.batch.delay + description: Maximum delay before an incomplete batch is read from the source. + type: duration + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.batch.size + description: Maximum size of batch before it gets read from the source. + type: int + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.schema.context.enabled + description: |- + Specifies whether to use a schema context name. If set to false, no schema context name will + be used, and schemas will be saved with the subject name specified in the connector + (not safe because of name conflicts). + type: bool + default: "true" + validations: [] + - name: sdk.schema.context.name + description: |- + Schema context name to be used. Used as a prefix for all schema subject names. + If empty, defaults to the connector ID. + type: string + default: "" + validations: [] + - name: sdk.schema.extract.key.enabled + description: Whether to extract and encode the record key with a schema. + type: bool + default: "false" + validations: [] + - name: sdk.schema.extract.key.subject + description: |- + The subject of the key schema. If the record metadata contains the field + "opencdc.collection" it is prepended to the subject name and separated + with a dot. + type: string + default: key + validations: [] + - name: sdk.schema.extract.payload.enabled + description: Whether to extract and encode the record payload with a schema. + type: bool + default: "false" + validations: [] + - name: sdk.schema.extract.payload.subject + description: |- + The subject of the payload schema. If the record metadata contains the + field "opencdc.collection" it is prepended to the subject name and + separated with a dot. + type: string + default: payload + validations: [] + - name: sdk.schema.extract.type + description: The type of the payload schema. + type: string + default: avro + validations: + - type: inclusion + value: avro + - name: snapshot.fetchSize + description: Snapshot fetcher size determines the number of rows to retrieve at a time. + type: int + default: "50000" + validations: [] + - name: snapshotMode + description: SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode. + type: string + default: initial + validations: + - type: inclusion + value: initial,never + - name: table + description: 'Deprecated: use `tables` instead.' + type: string + default: "" + validations: [] + - name: tables + description: |- + Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2". + Use "*" if you'd like to listen to all tables. + type: string + default: "" + validations: [] + destination: + parameters: + - name: url + description: URL is the connection string for the Postgres database. + type: string + default: "" + validations: + - type: required + value: "" + - name: key + description: Key represents the column name for the key used to identify and update existing rows. + type: string + default: "" + validations: [] + - name: sdk.batch.delay + description: Maximum delay before an incomplete batch is written to the destination. + type: duration + default: "0" + validations: [] + - name: sdk.batch.size + description: Maximum size of batch before it gets written to the destination. + type: int + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.rate.burst + description: |- + Allow bursts of at most X records (0 or less means that bursts are not + limited). Only takes effect if a rate limit per second is set. Note that + if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch + size will be equal to `sdk.rate.burst`. + type: int + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.rate.perSecond + description: Maximum number of records written per second (0 means no rate limit). + type: float + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.record.format + description: |- + The format of the output record. See the Conduit documentation for a full + list of supported formats (https://conduit.io/docs/using/connectors/configuration-parameters/output-format). + type: string + default: opencdc/json + validations: [] + - name: sdk.record.format.options + description: |- + Options to configure the chosen output record format. Options are normally + key=value pairs separated with comma (e.g. opt1=val2,opt2=val2), except + for the `template` record format, where options are a Go template. + type: string + default: "" + validations: [] + - name: sdk.schema.extract.key.enabled + description: Whether to extract and decode the record key with a schema. + type: bool + default: "true" + validations: [] + - name: sdk.schema.extract.payload.enabled + description: Whether to extract and decode the record payload with a schema. + type: bool + default: "true" + validations: [] + - name: table + description: Table is used as the target table into which records are inserted. + type: string + default: '{{ index .Metadata "opencdc.collection" }}' + validations: [] diff --git a/destination.go b/destination.go index d2e0a2ce..6844518b 100644 --- a/destination.go +++ b/destination.go @@ -21,7 +21,6 @@ import ( "strings" sq "github.com/Masterminds/squirrel" - "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-postgres/destination" sdk "github.com/conduitio/conduit-connector-sdk" @@ -38,45 +37,32 @@ type Destination struct { stmtBuilder sq.StatementBuilderType } +func (d *Destination) Config() sdk.DestinationConfig { + return &d.config +} + func NewDestination() sdk.Destination { d := &Destination{ stmtBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar), } - return sdk.DestinationWithMiddleware(d, sdk.DefaultDestinationMiddleware()...) + return sdk.DestinationWithMiddleware(d) } -func (d *Destination) Parameters() config.Parameters { - return d.config.Parameters() -} - -func (d *Destination) Configure(ctx context.Context, cfg config.Config) error { - err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters()) - if err != nil { - return err - } - // try parsing the url - _, err = pgx.ParseConfig(d.config.URL) +func (d *Destination) Open(ctx context.Context) error { + conn, err := pgx.Connect(ctx, d.config.URL) if err != nil { - return fmt.Errorf("invalid url: %w", err) + return fmt.Errorf("failed to open connection: %w", err) } + d.conn = conn d.getTableName, err = d.config.TableFunction() if err != nil { - return fmt.Errorf("invalid table name or table function: %w", err) + return fmt.Errorf("invalid table name or table name function: %w", err) } return nil } -func (d *Destination) Open(ctx context.Context) error { - conn, err := pgx.Connect(ctx, d.config.URL) - if err != nil { - return fmt.Errorf("failed to open connection: %w", err) - } - d.conn = conn - return nil -} - // Write routes incoming records to their appropriate handler based on the // operation. func (d *Destination) Write(ctx context.Context, recs []opencdc.Record) (int, error) { diff --git a/destination/config.go b/destination/config.go index 2d72152c..0739008a 100644 --- a/destination/config.go +++ b/destination/config.go @@ -12,23 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate paramgen Config - package destination import ( "bytes" + "context" "fmt" "strings" "text/template" "github.com/Masterminds/sprig/v3" "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/jackc/pgx/v5" ) type TableFn func(opencdc.Record) (string, error) type Config struct { + sdk.DefaultDestinationMiddleware + // URL is the connection string for the Postgres database. URL string `json:"url" validate:"required"` // Table is used as the target table into which records are inserted. @@ -37,10 +40,22 @@ type Config struct { Key string `json:"key"` } +func (c *Config) Validate(context.Context) error { + if _, err := pgx.ParseConfig(c.URL); err != nil { + return fmt.Errorf("invalid url: %w", err) + } + + if _, err := c.TableFunction(); err != nil { + return fmt.Errorf("invalid table name or table function: %w", err) + } + + return nil +} + // TableFunction returns a function that determines the table for each record individually. // The function might be returning a static table name. // If the table is neither static nor a template, an error is returned. -func (c Config) TableFunction() (f TableFn, err error) { +func (c *Config) TableFunction() (f TableFn, err error) { // Not a template, i.e. it's a static table name if !strings.HasPrefix(c.Table, "{{") && !strings.HasSuffix(c.Table, "}}") { return func(_ opencdc.Record) (string, error) { diff --git a/destination/paramgen.go b/destination/paramgen.go deleted file mode 100644 index 320c2233..00000000 --- a/destination/paramgen.go +++ /dev/null @@ -1,39 +0,0 @@ -// Code generated by paramgen. DO NOT EDIT. -// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen - -package destination - -import ( - "github.com/conduitio/conduit-commons/config" -) - -const ( - ConfigKey = "key" - ConfigTable = "table" - ConfigUrl = "url" -) - -func (Config) Parameters() map[string]config.Parameter { - return map[string]config.Parameter{ - ConfigKey: { - Default: "", - Description: "Key represents the column name for the key used to identify and update existing rows.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigTable: { - Default: "{{ index .Metadata \"opencdc.collection\" }}", - Description: "Table is used as the target table into which records are inserted.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigUrl: { - Default: "", - Description: "URL is the connection string for the Postgres database.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - } -} diff --git a/destination_integration_test.go b/destination_integration_test.go index d8e6a127..fd151cb2 100644 --- a/destination_integration_test.go +++ b/destination_integration_test.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-postgres/test" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v5" "github.com/matryer/is" ) @@ -32,14 +33,17 @@ func TestDestination_Write(t *testing.T) { tableName := test.SetupTestTable(ctx, t, conn) d := NewDestination() - err := d.Configure( + err := sdk.Util.ParseConfig( ctx, map[string]string{ "url": test.RegularConnString, "table": "{{ index .Metadata \"opencdc.collection\" }}", }, + d.Config(), + Connector.NewSpecification().DestinationParams, ) is.NoErr(err) + err = d.Open(ctx) is.NoErr(err) defer func() { @@ -149,8 +153,15 @@ func TestDestination_Batch(t *testing.T) { tableName := test.SetupTestTable(ctx, t, conn) d := NewDestination() - err := d.Configure(ctx, map[string]string{"url": test.RegularConnString, "table": tableName}) + + err := sdk.Util.ParseConfig( + ctx, + map[string]string{"url": test.RegularConnString, "table": tableName}, + d.Config(), + Connector.NewSpecification().DestinationParams, + ) is.NoErr(err) + err = d.Open(ctx) is.NoErr(err) defer func() { diff --git a/go.mod b/go.mod index d584c079..9fa06767 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/Masterminds/sprig/v3 v3.3.0 github.com/Masterminds/squirrel v1.5.4 github.com/conduitio/conduit-commons v0.5.0 - github.com/conduitio/conduit-connector-sdk v0.12.0 + github.com/conduitio/conduit-connector-sdk v0.13.0 github.com/daixiang0/gci v0.13.5 github.com/golangci/golangci-lint v1.63.4 github.com/google/go-cmp v0.6.0 @@ -36,7 +36,7 @@ require ( github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 // indirect github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect - github.com/Masterminds/semver/v3 v3.3.0 // indirect + github.com/Masterminds/semver/v3 v3.3.1 // indirect github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect github.com/alecthomas/go-check-sumtype v0.3.1 // indirect github.com/alexkohler/nakedret/v2 v2.0.5 // indirect @@ -60,6 +60,9 @@ require ( github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.3.0 // indirect github.com/conduitio/conduit-connector-protocol v0.9.0 // indirect + github.com/conduitio/evolviconf v0.1.0 // indirect + github.com/conduitio/evolviconf/evolviyaml v0.1.0 // indirect + github.com/conduitio/yaml/v3 v3.3.0 // indirect github.com/curioswitch/go-reassign v0.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/denis-tingaikin/go-header v0.5.0 // indirect @@ -81,7 +84,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/go-xmlfmt/xmlfmt v1.1.3 // indirect github.com/gobwas/glob v0.2.3 // indirect - github.com/goccy/go-json v0.10.3 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/gofrs/flock v0.12.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect @@ -173,6 +176,9 @@ require ( github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect github.com/sagikazarmark/locafero v0.6.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/samber/lo v1.47.0 // indirect + github.com/samber/slog-common v0.18.1 // indirect + github.com/samber/slog-zerolog/v2 v2.7.3 // indirect github.com/sanposhiho/wastedassign/v2 v2.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect github.com/sashamelentyev/interfacebloat v1.1.0 // indirect @@ -227,10 +233,10 @@ require ( golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect - golang.org/x/time v0.8.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect - google.golang.org/grpc v1.68.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + golang.org/x/time v0.9.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/grpc v1.70.0 // indirect + google.golang.org/protobuf v1.35.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index cb01b7a3..83baae69 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,8 @@ github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0 h1:/fTUt5vmbkAcMBt4YQiuC github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0/go.mod h1:ONJg5sxcbsdQQ4pOW8TGdTidT2TMAUy/2Xhr8mrYaao= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= -github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+hmvYS0= -github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4= +github.com/Masterminds/semver/v3 v3.3.1/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/Masterminds/sprig/v3 v3.3.0 h1:mQh0Yrg1XPo6vjYXgtf5OtijNAKJRNcTdOOGZe3tPhs= github.com/Masterminds/sprig/v3 v3.3.0/go.mod h1:Zy1iXRYNqNLUolqCpL4uhk6SHUMAOSCzdgBfDb35Lz0= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= @@ -84,8 +84,14 @@ github.com/conduitio/conduit-commons v0.5.0 h1:28UIuOIo+6WvBZ4EU54KfPhSf44I1/Y65 github.com/conduitio/conduit-commons v0.5.0/go.mod h1:xyT6XpGvj79gdtsn3qaD2KxadhsAYS+mmBOdln08Wio= github.com/conduitio/conduit-connector-protocol v0.9.0 h1:7MailxYxAsr376Nz8WStVYSXnlf86bjtzpA/d/66if0= github.com/conduitio/conduit-connector-protocol v0.9.0/go.mod h1:lF7RUjr9ZMj1rtNubaryHw4mPfjj4DGYDW+wvvRwBkM= -github.com/conduitio/conduit-connector-sdk v0.12.0 h1:WD/ZQhEAJMkvkq0KIyVCGeU8ni2ASMyPpBbAWZQ+lKo= -github.com/conduitio/conduit-connector-sdk v0.12.0/go.mod h1:keZ4eZ4q+7GFEz+Q8G97wvPrrdnBoxh+Bmxl9P9pZW0= +github.com/conduitio/conduit-connector-sdk v0.13.0 h1:TcPwCJJOhNhpOO4H7fqeykVUxoew2M/Bp6DNuNjsgiU= +github.com/conduitio/conduit-connector-sdk v0.13.0/go.mod h1:rduMJDB541+dolgZdCkPPaJk2CHPaTe1FUnV1YvwTl0= +github.com/conduitio/evolviconf v0.1.0 h1:rcG+hs6tlrYlX9qomOQJz+K+OnDhbMbioGx3ci55yo0= +github.com/conduitio/evolviconf v0.1.0/go.mod h1:RnbnSqDDYarKgG2p+krP71svG6qLms3+/TnKrPKWk+0= +github.com/conduitio/evolviconf/evolviyaml v0.1.0 h1:nMW7CROIMtHhscm/QLMpMs7uCPp6O2dS4CfU9bhugd4= +github.com/conduitio/evolviconf/evolviyaml v0.1.0/go.mod h1:22+FHPuroT5pPZpg0fuhE8ACIMCl1S+HsAFN1CM3Vho= +github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= +github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/curioswitch/go-reassign v0.3.0 h1:dh3kpQHuADL3cobV/sSGETA8DOv457dwl+fbBAhrQPs= @@ -121,6 +127,8 @@ github.com/go-critic/go-critic v0.11.5 h1:TkDTOn5v7EEngMxu8KbuFqFR43USaaH8XRJLz1 github.com/go-critic/go-critic v0.11.5/go.mod h1:wu6U7ny9PiaHaZHcvMDmdysMqvDem162Rh3zWTrqk8M= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= @@ -150,8 +158,8 @@ github.com/go-xmlfmt/xmlfmt v1.1.3 h1:t8Ey3Uy7jDSEisW2K3somuMKIpzktkWptA0iFCnRUW github.com/go-xmlfmt/xmlfmt v1.1.3/go.mod h1:aUCEOzzezBEjDBbFBoSiya/gduyIiWYRP6CnSFIV8AM= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= -github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= -github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E= github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0= @@ -240,8 +248,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= -github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+TBDg= -github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8= +github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjzq7gFzUs= github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jjti/go-spancheck v0.6.4 h1:Tl7gQpYf4/TMU7AT84MN83/6PutY21Nb9fuQjFTpRRc= @@ -403,6 +411,12 @@ github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3 github.com/sagikazarmark/locafero v0.6.0/go.mod h1:77OmuIc6VTraTXKXIs/uvUxKGUXjE1GbemJYHqdNjX0= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/samber/slog-common v0.18.1 h1:c0EipD/nVY9HG5shgm/XAs67mgpWDMF+MmtptdJNCkQ= +github.com/samber/slog-common v0.18.1/go.mod h1:QNZiNGKakvrfbJ2YglQXLCZauzkI9xZBjOhWFKS3IKk= +github.com/samber/slog-zerolog/v2 v2.7.3 h1:/MkPDl/tJhijN2GvB1MWwBn2FU8RiL3rQ8gpXkQm2EY= +github.com/samber/slog-zerolog/v2 v2.7.3/go.mod h1:oWU7WHof4Xp8VguiNO02r1a4VzkgoOyOZhY5CuRke60= github.com/sanposhiho/wastedassign/v2 v2.1.0 h1:crurBF7fJKIORrV85u9UUpePDYGWnwvv3+A96WvwXT0= github.com/sanposhiho/wastedassign/v2 v2.1.0/go.mod h1:+oSmSC+9bQ+VUAxA66nBb0Z7N8CK7mscKTDYC6aIek4= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw= @@ -511,6 +525,16 @@ go-simpler.org/musttag v0.13.0 h1:Q/YAW0AHvaoaIbsPj3bvEI5/QFP7w696IMUpnKXQfCE= go-simpler.org/musttag v0.13.0/go.mod h1:FTzIGeK6OkKlUDVpj0iQUXZLUO1Js9+mvykDQy9C5yM= go-simpler.org/sloglint v0.7.2 h1:Wc9Em/Zeuu7JYpl+oKoYOsQSy2X560aVueCW/m6IijY= go-simpler.org/sloglint v0.7.2/go.mod h1:US+9C80ppl7VsThQclkM7BkCHQAzuz8kHLsW3ppuluo= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -630,8 +654,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= -golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190321232350-e250d351ecad/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -658,12 +682,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f h1:cUMEy+8oS78BWIH9OWazBkzbr090Od9tWBNtZHkOhf0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= -google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/source.go b/source.go index aa82f69d..f9d2b7fa 100644 --- a/source.go +++ b/source.go @@ -42,36 +42,27 @@ type Source struct { tableKeys map[string]string } +func (s *Source) Config() sdk.SourceConfig { + return &s.config +} + func NewSource() sdk.Source { return sdk.SourceWithMiddleware( &Source{ tableKeys: make(map[string]string), - }, - sdk.DefaultSourceMiddleware( - // disable schema extraction by default, postgres will build its own schema - sdk.SourceWithSchemaExtractionConfig{ - PayloadEnabled: lang.Ptr(false), - KeyEnabled: lang.Ptr(false), + config: source.Config{ + DefaultSourceMiddleware: sdk.DefaultSourceMiddleware{ + // disable schema extraction by default, postgres will build its own schema + SourceWithSchemaExtraction: sdk.SourceWithSchemaExtraction{ + PayloadEnabled: lang.Ptr(false), + KeyEnabled: lang.Ptr(false), + }, + }, }, - )..., + }, ) } -func (s *Source) Parameters() config.Parameters { - return s.config.Parameters() -} - -func (s *Source) Configure(ctx context.Context, cfg config.Config) error { - err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters()) - if err != nil { - return err - } - - s.config = s.config.Init() - - return s.config.Validate() -} - func (s *Source) Open(ctx context.Context, pos opencdc.Position) error { pool, err := cpool.New(ctx, s.config.URL) if err != nil { @@ -157,42 +148,28 @@ func (s *Source) Teardown(ctx context.Context) error { } func (s *Source) LifecycleOnDeleted(ctx context.Context, cfg config.Config) error { - if err := s.Configure(ctx, cfg); err != nil { - return fmt.Errorf("fail to handle lifecycle delete event: %w", err) - } - - // N.B. This should not stay in here for long, enrich the default. - // Events are not passed enriched config with defaults. - params := s.config.Parameters() - - if _, ok := cfg["logrepl.autoCleanup"]; !ok { // not set - s.config.LogreplAutoCleanup = params["logrepl.autoCleanup"].Default == "true" - } - - if _, ok := cfg["logrepl.slotName"]; !ok { - s.config.LogreplSlotName = params["logrepl.slotName"].Default - } - - if _, ok := cfg["logrepl.publicationName"]; !ok { - s.config.LogreplPublicationName = params["logrepl.publicationName"].Default + var oldConfig source.Config + err := sdk.Util.ParseConfig(ctx, cfg, &oldConfig, Connector.NewSpecification().SourceParams) + if err != nil { + return fmt.Errorf("lifecycle delete event: failed to parse configuration: %w", err) } - switch s.config.CDCMode { + switch oldConfig.CDCMode { case source.CDCModeAuto: fallthrough // TODO: Adjust as `auto` changes. case source.CDCModeLogrepl: - if !s.config.LogreplAutoCleanup { + if !oldConfig.LogreplAutoCleanup { sdk.Logger(ctx).Warn().Msg("Skipping logrepl auto cleanup") return nil } return logrepl.Cleanup(ctx, logrepl.CleanupConfig{ - URL: s.config.URL, - SlotName: s.config.LogreplSlotName, - PublicationName: s.config.LogreplPublicationName, + URL: oldConfig.URL, + SlotName: oldConfig.LogreplSlotName, + PublicationName: oldConfig.LogreplPublicationName, }) default: - sdk.Logger(ctx).Warn().Msgf("cannot handle CDC mode %q", s.config.CDCMode) + sdk.Logger(ctx).Warn().Msgf("cannot handle CDC mode %q", oldConfig.CDCMode) return nil } } diff --git a/source/config.go b/source/config.go index 3d921a34..bb02a0a7 100644 --- a/source/config.go +++ b/source/config.go @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate paramgen Config - package source import ( + "context" "errors" "fmt" "github.com/conduitio/conduit-commons/config" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v5" ) @@ -47,6 +47,8 @@ const ( ) type Config struct { + sdk.DefaultSourceMiddleware + // URL is the connection string for the Postgres database. URL string `json:"url" validate:"required"` @@ -82,12 +84,11 @@ type Config struct { } // Validate validates the provided config values. -func (c Config) Validate() error { - var errs []error +func (c *Config) Validate(context.Context) error { + c.Init() - // try parsing the url - _, err := pgx.ParseConfig(c.URL) - if err != nil { + var errs []error + if _, err := pgx.ParseConfig(c.URL); err != nil { errs = append(errs, fmt.Errorf("invalid url: %w", err)) } @@ -102,10 +103,9 @@ func (c Config) Validate() error { } // Init sets the desired value on Tables while Table is being deprecated. -func (c Config) Init() Config { +func (c *Config) Init() { if len(c.Table) > 0 && len(c.Tables) == 0 { c.Tables = c.Table c.Table = nil } - return c } diff --git a/source/config_test.go b/source/config_test.go index 7e9ae3bf..7d7e313b 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -15,6 +15,7 @@ package source import ( + "context" "testing" "github.com/matryer/is" @@ -47,7 +48,7 @@ func TestConfig_Validate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) - err := tc.cfg.Validate() + err := tc.cfg.Validate(context.Background()) if tc.wantErr { is.True(err != nil) return diff --git a/source/paramgen.go b/source/paramgen.go deleted file mode 100644 index 32164496..00000000 --- a/source/paramgen.go +++ /dev/null @@ -1,92 +0,0 @@ -// Code generated by paramgen. DO NOT EDIT. -// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen - -package source - -import ( - "github.com/conduitio/conduit-commons/config" -) - -const ( - ConfigCdcMode = "cdcMode" - ConfigLogreplAutoCleanup = "logrepl.autoCleanup" - ConfigLogreplPublicationName = "logrepl.publicationName" - ConfigLogreplSlotName = "logrepl.slotName" - ConfigLogreplWithAvroSchema = "logrepl.withAvroSchema" - ConfigSnapshotFetchSize = "snapshot.fetchSize" - ConfigSnapshotMode = "snapshotMode" - ConfigTable = "table" - ConfigTables = "tables" - ConfigUrl = "url" -) - -func (Config) Parameters() map[string]config.Parameter { - return map[string]config.Parameter{ - ConfigCdcMode: { - Default: "auto", - Description: "CDCMode determines how the connector should listen to changes.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"auto", "logrepl"}}, - }, - }, - ConfigLogreplAutoCleanup: { - Default: "true", - Description: "LogreplAutoCleanup determines if the replication slot and publication should be\nremoved when the connector is deleted.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigLogreplPublicationName: { - Default: "conduitpub", - Description: "LogreplPublicationName determines the publication name in case the\nconnector uses logical replication to listen to changes (see CDCMode).", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigLogreplSlotName: { - Default: "conduitslot", - Description: "LogreplSlotName determines the replication slot name in case the\nconnector uses logical replication to listen to changes (see CDCMode).", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigLogreplWithAvroSchema: { - Default: "true", - Description: "WithAvroSchema determines whether the connector should attach an avro schema on each\nrecord.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigSnapshotFetchSize: { - Default: "50000", - Description: "Snapshot fetcher size determines the number of rows to retrieve at a time.", - Type: config.ParameterTypeInt, - Validations: []config.Validation{}, - }, - ConfigSnapshotMode: { - Default: "initial", - Description: "SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"initial", "never"}}, - }, - }, - ConfigTable: { - Default: "", - Description: "Deprecated: use `tables` instead.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigTables: { - Default: "", - Description: "Tables is a List of table names to read from, separated by a comma, e.g.:\"table1,table2\".\nUse \"*\" if you'd like to listen to all tables.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigUrl: { - Default: "", - Description: "URL is the connection string for the Postgres database.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - } -} diff --git a/source_integration_test.go b/source_integration_test.go index b051b242..c92e31aa 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -20,6 +20,7 @@ import ( "github.com/conduitio/conduit-connector-postgres/source/logrepl" "github.com/conduitio/conduit-connector-postgres/test" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" ) @@ -32,7 +33,7 @@ func TestSource_Open(t *testing.T) { publicationName := "conduitpub1" s := NewSource() - err := s.Configure( + err := sdk.Util.ParseConfig( ctx, map[string]string{ "url": test.RepmgrConnString, @@ -42,6 +43,8 @@ func TestSource_Open(t *testing.T) { "logrepl.slotName": slotName, "logrepl.publicationName": publicationName, }, + s.Config(), + Connector.NewSpecification().SourceParams, ) is.NoErr(err) diff --git a/spec.go b/spec.go deleted file mode 100644 index c65e2f61..00000000 --- a/spec.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright © 2022 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - sdk "github.com/conduitio/conduit-connector-sdk" -) - -// version is set during the build process (i.e. the Makefile). -// Default version matches default from runtime/debug. -var version = "(devel)" - -// Specification returns the Plugin's Specification. -func Specification() sdk.Specification { - return sdk.Specification{ - Name: "postgres", - Summary: "A PostgreSQL source and destination plugin for Conduit.", - Version: version, - Author: "Meroxa, Inc.", - } -} diff --git a/tools.go b/tools.go index eef7ac3d..4832f771 100644 --- a/tools.go +++ b/tools.go @@ -17,7 +17,7 @@ package postgres import ( - _ "github.com/conduitio/conduit-commons/paramgen" + _ "github.com/conduitio/conduit-connector-sdk/conn-sdk-cli" _ "github.com/daixiang0/gci" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" _ "golang.org/x/tools/cmd/stringer"