Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/connectors/sdk/src/transforms/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::ComputedValue;
pub mod add_fields;
pub mod delete_fields;
pub mod filter_fields;
pub mod unwrap_envelope;
pub mod update_fields;

/// Computes a JSON value based on the specified computed value type
Expand Down
213 changes: 213 additions & 0 deletions core/connectors/sdk/src/transforms/json/unwrap_envelope.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use crate::{
DecodedMessage, Error, Payload, TopicMetadata, transforms::unwrap_envelope::UnwrapEnvelope,
};
use simd_json::OwnedValue;
use tracing::warn;

impl UnwrapEnvelope {
pub(crate) fn transform_json(
&self,
_metadata: &TopicMetadata,
mut message: DecodedMessage,
) -> Result<Option<DecodedMessage>, Error> {
let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
return Ok(Some(message));
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-Object JSON (Array, scalar, null) silently passes through with no metric and no log. this is an operational blindspot - if a misconfigured source starts emitting arrays where the sink expects an unwrapped object, there's nothing in the logs that says the transform was a no-op.

at minimum a debug! with the actual payload variant, or a counter, so operators can see when the transform isn't doing anything.


let Some(inner) = map.remove(self.field.as_str()) else {
warn!(
"unwrap_envelope: field '{}' not found in payload, passing through unchanged",
self.field
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a per-message warn! on a hot path. with the default batch_length = 1000, a single misconfigured connector floods stdout with 1000 identical warnings per poll cycle, masking real errors.

fix options: downgrade to debug!, rate-limit (warn-once per (stream, topic, config)), or count occurrences and emit a single summary per batch.

return Ok(Some(message));
};

message.payload = Payload::Json(inner);
Ok(Some(message))
}
}

#[cfg(test)]
mod tests {
    use crate::transforms::Transform;
    use crate::transforms::json::test_utils::{
        create_raw_test_message, create_test_message, create_test_topic_metadata,
        extract_json_object,
    };
    use crate::transforms::unwrap_envelope::{UnwrapEnvelope, UnwrapEnvelopeConfig};
    use crate::{DecodedMessage, Payload};
    use simd_json::OwnedValue;

    /// Builds an `UnwrapEnvelope` configured to promote `field`.
    fn unwrapper(field: &str) -> UnwrapEnvelope {
        UnwrapEnvelope::new(UnwrapEnvelopeConfig {
            field: field.to_string(),
        })
    }

    /// Calls the JSON-specific entry point directly and unwraps down to the message.
    fn run_json(transform: &UnwrapEnvelope, message: DecodedMessage) -> DecodedMessage {
        transform
            .transform_json(&create_test_topic_metadata(), message)
            .unwrap()
            .unwrap()
    }

    /// Goes through the `Transform` trait entry point and unwraps down to the message.
    fn run_via_trait(transform: &UnwrapEnvelope, message: DecodedMessage) -> DecodedMessage {
        transform
            .transform(&create_test_topic_metadata(), message)
            .unwrap()
            .unwrap()
    }

    #[test]
    fn should_extract_data_field_from_database_record_envelope() {
        let envelope = create_test_message(
            r#"{
"table_name": "tpch.lineitem",
"operation_type": "SELECT",
"timestamp": "2026-04-24T19:57:00Z",
"data": {"id": 1, "l_orderkey": 123, "l_partkey": 456},
"old_data": null
}"#,
        );
        let unwrapped = run_json(&unwrapper("data"), envelope);
        let fields = extract_json_object(&unwrapped).unwrap();

        // Only the contents of `data` survive; the envelope keys are gone.
        assert_eq!(fields.len(), 3);
        for present in ["id", "l_orderkey", "l_partkey"] {
            assert!(fields.contains_key(present));
        }
        for absent in ["table_name", "operation_type"] {
            assert!(!fields.contains_key(absent));
        }
    }

