Skip to content

Commit 3464ecf

Browse files
committed
fix: postgres source key schema
1 parent c180bc4 commit 3464ecf

File tree

1 file changed

+12
-48
lines changed

1 file changed

+12
-48
lines changed

src/ops/sources/postgres.rs

Lines changed: 12 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -512,58 +512,22 @@ impl SourceFactoryBase for Factory {
512512
)
513513
.await?;
514514

515-
// Build fields: first key, then value columns
516-
let mut fields: Vec<FieldSchema> = Vec::new();
517-
518-
if table_schema.primary_key_columns.len() == 1 {
519-
let pk_col = &table_schema.primary_key_columns[0];
520-
fields.push(FieldSchema::new(
521-
&pk_col.schema.name,
522-
pk_col.schema.value_type.clone(),
523-
));
524-
} else {
525-
// Composite primary key - put all PK columns into a nested struct `_key`
526-
let key_fields: Vec<FieldSchema> = table_schema
527-
.primary_key_columns
528-
.iter()
529-
.map(|pk_col| {
530-
FieldSchema::new(&pk_col.schema.name, pk_col.schema.value_type.clone())
531-
})
532-
.collect();
533-
let key_struct_schema = StructSchema {
534-
fields: Arc::new(key_fields),
535-
description: None,
536-
};
537-
fields.push(FieldSchema::new(
538-
"_key",
539-
make_output_type(key_struct_schema),
540-
));
541-
}
542-
543-
for value_col in &table_schema.value_columns {
544-
fields.push(FieldSchema::new(
545-
&value_col.schema.name,
546-
value_col.schema.value_type.clone(),
547-
));
548-
}
549-
550-
// Log schema information for debugging
551-
if table_schema.primary_key_columns.len() > 1 {
552-
info!(
553-
"Composite primary key detected: {} columns",
554-
table_schema.primary_key_columns.len()
555-
);
556-
}
557-
558-
let struct_schema = StructSchema {
559-
fields: Arc::new(fields),
560-
description: None,
561-
};
562515
Ok(make_output_type(TableSchema::new(
563516
TableKind::KTable {
564517
num_key_parts: table_schema.primary_key_columns.len(),
565518
},
566-
struct_schema,
519+
StructSchema {
520+
fields: Arc::new(
521+
(table_schema.primary_key_columns.into_iter().map(|pk_col| {
522+
FieldSchema::new(&pk_col.schema.name, pk_col.schema.value_type)
523+
}))
524+
.chain(table_schema.value_columns.into_iter().map(|value_col| {
525+
FieldSchema::new(&value_col.schema.name, value_col.schema.value_type)
526+
}))
527+
.collect(),
528+
),
529+
description: None,
530+
},
567531
)))
568532
}
569533

0 commit comments

Comments
 (0)