Skip to content

Commit 6ac8f07

Browse files
committed
simplify key decoding logic
1 parent 1d82fde commit 6ac8f07

File tree

1 file changed

+11
-26
lines changed

1 file changed

+11
-26
lines changed

src/ops/sources/postgres.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -205,32 +205,17 @@ impl SourceExecutor for Executor {
205205

206206
let mut rows = sqlx::query(&query).fetch(&self.db_pool);
207207
while let Some(row) = rows.try_next().await? {
208-
// Handle both single and composite primary keys
209-
let key = if self.table_schema.primary_key_columns.len() == 1 {
210-
// Single primary key - extract directly
211-
let pk_col = &self.table_schema.primary_key_columns[0];
212-
let v = convert_pg_value_to_value(&row, pk_col, 0)?;
213-
if v.is_null() {
214-
Err(anyhow::anyhow!(
215-
"Primary key value is NULL for column `{}`",
216-
pk_col.name
217-
))?;
218-
}
219-
v.into_key()?
220-
} else {
221-
// Composite primary key - combine each part
222-
let parts = self
223-
.table_schema
224-
.primary_key_columns
225-
.iter()
226-
.enumerate()
227-
.map(|(i, pk_col)| convert_pg_value_to_value(&row, pk_col, i))
228-
.collect::<Result<Vec<_>>>()?;
229-
if parts.iter().any(|v| v.is_null()) {
230-
Err(anyhow::anyhow!("Composite primary key contains NULL component"))?;
231-
}
232-
KeyValue::from_values(parts.iter())?
233-
};
208+
let parts = self
209+
.table_schema
210+
.primary_key_columns
211+
.iter()
212+
.enumerate()
213+
.map(|(i, pk_col)| convert_pg_value_to_value(&row, pk_col, i))
214+
.collect::<Result<Vec<_>>>()?;
215+
if parts.iter().any(|v| v.is_null()) {
216+
Err(anyhow::anyhow!("Composite primary key contains NULL component"))?;
217+
}
218+
let key = KeyValue::from_values(parts.iter())?;
234219

235220
yield vec![PartialSourceRowMetadata {
236221
key,

0 commit comments

Comments
 (0)