Skip to content

Commit 0f3486c

Browse files
committed
feat: postgres source directly return values in list()
1 parent a77abab commit 0f3486c

File tree

1 file changed

+93
-102
lines changed

1 file changed

+93
-102
lines changed

src/ops/sources/postgres.rs

Lines changed: 93 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,80 @@ struct Executor {
3939
table_schema: PostgresTableSchema,
4040
}
4141

42+
impl Executor {
43+
/// Append value and ordinal columns to the provided columns vector.
44+
/// Returns the optional index of the ordinal column in the final selection.
45+
fn build_selected_columns(
46+
&self,
47+
columns: &mut Vec<String>,
48+
options: &SourceExecutorReadOptions,
49+
) -> Option<usize> {
50+
let base_len = columns.len();
51+
if options.include_value {
52+
columns.extend(
53+
self.table_schema
54+
.value_columns
55+
.iter()
56+
.map(|col| format!("\"{}\"", col.schema.name)),
57+
);
58+
}
59+
60+
if options.include_ordinal {
61+
if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
62+
if options.include_value {
63+
if let Some(val_idx) = self.table_schema.ordinal_field_idx {
64+
return Some(base_len + val_idx);
65+
}
66+
}
67+
columns.push(format!("\"{}\"", ord_schema.schema.name));
68+
return Some(columns.len() - 1);
69+
}
70+
}
71+
72+
None
73+
}
74+
75+
/// Decode all value columns from a row, starting at the given index offset.
76+
fn decode_row_data(
77+
&self,
78+
row: &sqlx::postgres::PgRow,
79+
options: &SourceExecutorReadOptions,
80+
ordinal_col_index: Option<usize>,
81+
value_start_idx: usize,
82+
) -> Result<PartialSourceRowData> {
83+
let value = if options.include_value {
84+
let mut fields = Vec::with_capacity(self.table_schema.value_columns.len());
85+
for (i, info) in self.table_schema.value_columns.iter().enumerate() {
86+
let value = (info.decoder)(row, value_start_idx + i)?;
87+
fields.push(value);
88+
}
89+
Some(SourceValue::Existence(FieldValues { fields }))
90+
} else {
91+
None
92+
};
93+
94+
let ordinal = if options.include_ordinal {
95+
if let (Some(idx), Some(ord_schema)) = (
96+
ordinal_col_index,
97+
self.table_schema.ordinal_field_schema.as_ref(),
98+
) {
99+
let val = (ord_schema.decoder)(row, idx)?;
100+
Some(value_to_ordinal(&val))
101+
} else {
102+
Some(Ordinal::unavailable())
103+
}
104+
} else {
105+
None
106+
};
107+
108+
Ok(PartialSourceRowData {
109+
value,
110+
ordinal,
111+
content_version_fp: None,
112+
})
113+
}
114+
}
115+
42116
/// Map PostgreSQL data types to CocoIndex BasicValueType and a decoder function
43117
fn map_postgres_type_to_cocoindex_and_decoder(pg_type: &str) -> (BasicValueType, PgValueDecoder) {
44118
match pg_type {
@@ -309,29 +383,18 @@ impl SourceExecutor for Executor {
309383
options: &SourceExecutorReadOptions,
310384
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
311385
let stream = try_stream! {
312-
// Build query to select primary key columns
386+
// Build selection including PKs (for keys), and optionally values and ordinal
313387
let pk_columns: Vec<String> = self
314388
.table_schema
315389
.primary_key_columns
316390
.iter()
317391
.map(|col| format!("\"{}\"", col.schema.name))
318392
.collect();
393+
let pk_count = pk_columns.len();
394+
let mut select_parts = pk_columns;
395+
let ordinal_col_index = self.build_selected_columns(&mut select_parts, options);
319396

320-
let mut select_parts = pk_columns.clone();
321-
let mut ordinal_col_index: Option<usize> = None;
322-
if options.include_ordinal
323-
&& let Some(ord_schema) = &self.table_schema.ordinal_field_schema
324-
{
325-
// Only append ordinal column if present.
326-
select_parts.push(format!("\"{}\"", ord_schema.schema.name));
327-
ordinal_col_index = Some(select_parts.len() - 1);
328-
}
329-
330-
let mut query = format!(
331-
"SELECT {} FROM \"{}\"",
332-
select_parts.join(", "),
333-
self.table_name
334-
);
397+
let mut query = format!("SELECT {} FROM \"{}\"", select_parts.join(", "), self.table_name);
335398

336399
// Add ordering by ordinal column if specified
337400
if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
@@ -340,41 +403,21 @@ impl SourceExecutor for Executor {
340403

341404
let mut rows = sqlx::query(&query).fetch(&self.db_pool);
342405
while let Some(row) = rows.try_next().await? {
343-
let parts = self
344-
.table_schema
345-
.primary_key_columns
406+
// Decode key from PKs (selected first)
407+
let parts = self.table_schema.primary_key_columns
346408
.iter()
347409
.enumerate()
348410
.map(|(i, info)| (info.decoder)(&row, i)?.into_key())
349411
.collect::<Result<Box<[KeyValue]>>>()?;
350412
let key = FullKeyValue(parts);
351413

352-
// Compute ordinal if requested
353-
let ordinal = if options.include_ordinal {
354-
if let (Some(col_idx), Some(_ord_schema)) = (
355-
ordinal_col_index,
356-
self.table_schema.ordinal_field_schema.as_ref(),
357-
) {
358-
let val = match self.table_schema.ordinal_field_idx {
359-
Some(idx) => (self.table_schema.value_columns[idx].decoder)(&row, col_idx)?,
360-
None => (self.table_schema.ordinal_field_schema.as_ref().unwrap().decoder)(&row, col_idx)?,
361-
};
362-
Some(value_to_ordinal(&val))
363-
} else {
364-
Some(Ordinal::unavailable())
365-
}
366-
} else {
367-
None
368-
};
414+
// Decode value and ordinal
415+
let data = self.decode_row_data(&row, options, ordinal_col_index, pk_count)?;
369416

370417
yield vec![PartialSourceRow {
371418
key,
372419
key_aux_info: serde_json::Value::Null,
373-
data: PartialSourceRowData {
374-
ordinal,
375-
content_version_fp: None,
376-
value: None,
377-
},
420+
data,
378421
}];
379422
}
380423
};
@@ -389,25 +432,7 @@ impl SourceExecutor for Executor {
389432
) -> Result<PartialSourceRowData> {
390433
let mut qb = sqlx::QueryBuilder::new("SELECT ");
391434
let mut selected_columns: Vec<String> = Vec::new();
392-
393-
if options.include_value {
394-
selected_columns.extend(
395-
self.table_schema
396-
.value_columns
397-
.iter()
398-
.map(|col| format!("\"{}\"", col.schema.name)),
399-
);
400-
}
401-
402-
if options.include_ordinal {
403-
if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
404-
// Append ordinal column if not already provided by included value columns,
405-
// or when value columns are not selected at all
406-
if self.table_schema.ordinal_field_idx.is_none() || !options.include_value {
407-
selected_columns.push(format!("\"{}\"", ord_schema.schema.name));
408-
}
409-
}
410-
}
435+
let ordinal_col_index = self.build_selected_columns(&mut selected_columns, options);
411436

412437
if selected_columns.is_empty() {
413438
qb.push("1");
@@ -443,50 +468,16 @@ impl SourceExecutor for Executor {
443468
}
444469

445470
let row_opt = qb.build().fetch_optional(&self.db_pool).await?;
446-
447-
let value = if options.include_value {
448-
match &row_opt {
449-
Some(row) => {
450-
let mut fields = Vec::with_capacity(self.table_schema.value_columns.len());
451-
for (i, info) in self.table_schema.value_columns.iter().enumerate() {
452-
let value = (info.decoder)(&row, i)?;
453-
fields.push(value);
454-
}
455-
Some(SourceValue::Existence(FieldValues { fields }))
456-
}
457-
None => Some(SourceValue::NonExistence),
458-
}
459-
} else {
460-
None
461-
};
462-
463-
let ordinal = if options.include_ordinal {
464-
match (&row_opt, &self.table_schema.ordinal_field_schema) {
465-
(Some(row), Some(ord_schema)) => {
466-
// Determine index without scanning the row metadata.
467-
let col_index = if options.include_value {
468-
match self.table_schema.ordinal_field_idx {
469-
Some(idx) => idx,
470-
None => self.table_schema.value_columns.len(),
471-
}
472-
} else {
473-
// Only ordinal was selected
474-
0
475-
};
476-
let val = (ord_schema.decoder)(&row, col_index)?;
477-
Some(value_to_ordinal(&val))
478-
}
479-
_ => Some(Ordinal::unavailable()),
480-
}
481-
} else {
482-
None
471+
let data = match &row_opt {
472+
Some(row) => self.decode_row_data(&row, options, ordinal_col_index, 0)?,
473+
None => PartialSourceRowData {
474+
value: Some(SourceValue::NonExistence),
475+
ordinal: Some(Ordinal::unavailable()),
476+
content_version_fp: None,
477+
},
483478
};
484479

485-
Ok(PartialSourceRowData {
486-
value,
487-
ordinal,
488-
content_version_fp: None,
489-
})
480+
Ok(data)
490481
}
491482
}
492483

0 commit comments

Comments
 (0)