Skip to content

Commit 62564da

Browse files
committed
feat: Add columns configuration to PostgreSQL sink
Add support for explicit column specification in the PostgreSQL sink to address the issue with default values and serial columns. This allows users to exclude columns, such as serial/auto-increment fields, that should be handled by PostgreSQL itself.
Changes:
- Add optional `columns` parameter to PostgresConfig
- Modify PostgresService to use specified columns in SQL queries
- Update documentation warnings to mention new feature
- Add test coverage for columns configuration
1 parent 473e31c commit 62564da

File tree

4 files changed

+60
-4
lines changed

4 files changed

+60
-4
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
## Add columns configuration to PostgreSQL sink
2+
3+
Adds a new `columns` configuration option to the PostgreSQL sink that allows users to explicitly specify which columns to insert data into. This addresses the issue where the sink does not support PostgreSQL column default values and serial columns, as described in issue #24461.
4+
5+
### Key Changes
6+
7+
- Added `columns` parameter to `PostgresConfig` as an optional `Vec<String>`
8+
- Modified `PostgresService` to accept and use the columns configuration
9+
- Updated SQL query generation to use specified columns when provided
10+
- Updated documentation warnings to mention the new columns feature
11+
- Added test coverage for the new configuration option
12+
13+
### Usage Example
14+
15+
```yaml
16+
sinks:
17+
my_sink_id:
18+
type: postgres
19+
inputs:
20+
- my-source-or-transform-id
21+
endpoint: postgres://user:password@localhost/default
22+
table: table1
23+
columns:
24+
- column1
25+
- column2
26+
```
27+
28+
This allows excluding columns, such as serial/auto-increment columns, that should be populated by PostgreSQL itself, fixing the issue where NULL values were inserted into serial columns.
29+
30+
Fixes #24461

src/sinks/postgres/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ pub struct PostgresConfig {
4242
/// as table names as parameters in prepared statements are not allowed in PostgreSQL.
4343
pub table: String,
4444

45+
/// The columns to insert data into. If not specified, all columns of the table are used, and fields missing from the input data are inserted as `NULL`.
46+
/// This allows you to exclude columns like serial/auto-increment columns that should be handled by PostgreSQL.
47+
/// This parameter is vulnerable to SQL injection attacks as Vector does not validate or sanitize it; you must not use untrusted input.
48+
#[serde(default, skip_serializing_if = "Vec::is_empty")]
49+
pub columns: Vec<String>,
50+
4551
/// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more
4652
/// information about why a connection pool should be used.
4753
#[serde(default = "default_pool_size")]
@@ -79,6 +85,7 @@ impl GenerateConfig for PostgresConfig {
7985
toml::from_str(
8086
r#"endpoint = "postgres://user:password@localhost/default"
8187
table = "table"
88+
columns = ["column1", "column2"]
8289
"#,
8390
)
8491
.unwrap()
@@ -103,6 +110,7 @@ impl SinkConfig for PostgresConfig {
103110
connection_pool,
104111
self.table.clone(),
105112
endpoint_uri.to_string(),
113+
self.columns.clone(),
106114
);
107115
let service = ServiceBuilder::new()
108116
.settings(request_settings, PostgresRetryLogic)
@@ -142,10 +150,12 @@ mod tests {
142150
r#"
143151
endpoint = "postgres://user:password@localhost/default"
144152
table = "mytable"
153+
columns = ["column1", "column2"]
145154
"#,
146155
)
147156
.unwrap();
148157
assert_eq!(cfg.endpoint, "postgres://user:password@localhost/default");
149158
assert_eq!(cfg.table, "mytable");
159+
assert_eq!(cfg.columns, vec!["column1", "column2"]);
150160
}
151161
}

src/sinks/postgres/service.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,16 @@ pub struct PostgresService {
5050
connection_pool: Pool<Postgres>,
5151
table: String,
5252
endpoint: String,
53+
columns: Vec<String>,
5354
}
5455

5556
impl PostgresService {
56-
pub const fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String) -> Self {
57+
pub const fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String, columns: Vec<String>) -> Self {
5758
Self {
5859
connection_pool,
5960
table,
6061
endpoint,
62+
columns,
6163
}
6264
}
6365
}
@@ -142,6 +144,7 @@ impl Service<PostgresRequest> for PostgresService {
142144
let service = self.clone();
143145
let future = async move {
144146
let table = service.table;
147+
let columns = service.columns;
145148
let metadata = request.metadata;
146149
let json_serializer = JsonSerializerConfig::default().build();
147150
let serialized_values = request
@@ -151,9 +154,19 @@ impl Service<PostgresRequest> for PostgresService {
151154
.collect::<Result<Vec<_>, _>>()
152155
.context(VectorCommonSnafu)?;
153156

154-
sqlx::query(&format!(
155-
"INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
156-
))
157+
let query = if columns.is_empty() {
158+
format!(
159+
"INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
160+
)
161+
} else {
162+
let columns_str = columns.join(", ");
163+
format!(
164+
"INSERT INTO {table} ({}) SELECT {} FROM jsonb_populate_recordset(NULL::{table}, $1)",
165+
columns_str, columns_str
166+
)
167+
};
168+
169+
sqlx::query(&query)
157170
.bind(Json(serialized_values))
158171
.execute(&service.connection_pool)
159172
.await

website/cue/reference/components/sinks/postgres.cue

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ components: sinks: postgres: {
5454
As a workaround, you can add a `NOT NULL` constraint to the column, so when inserting an event which is missing that field
5555
a `NOT NULL` constraint violation would be raised, and define a [constraint trigger](\(urls.postgresql_constraint_trigger))
5656
to catch the exception and set the desired default value.
57+
58+
Alternatively, you can use the `columns` configuration option to explicitly specify which columns to insert,
59+
excluding columns like serial/auto-increment columns that should be handled by PostgreSQL.
5760
""",
5861
]
5962
notices: []

0 commit comments

Comments
 (0)