Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use iceberg::Catalog;
use iceberg::table::Table;
use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload};
use simd_json::base::ValueAsObject;
use simd_json::prelude::ValueAsScalar;
use std::collections::HashMap;
use tracing::{info, warn};

Expand Down Expand Up @@ -86,10 +87,7 @@ impl DynamicRouter {

fn extract_route_field(&self, message: &ConsumedMessage) -> Option<String> {
match &message.payload {
Payload::Json(payload) => payload
.as_object()
.and_then(|obj| obj.get(&self.route_field))
.map(|val| val.to_string()),
Payload::Json(payload) => extract_route_field_value(payload, &self.route_field),
_ => {
warn!("Unsupported format for iceberg connector");
None
Expand All @@ -98,6 +96,18 @@ impl DynamicRouter {
}
}

/// Reads the routing key named `route_field` from a JSON payload.
///
/// Route fields are table identifiers, so only JSON string values are accepted.
/// Returns `None` when `payload` is not a JSON object, when `route_field` is
/// absent from it, or when the stored value is not a JSON string.
fn extract_route_field_value(payload: &simd_json::OwnedValue, route_field: &str) -> Option<String> {
    let fields = payload.as_object()?;
    let raw_value = fields.get(route_field)?;
    // `as_str()` hands back the underlying text. Serializing the value with
    // `to_string()` instead would wrap "tpch.lineitem" in literal quote
    // characters, which is not usable as a lookup key.
    let table_name = raw_value.as_str()?;
    Some(table_name.to_owned())
}

#[async_trait]
impl Router for DynamicRouter {
async fn route_data(
Expand Down Expand Up @@ -167,3 +177,43 @@ impl Router for DynamicRouter {
Ok(())
}
}

#[cfg(test)]
mod tests {
    //! Unit tests for route-field extraction.
    //!
    //! These call the production `extract_route_field_value` helper through
    //! `use super::*`. The module deliberately does not define its own copy:
    //! a local function with the same name would shadow the glob import, and
    //! the tests would then keep passing even if the real helper regressed.
    use super::*;

    /// A JSON string value is returned as-is, without the surrounding quotes
    /// that serializing the JSON value would add.
    #[test]
    fn extracts_string_route_field_without_json_quotes() {
        let payload = simd_json::json!({
            "table": "tpch.lineitem"
        });

        assert_eq!(
            extract_route_field_value(&payload, "table"),
            Some("tpch.lineitem".to_string())
        );
    }

    /// Values that are not JSON strings cannot be used as a route key.
    #[test]
    fn ignores_non_string_route_field() {
        let payload = simd_json::json!({
            "table": 42
        });

        assert_eq!(extract_route_field_value(&payload, "table"), None);
    }

    /// An object that lacks the route field yields no route.
    #[test]
    fn returns_none_when_route_field_is_missing() {
        let payload = simd_json::json!({
            "other": "tpch.lineitem"
        });

        assert_eq!(extract_route_field_value(&payload, "table"), None);
    }

    /// A payload that is not a JSON object has no fields to look up.
    #[test]
    fn returns_none_for_non_object_payload() {
        let payload = simd_json::json!(["tpch.lineitem"]);

        assert_eq!(extract_route_field_value(&payload, "table"), None);
    }
}
Loading