Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions changelog.d/24461_postgres_sink_columns.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
## Add columns configuration to PostgreSQL sink

Adds a new `columns` configuration option to the PostgreSQL sink that allows users to explicitly specify which columns to insert data into. This addresses the issue where PostgreSQL's default values and serial columns are not supported, as described in issue #24461.

### Key Changes

- Added an optional `columns` parameter to `PostgresConfig` (a `Vec<String>` that defaults to an empty list)
- Modified `PostgresService` to accept and use the columns configuration
- Updated SQL query generation to use specified columns when provided
- Updated documentation warnings to mention the new columns feature
- Added test coverage for the new configuration option

### Usage Example

```yaml
sinks:
my_sink_id:
type: postgres
inputs:
- my-source-or-transform-id
endpoint: postgres://user:password@localhost/default
table: table1
columns:
- column1
- column2
```

This lets you omit columns that PostgreSQL should populate itself, such as serial/auto-increment columns, fixing the issue where `NULL` values were inserted into serial columns.

Fixes #24461
10 changes: 10 additions & 0 deletions src/sinks/postgres/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ pub struct PostgresConfig {
/// as table names as parameters in prepared statements are not allowed in PostgreSQL.
pub table: String,

/// The columns to insert data into. If not specified, every column of the table is inserted, and columns with no matching field in the event are set to `NULL`.
/// This allows you to omit columns, such as serial/auto-increment columns, that PostgreSQL should populate itself.
/// This parameter is vulnerable to SQL injection attacks as Vector does not validate or sanitize it; you must not use untrusted input.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub columns: Vec<String>,

/// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more
/// information about why a connection pool should be used.
#[serde(default = "default_pool_size")]
Expand Down Expand Up @@ -79,6 +85,7 @@ impl GenerateConfig for PostgresConfig {
toml::from_str(
r#"endpoint = "postgres://user:password@localhost/default"
table = "table"
columns = ["column1", "column2"]
"#,
)
.unwrap()
Expand All @@ -103,6 +110,7 @@ impl SinkConfig for PostgresConfig {
connection_pool,
self.table.clone(),
endpoint_uri.to_string(),
self.columns.clone(),
);
let service = ServiceBuilder::new()
.settings(request_settings, PostgresRetryLogic)
Expand Down Expand Up @@ -142,10 +150,12 @@ mod tests {
r#"
endpoint = "postgres://user:password@localhost/default"
table = "mytable"
columns = ["column1", "column2"]
"#,
)
.unwrap();
assert_eq!(cfg.endpoint, "postgres://user:password@localhost/default");
assert_eq!(cfg.table, "mytable");
assert_eq!(cfg.columns, vec!["column1", "column2"]);
}
}
21 changes: 17 additions & 4 deletions src/sinks/postgres/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ pub struct PostgresService {
connection_pool: Pool<Postgres>,
table: String,
endpoint: String,
columns: Vec<String>,
}

impl PostgresService {
/// Creates a `PostgresService` from its parts.
///
/// Each argument is stored unchanged in the field of the same name; nothing is
/// validated or connected here.
///
/// * `connection_pool` - pool the insert query is executed against.
/// * `table` - table name interpolated into the `INSERT` statement text.
/// * `endpoint` - endpoint string kept alongside the pool.
/// * `columns` - column names interpolated into the `INSERT` statement text; when
///   empty, the query falls back to `SELECT *` over the whole record.
// NOTE(review): two `new` signatures appear back to back below — presumably the removed
// (three-argument) and added (four-argument) lines of the diff view; confirm that only the
// four-argument form exists in the actual file.
pub const fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String) -> Self {
pub const fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String, columns: Vec<String>) -> Self {
Self {
connection_pool,
table,
endpoint,
columns,
}
}
}
Expand Down Expand Up @@ -142,6 +144,7 @@ impl Service<PostgresRequest> for PostgresService {
let service = self.clone();
let future = async move {
let table = service.table;
let columns = service.columns;
let metadata = request.metadata;
let json_serializer = JsonSerializerConfig::default().build();
let serialized_values = request
Expand All @@ -151,9 +154,19 @@ impl Service<PostgresRequest> for PostgresService {
.collect::<Result<Vec<_>, _>>()
.context(VectorCommonSnafu)?;

sqlx::query(&format!(
"INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
))
let query = if columns.is_empty() {
format!(
"INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
)
} else {
let columns_str = columns.join(", ");
format!(
"INSERT INTO {table} ({}) SELECT {} FROM jsonb_populate_recordset(NULL::{table}, $1)",
columns_str, columns_str
)
};

sqlx::query(&query)
.bind(Json(serialized_values))
.execute(&service.connection_pool)
.await
Expand Down
3 changes: 3 additions & 0 deletions website/cue/reference/components/sinks/postgres.cue
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ components: sinks: postgres: {
As a workaround, you can add a `NOT NULL` constraint to the column, so when inserting an event which is missing that field
a `NOT NULL` constraint violation would be raised, and define a [constraint trigger](\(urls.postgresql_constraint_trigger))
to catch the exception and set the desired default value.

Alternatively, you can use the `columns` configuration option to explicitly specify which columns to insert,
omitting columns such as serial/auto-increment columns so that PostgreSQL populates them itself.
""",
]
notices: []
Expand Down
Loading