-
Notifications
You must be signed in to change notification settings - Fork 106
Open
Description
Seeing something abnormal with retry processor, if the message was deleted in one of the retry processors and using a sql_insert output, was not expecting execution of sql_insert. But sql_insert fails with a error. Here is a sample pipeline that fails.
input:
generate:
interval: 1m
mapping: |-
root = { "p1": "test val" }
pipeline:
processors:
- retry:
backoff:
initial_interval: 500ms
max_interval: 10s
max_elapsed_time: 0s
processors:
- log:
message: 'incoming msg ${! content() }'
- mapping: root = deleted()
output:
sql_insert:
driver: postgres
dsn: postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=disable
conn_max_idle_time: 1m
table: test_table
columns:
- p1
args_mapping: |
root = [
this.p1
]
init_statement:
CREATE TABLE IF NOT EXISTS test_table (
p1 varchar
)
and console output:
INFO incoming msg {"p1":"test_value"} @service=benthos custom_source=true label="" path=root.pipeline.processors.0.retry.processors.0
INFO Output type sql_insert is now active @service=benthos label="" path=root.output
ERRO Failed to send message to sql_insert: insert statements must have at least one set of values or select clause @service=benthos label="" path=root.output
INFO incoming msg {"p1":"test_value"} @service=benthos custom_source=true label="" path=root.pipeline.processors.0.retry.processors.0
ERRO Failed to send message to sql_insert: insert statements must have at least one set of values or select clause @service=benthos label="" path=root.output
INFO incoming msg {"p1":"test_value"} @service=benthos custom_source=true label="" path=root.pipeline.processors.0.retry.processors.0
Could workaround it by deleting message in a pre-processor to sql_insert, like below:
output:
sql_insert:
driver: postgres
dsn: postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=disable
conn_max_idle_time: 1m
table: test_table
columns:
- p1
args_mapping: |
root = [
this.p1
]
init_statement:
CREATE TABLE IF NOT EXISTS test_table (
p1 varchar
)
processors:
- mapping: |-
root = if this == null {
deleted()
}
Metadata
Metadata
Assignees
Labels
No labels