    #[test]
    fn should_handle_missing_field_gracefully() {
        let envelope =
            create_test_message(r#"{"table_name": "users", "operation_type": "INSERT"}"#);
        let untouched = run_json(&unwrapper("data"), envelope);
        let fields = extract_json_object(&untouched).unwrap();

        // No `data` key, so the original object comes back as it went in.
        assert_eq!(fields.len(), 2);
        assert!(fields.contains_key("table_name"));
    }

    #[test]
    fn should_handle_scalar_inner_value() {
        let envelope = create_test_message(r#"{"payload": "just a string", "meta": 1}"#);
        let unwrapped = run_json(&unwrapper("payload"), envelope);

        let other = &unwrapped.payload;
        let Payload::Json(OwnedValue::String(text)) = other else {
            panic!("Expected Json(String), got {other:?}");
        };
        assert_eq!(text.as_str(), "just a string");
    }

    #[test]
    fn should_handle_null_inner_value() {
        let envelope = create_test_message(r#"{"data": {"id": 1}, "old_data": null}"#);
        let unwrapped = run_json(&unwrapper("old_data"), envelope);

        let other = &unwrapped.payload;
        assert!(
            matches!(
                other,
                Payload::Json(OwnedValue::Static(simd_json::StaticNode::Null))
            ),
            "Expected Json(Null), got {other:?}"
        );
    }

    #[test]
    fn should_pass_through_non_json_payload() {
        let raw = create_raw_test_message(vec![1, 2, 3, 4]);
        let passed = run_via_trait(&unwrapper("data"), raw);

        let Payload::Raw(bytes) = &passed.payload else {
            panic!("Expected Raw payload");
        };
        assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
    }

    #[test]
    fn should_pass_through_non_object_json() {
        // A top-level JSON array has no fields to unwrap.
        let elements = vec![
            OwnedValue::Static(simd_json::StaticNode::I64(1)),
            OwnedValue::Static(simd_json::StaticNode::I64(2)),
        ];
        let array_message = DecodedMessage {
            id: None,
            offset: None,
            checksum: None,
            timestamp: None,
            origin_timestamp: None,
            headers: None,
            payload: Payload::Json(OwnedValue::Array(Box::new(elements))),
        };
        let passed = run_via_trait(&unwrapper("data"), array_message);

        let other = &passed.payload;
        let Payload::Json(OwnedValue::Array(arr)) = other else {
            panic!("Expected Json(Array), got {other:?}");
        };
        assert_eq!(arr.len(), 2);
    }

    #[test]
    fn should_unwrap_nested_array_field() {
        let envelope = create_test_message(r#"{"items": [1, 2, 3], "count": 3}"#);
        let unwrapped = run_json(&unwrapper("items"), envelope);

        let other = &unwrapped.payload;
        let Payload::Json(OwnedValue::Array(arr)) = other else {
            panic!("Expected Json(Array), got {other:?}");
        };
        assert_eq!(arr.len(), 3);
    }

    #[test]
    fn should_work_with_deeply_nested_data() {
        let envelope = create_test_message(
            r#"{
"data": {"user": {"name": "John", "age": 30}, "active": true},
"meta": "ignored"
}"#,
        );
        let unwrapped = run_json(&unwrapper("data"), envelope);
        let fields = extract_json_object(&unwrapped).unwrap();

        assert_eq!(fields.len(), 2);
        for present in ["user", "active"] {
            assert!(fields.contains_key(present));
        }
    }
}
8 changes: 8 additions & 0 deletions core/connectors/sdk/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod filter_fields;
pub mod flatbuffer_convert;
pub mod json;
pub mod proto_convert;
mod unwrap_envelope;
mod update_fields;
use crate::{DecodedMessage, Error, TopicMetadata};
pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField};
Expand All @@ -38,6 +39,7 @@ use serde::{Deserialize, Serialize};
use simd_json::OwnedValue;
use std::sync::Arc;
use strum_macros::{Display, IntoStaticStr};
pub use unwrap_envelope::{UnwrapEnvelope, UnwrapEnvelopeConfig};
pub use update_fields::{Field as UpdateField, UpdateCondition, UpdateFields, UpdateFieldsConfig};

/// The value of a field, either static or computed at runtime
Expand Down Expand Up @@ -89,6 +91,7 @@ pub enum TransformType {
ProtoConvert,
FlatBufferConvert,
AvroConvert,
UnwrapEnvelope,
}

pub fn from_config(
Expand Down Expand Up @@ -131,5 +134,10 @@ pub fn from_config(
serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
Ok(Arc::new(AvroConvert::new(cfg)))
}
TransformType::UnwrapEnvelope => {
let cfg: UnwrapEnvelopeConfig =
serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.map_err(|_| Error::InvalidConfig) throws away the serde error detail. for a user with a typo in field or a wrong type, all they see is InvalidConfig - no line number, no field name, no expected-vs-got. this is a pre-existing pattern in every arm of from_config, but this PR adds another instance.

worth either fixing all arms in a follow-up, or at least propagating the serde error through Error::InvalidConfigDetail(String) or similar.

Ok(Arc::new(UnwrapEnvelope::new(cfg)))
}
}
}
62 changes: 62 additions & 0 deletions core/connectors/sdk/src/transforms/unwrap_envelope.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use super::{Transform, TransformType};
use crate::{DecodedMessage, Error, Payload, TopicMetadata};
use serde::{Deserialize, Serialize};

/// Configuration for the UnwrapEnvelope transform.
///
/// Extracts a nested JSON field from an envelope object and promotes it
/// to the top-level payload. For example, given a Postgres source
/// `DatabaseRecord` with shape `{ table_name, operation_type, timestamp,
/// data: { ... }, old_data }`, setting `field = "data"` replaces the
/// entire payload with the contents of `data`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UnwrapEnvelopeConfig {
pub field: String,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnwrapEnvelopeConfig.field: String has no validation. an empty string deserializes fine through from_config at transforms/mod.rs:137-141, then every message hits the missing-field branch and warn-spams at message rate (compounds the log flood on the json side).

reject empty field either via a custom Deserialize impl, or inline in UnwrapEnvelope::new, or in the from_config arm.

}

