diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs index 0f6705faff..225ac85eae 100644 --- a/core/connectors/runtime/src/error.rs +++ b/core/connectors/runtime/src/error.rs @@ -31,6 +31,19 @@ pub enum RuntimeError { FailedToSerializeRawMessages, #[error("Failed to serialize headers")] FailedToSerializeHeaders, + #[error( + "Sink connector with ID: {plugin_id} failed to consume {processed_count} processed messages from stream: {stream}, topic: {topic}, partition: {partition_id}, current offset: {current_offset}, schema: {schema}, status: {status}" + )] + SinkConsumeFailed { + plugin_id: u32, + status: i32, + stream: String, + topic: String, + partition_id: u32, + current_offset: u64, + schema: String, + processed_count: usize, + }, #[error("Connector SDK error")] ConnectorSdkError(#[from] iggy_connector_sdk::Error), #[error("Iggy client error")] @@ -75,6 +88,7 @@ impl RuntimeError { RuntimeError::MissingIggyCredentials => "invalid_configuration", RuntimeError::InvalidConfiguration(_) => "invalid_configuration", RuntimeError::HttpRequestFailed(_) => "http_request_failed", + RuntimeError::SinkConsumeFailed { .. } => "sink_consume_failed", RuntimeError::TokenFileNotFound(_) => "invalid_configuration", RuntimeError::TokenFileReadError(_, _) => "invalid_configuration", RuntimeError::TokenFileEmpty(_) => "invalid_configuration", diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index b6c8965d3e..bf602ffb09 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -582,7 +582,7 @@ async fn process_messages( RuntimeError::FailedToSerializeRawMessages })?; - (consume)( + let status = (consume)( plugin_id, topic_meta.as_ptr(), topic_meta.len(), @@ -591,6 +591,100 @@ async fn process_messages( messages.as_ptr(), messages.len(), ); + if status != 0 { + return Err(RuntimeError::SinkConsumeFailed { + plugin_id, + status, + stream: topic_metadata.stream.clone(), + topic: topic_metadata.topic.clone(), + partition_id: messages_metadata.partition_id, + current_offset: messages_metadata.current_offset, + schema: messages_metadata.schema.to_string(), + processed_count, + }); + } Ok(processed_count) } + +#[cfg(test)] +mod tests { + use super::*; + use iggy::prelude::IggyMessageHeader; + use iggy_connector_sdk::{Error, Payload, Schema}; + + struct TestDecoder; + + impl StreamDecoder for TestDecoder { + fn schema(&self) -> Schema { + Schema::Raw + } + + fn decode(&self, payload: Vec) -> Result { + Ok(Payload::Raw(payload)) + } + } + + extern "C" fn failing_consume( + _plugin_id: u32, + _topic_meta_ptr: *const u8, + _topic_meta_len: usize, + _messages_meta_ptr: *const u8, + _messages_meta_len: usize, + _messages_ptr: *const u8, + _messages_len: usize, + ) -> i32 { + 1 + } + + #[tokio::test] + async fn process_messages_returns_error_when_consume_callback_fails() { + let plugin_id = 42; + let consume: ConsumeCallback = failing_consume; + let decoder: Arc = Arc::new(TestDecoder); + let result = process_messages( + plugin_id, + MessagesMetadata { + partition_id: 1, + current_offset: 0, + schema: Schema::Raw, + }, + &TopicMetadata { + stream: "stream".to_string(), + topic: "topic".to_string(), + }, + vec![IggyMessage { + header: IggyMessageHeader { + checksum: 1, + id: 2, + offset: 0, + timestamp: 3, + origin_timestamp: 4, + user_headers_length: 0, + payload_length: 7, + reserved: 0, + }, + payload: "payload".into(), + user_headers: None, + }], + &consume, + &Vec::new(), + &decoder, + ) + .await; + + assert!(matches!( + result, + Err(RuntimeError::SinkConsumeFailed { + plugin_id: 42, + status: 1, + stream, + topic, + partition_id: 1, + current_offset: 0, + schema, + processed_count: 1 + }) if stream == "stream" && topic == "topic" && schema == "raw" + )); + } +} diff --git a/core/connectors/sinks/http_sink/README.md b/core/connectors/sinks/http_sink/README.md index cccd5329cc..04bb9b6a61 100644 --- a/core/connectors/sinks/http_sink/README.md +++ b/core/connectors/sinks/http_sink/README.md @@ -786,7 +786,7 @@ cargo test -p integration --test connectors -- http_sink ## Delivery Semantics -All retry logic lives inside `consume()`. The connector runtime invokes `consume()` via an FFI callback that returns an `i32` status code. The runtime does not inspect this return value (see `process_messages()` in `runtime/src/sink.rs`), so errors logged by the sink are not propagated to the runtime's retry or alerting mechanisms. Additionally, consumer group offsets are committed before processing ([runtime issue #1](#known-limitations)). This means: +All retry logic lives inside `consume()`. The connector runtime invokes `consume()` via an FFI callback that returns an `i32` status code. A non-zero return value is treated as a processing error by `process_messages()` in `runtime/src/sink.rs`, stopping the sink task and surfacing the failure to the runtime. Additionally, consumer group offsets are committed before processing ([runtime issue #1](#known-limitations)). This means: - Failed messages are **not retried by the runtime** — only by the sink's internal retry loop - Messages are committed **before delivery** — a crash after commit but before delivery loses messages @@ -795,9 +795,9 @@ The effective delivery guarantee is **at-most-once** at the runtime level. The s ## Known Limitations -1. **Runtime ignores `consume()` status**: The connector runtime invokes `consume()` via an FFI callback returning `i32`. The `process_messages()` function in `runtime/src/sink.rs` does not inspect the return value. Errors are logged internally by the sink but do not trigger runtime-level retry or alerting. ([#2927](https://github.com/apache/iggy/issues/2927)) +1. **No runtime retry after `consume()` failure**: The connector runtime now treats a non-zero `consume()` FFI status as a processing error, but the failed batch is not retried by the runtime. ([#2927](https://github.com/apache/iggy/issues/2927)) -2. **Offsets committed before processing**: The `PollingMessages` auto-commit strategy commits consumer group offsets before `consume()` is called. Combined with limitation 1, at-least-once delivery is not achievable. ([#2928](https://github.com/apache/iggy/issues/2928)) +2. **Offsets committed before processing**: The `PollingMessages` auto-commit strategy commits consumer group offsets before `consume()` is called. Because a failed batch is already committed, at-least-once delivery is not achievable. ([#2928](https://github.com/apache/iggy/issues/2928)) 3. **`Retry-After` header not used for backoff**: The `reqwest-middleware` retry layer uses computed exponential backoff. `Retry-After` headers are logged as warnings but do not influence retry timing. diff --git a/core/connectors/sinks/http_sink/src/lib.rs b/core/connectors/sinks/http_sink/src/lib.rs index 6f86885b50..07094a60d4 100644 --- a/core/connectors/sinks/http_sink/src/lib.rs +++ b/core/connectors/sinks/http_sink/src/lib.rs @@ -1187,8 +1187,8 @@ impl Sink for HttpSink { /// /// **Runtime note**: The FFI boundary in `sdk/src/sink.rs` maps `consume()`'s `Result` to /// `i32` (0=ok, 1=err), but the runtime's `process_messages()` in `runtime/src/sink.rs` - /// discards that return code. All retry logic lives inside this method — returning `Err` - /// does not trigger a runtime-level retry. + /// treats a non-zero return code as a processing error. All retry logic lives inside this + /// method — returning `Err` stops the sink task but does not retry the already-polled batch. async fn consume( &self, topic_metadata: &TopicMetadata, @@ -1229,7 +1229,7 @@ impl Sink for HttpSink { if let Err(ref e) = result { error!( - "HTTP sink ID: {} — consume() returning error (runtime ignores FFI status code): {}", + "HTTP sink ID: {} — consume() returning error (runtime will receive non-zero FFI status): {}", self.id, e ); } diff --git a/core/integration/tests/connectors/http/http_sink.rs b/core/integration/tests/connectors/http/http_sink.rs index e41c59ab61..8dbc5d2f61 100644 --- a/core/integration/tests/connectors/http/http_sink.rs +++ b/core/integration/tests/connectors/http/http_sink.rs @@ -159,11 +159,11 @@ //! //! ## Known Limitations //! -//! 1. **FFI return value ignored**: The runtime's `process_messages()` discards `consume()`'s -//! `i32` return code. Errors are logged by the sink but invisible to the runtime. +//! 1. **No runtime retry after `consume()` failure**: The runtime treats a non-zero `consume()` +//! FFI status as a processing error, but it does not retry the already-polled batch. //! See [#2927](https://github.com/apache/iggy/issues/2927). //! 2. **Offsets committed before processing**: `PollingMessages` auto-commit strategy commits -//! offsets before `consume()`. Combined with (1), effective guarantee is at-most-once. +//! offsets before `consume()`, so effective guarantee is at-most-once. //! See [#2928](https://github.com/apache/iggy/issues/2928). //! //! ## Test History