
sql_insert doesn't support bloblang interpolation in dsn field #3765

@darshanj

Hi,

Is there a way to support interpolation in the dsn field of sql_insert?
Our use case: we read messages as input and want to use AWS IAM role-based authentication to Postgres on RDS, so the password in the dsn would be a token generated by AWS.

There are other use cases as well, such as routing a message to a different DB server host based on its metadata.
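
To make the two use cases concrete, here is a minimal Go sketch of assembling a Postgres dsn from per-message values. The function name `buildPostgresDSN` and its parameters are hypothetical and not part of Redpanda Connect. It only illustrates that a generated token has to be escaped when it is placed in the password position of a URL-style dsn, since such tokens can contain characters like `/`, `?` and `@`.

```go
package main

import (
	"net"
	"net/url"
)

// buildPostgresDSN is a hypothetical helper (not a Redpanda Connect API).
// It assembles a postgres:// URL from values that could come from message
// metadata. url.UserPassword escapes characters in the password that would
// otherwise break URL parsing, such as '/', '?', '@' and ':'.
func buildPostgresDSN(user, password, host, port, database, sslmode string) string {
	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(user, password),
		Host:     net.JoinHostPort(host, port),
		Path:     "/" + database,
		RawQuery: url.Values{"sslmode": {sslmode}}.Encode(),
	}
	return u.String()
}
```

A plain string interpolation such as `${! meta("pg_pass") }` would not do this escaping by itself, so the metadata value would probably need to be escaped before it is substituted.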

Looking at the code in
https://github.com/redpanda-data/connect/blob/main/internal/impl/sql/conn_fields.go#L33
https://github.com/redpanda-data/connect/blob/main/internal/impl/sql/output_sql_insert.go#L38C9-L38C17
dsnField is not defined as a NewBloblangField, unlike args_mapping (or topics for the input).

Proposed solutions (either of the below):

  1. Change dsnField to a NewBloblangField.
  2. Add support for IAM-based authentication in the sql_insert output, using the Go AWS SDK to fetch a token that is used in place of the password.

output:
  sql_insert:
    driver: postgres
    dsn: 'postgres://postgres:${! meta("pg_pass") }@localhost:5432/postgres?sslmode=disable'
    table: test
    iam_based_auth:
      role: 'somerole_aws_role'
      ....
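
For option 2, one thing the output would have to handle is token expiry: RDS IAM authentication tokens are valid for 15 minutes, so a token cannot be fetched once at startup and reused indefinitely. The following is a rough sketch of a token cache, not an existing Redpanda Connect component. The names `cachedToken` and `newCachedToken` are hypothetical, and the `fetch` callback stands in for whatever call actually obtains the token from AWS.

```go
package main

import (
	"sync"
	"time"
)

// cachedToken is a hypothetical helper that reuses a token until its TTL
// has elapsed and then asks fetch for a new one.
type cachedToken struct {
	mu      sync.Mutex
	fetch   func() (string, error) // assumed to wrap the AWS SDK call
	ttl     time.Duration
	token   string
	expires time.Time
}

func newCachedToken(ttl time.Duration, fetch func() (string, error)) *cachedToken {
	return &cachedToken{ttl: ttl, fetch: fetch}
}

// get returns the cached token, refreshing it first if it has expired.
func (c *cachedToken) get() (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.token != "" && time.Now().Before(c.expires) {
		return c.token, nil
	}
	tok, err := c.fetch()
	if err != nil {
		return "", err
	}
	c.token = tok
	c.expires = time.Now().Add(c.ttl)
	return tok, nil
}
```

In a real implementation `fetch` would presumably call the AWS SDK for Go (for example `auth.BuildAuthToken` in the `feature/rds/auth` package of aws-sdk-go-v2), and `ttl` would be set somewhat below 15 minutes so that a token is never used close to its expiry.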

My test config:


input:
  label: ""
  redpanda:
    seed_brokers: [] # No default (optional)
    topics: [] # No default (required)
    regexp_topics: false
    transaction_isolation_level: read_uncommitted
    consumer_group: "" # No default (optional)
    auto_replay_nacks: true
pipeline:
  processors:
    - mapping: |
        # We need to get this from message metadata, or from our dynamic plugin that
        # generates a password for the given message; that value replaces pg_pass here.
        meta pg_pass = "postgres"
output:
  sql_insert:
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS test (
           alias VARCHAR,
           query VARCHAR not null,
           parsing_time BIGINT);
    driver: postgres
    dsn: 'postgres://postgres:${! meta("pg_pass") }@localhost:5432/postgres?sslmode=disable'
    table: test
    columns:
      [
        "alias",
        "query",
        "parsing_time"
      ]
    args_mapping: root = [
      this.alias,
      this.query,
      this.parsing_time ]
    max_in_flight: 64
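
If the dsn in a config like the one above were resolved per message, the output could no longer hold a single connection pool created at startup. One possible approach, sketched here under that assumption, is to keep one pool per distinct resolved dsn. `poolCache` and `newPoolCache` are hypothetical names; the `open` callback would typically wrap `sql.Open(driver, dsn)` from `database/sql`, but it is left generic so the sketch does not depend on a particular driver.

```go
package main

import "sync"

// poolCache is a hypothetical helper that hands out one pool per distinct
// resolved dsn, opening it on first use.
type poolCache[P any] struct {
	mu    sync.Mutex
	open  func(dsn string) (P, error)
	pools map[string]P
}

func newPoolCache[P any](open func(dsn string) (P, error)) *poolCache[P] {
	return &poolCache[P]{open: open, pools: make(map[string]P)}
}

// get returns the pool for dsn, creating and caching it if necessary.
func (c *poolCache[P]) get(dsn string) (P, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if p, ok := c.pools[dsn]; ok {
		return p, nil
	}
	p, err := c.open(dsn)
	if err != nil {
		var zero P
		return zero, err
	}
	c.pools[dsn] = p
	return p, nil
}
```

This sketch never evicts or closes pools. With rotating credentials embedded in the dsn, every new token produces a new key, so a real implementation would also need some eviction policy.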