/// Transform that extracts a nested field from a JSON envelope and
/// promotes it as the top-level payload.
///
/// Derives `Debug` so the transform can be printed in logs and assertion
/// failures, in line with `UnwrapEnvelopeConfig`.
#[derive(Debug)]
pub struct UnwrapEnvelope {
    /// Name of the top-level key whose value replaces the whole payload.
    pub field: String,
}

impl UnwrapEnvelope {
pub fn new(cfg: UnwrapEnvelopeConfig) -> Self {
Self { field: cfg.field }
}
}

impl Transform for UnwrapEnvelope {
    /// Identifies this transform as `TransformType::UnwrapEnvelope`.
    fn r#type(&self) -> TransformType {
        TransformType::UnwrapEnvelope
    }

    /// Routes the message by payload kind.
    ///
    /// JSON payloads are handed to `transform_json`; every other payload
    /// kind is returned unchanged.
    fn transform(
        &self,
        metadata: &TopicMetadata,
        message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error> {
        // `_` binds nothing, so the payload is only inspected, not moved.
        if matches!(message.payload, Payload::Json(_)) {
            self.transform_json(metadata, message)
        } else {
            Ok(Some(message))
        }
    }
}
20 changes: 20 additions & 0 deletions core/connectors/sinks/iceberg_sink/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ async fn write_data(
})
.collect();

if let Some(first) = msgs.first()
&& looks_like_envelope(first)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the sniff only looks at msgs.first(). mixed batch where the first message is flat and the rest are envelopes (or vice versa) skips detection entirely - envelope rows then hit JsonArrowReader and write nulls, which is the silent-corruption mode #3174 was opened for. envelope-first drops the whole batch including any valid flat rows.

either scan all messages, or document and enforce a batch-homogeneity invariant somewhere upstream.

{
error!(
"Incoming JSON appears to be wrapped in a source envelope \
(detected 'table_name' + 'data' fields). The Iceberg sink \
expects flat JSON matching the target table schema. Add an \
'unwrap_envelope' transform with field = \"data\" to your \
connector config to extract the inner payload."
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two issues with this error message:

  1. it hardcodes field = "data", which only matches the postgres envelope shape. if/when detection broadens to Debezium (after) or other shapes, the suggestion is wrong.

  2. no connector id / plugin id in the log line, so in a multi-tenant deployment with several iceberg sinks you cannot tell which one rejected the batch.

);
return Err(Error::InvalidRecord);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error::InvalidRecord is a unit variant; the actionable hint ("add an unwrap_envelope transform with field = data") lives only in the error! log and is lost the moment the caller receives the error. Error::InvalidRecordValue(String) already exists (sdk/src/lib.rs:389) and is used in delta_sink and influxdb_source - switch to it and carry the hint in the variant.

this is also a concrete instance of #3176 (overloaded InvalidRecord).

}

let cursor = JsonArrowReader::new(msgs.as_slice());
let reader = ReaderBuilder::new(Arc::new(
schema_to_arrow_schema(&table.metadata().current_schema().clone()).map_err(|err| {
Expand Down Expand Up @@ -230,6 +243,13 @@ async fn write_data(
Ok(())
}

/// Heuristic check for a message that is still wrapped in a source envelope.
///
/// Returns `true` only when `value` is a JSON object containing both a
/// `table_name` key and a `data` key; any non-object value yields `false`.
fn looks_like_envelope(value: &simd_json::OwnedValue) -> bool {
    match value {
        simd_json::OwnedValue::Object(fields) => ["table_name", "data"]
            .iter()
            .all(|key| fields.contains_key(*key)),
        _ => false,
    }
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this heuristic checks 2 of the 5 keys on DatabaseRecord (table_name, operation_type, timestamp, data, old_data - see core/connectors/sources/postgres_source/src/lib.rs:110-116).

false positives: any legit iceberg table modeling audit logs, catalogs, or CDC metadata that happens to have table_name + data columns gets the whole batch rejected.

false negatives: Debezium / Kafka-Connect envelopes use before / after / op / source / payload and slip right through into JsonArrowReader, which then writes nulls - the exact bug #3174 is supposed to fix.

also it bakes a postgres-source shape into a generic iceberg sink. options: tighten to the full 4-or-5 key shape, or move detection behind an opt-in detect_envelope config flag, or drop sink-side sniffing entirely and rely on unwrap_envelope + docs.


#[async_trait]
pub trait Router: std::fmt::Debug + Sync + Send {
async fn route_data(
Expand Down
Loading