diff --git a/src/service/flows.rs b/src/service/flows.rs index 2726956a0..e1ebce46b 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -61,7 +61,7 @@ pub struct GetKeysParam { #[derive(Serialize)] pub struct GetKeysResponse { key_type: schema::EnrichedValueType, - keys: Vec, + keys: Vec<(value::KeyValue, serde_json::Value)>, } pub async fn get_keys( @@ -110,7 +110,7 @@ pub async fn get_keys( }); let mut keys = Vec::new(); while let Some(rows) = rows_stream.next().await { - keys.extend(rows?.into_iter().map(|row| row.key)); + keys.extend(rows?.into_iter().map(|row| (row.key, row.key_aux_info))); } Ok(Json(GetKeysResponse { key_type: key_type.clone(), @@ -122,7 +122,7 @@ pub async fn get_keys( pub struct SourceRowKeyParams { field: String, key: Vec, - key_aux: Option, + key_aux: Option, } #[derive(Serialize)] @@ -165,7 +165,11 @@ impl<'a> SourceRowKeyContextHolder<'a> { .key_field() .ok_or_else(|| api_error!("field {} does not have a key", source_row_key.field))?; let key = value::KeyValue::from_strs(source_row_key.key, &key_field.value_type.typ)?; - let key_aux_info = source_row_key.key_aux.unwrap_or_default(); + let key_aux_info = source_row_key + .key_aux + .map(|s| serde_json::from_str(&s)) + .transpose()? + .unwrap_or_default(); Ok(Self { plan, import_op_idx,