Skip to content

Commit 0c2bbb9

Browse files
committed
fix: support ordinal correctly and avoid per-row column name search
1 parent 79c9e04 commit 0c2bbb9

File tree

1 file changed

+173
-41
lines changed

1 file changed

+173
-41
lines changed

src/ops/sources/postgres.rs

Lines changed: 173 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::ops::sdk::*;
33
use crate::ops::shared::postgres::{bind_key_field, get_db_pool, key_value_fields_iter};
44
use crate::settings::DatabaseConnectionSpec;
55
use sqlx::postgres::types::PgInterval;
6-
use sqlx::{Column, PgPool, Row};
6+
use sqlx::{PgPool, Row};
77

88
#[derive(Debug, Deserialize)]
99
pub struct Spec {
@@ -21,6 +21,8 @@ pub struct Spec {
2121
struct PostgresTableSchema {
2222
primary_key_columns: Vec<FieldSchema>,
2323
value_columns: Vec<FieldSchema>,
24+
ordinal_field_idx: Option<usize>,
25+
ordinal_field_schema: Option<FieldSchema>,
2426
}
2527

2628
struct Executor {
@@ -58,6 +60,7 @@ async fn fetch_table_schema(
5860
pool: &PgPool,
5961
table_name: &str,
6062
included_columns: &Option<Vec<String>>,
63+
ordinal_column: &Option<String>,
6164
) -> Result<PostgresTableSchema> {
6265
// Query to get column information including primary key status
6366
let query = r#"
@@ -89,6 +92,7 @@ async fn fetch_table_schema(
8992

9093
let mut primary_key_columns = Vec::new();
9194
let mut value_columns = Vec::new();
95+
let mut ordinal_field_schema: Option<FieldSchema> = None;
9296

9397
for row in rows {
9498
let col_name: String = row.try_get::<String, _>("column_name")?;
@@ -103,12 +107,23 @@ async fn fetch_table_schema(
103107
);
104108

105109
if is_primary_key {
106-
primary_key_columns.push(field_schema);
110+
primary_key_columns.push(field_schema.clone());
107111
} else if included_columns
108112
.as_ref()
109113
.map_or(true, |cols| cols.contains(&col_name))
110114
{
111-
value_columns.push(field_schema);
115+
value_columns.push(field_schema.clone());
116+
}
117+
118+
if let Some(ord_col) = ordinal_column {
119+
if &col_name == ord_col {
120+
ordinal_field_schema = Some(field_schema.clone());
121+
if is_primary_key {
122+
api_bail!(
123+
"`ordinal_column` cannot be a primary key column. It must be one of the value columns."
124+
);
125+
}
126+
}
112127
}
113128
}
114129

@@ -119,9 +134,28 @@ async fn fetch_table_schema(
119134
api_bail!("Table `{table_name}` has no primary key defined");
120135
}
121136

137+
// If ordinal column specified, validate and compute its index within value columns if present
138+
let ordinal_field_idx = match ordinal_column {
139+
Some(ord) => {
140+
let schema = ordinal_field_schema
141+
.as_ref()
142+
.ok_or_else(|| anyhow::anyhow!("`ordinal_column` `{}` not found in table", ord))?;
143+
if !is_supported_ordinal_type(&schema.value_type.typ) {
144+
api_bail!(
145+
"Unsupported `ordinal_column` type for `{}`. Supported types: Int64, LocalDateTime, OffsetDateTime",
146+
schema.name
147+
);
148+
}
149+
value_columns.iter().position(|c| c.name == *ord)
150+
}
151+
None => None,
152+
};
153+
122154
Ok(PostgresTableSchema {
123155
primary_key_columns,
124156
value_columns,
157+
ordinal_field_idx,
158+
ordinal_field_schema,
125159
})
126160
}
127161

@@ -176,20 +210,68 @@ fn convert_pg_value_to_value(
176210
Ok(value)
177211
}
178212

213+
/// Convert a CocoIndex `Value` into an `Ordinal` if supported.
214+
/// Supported inputs:
215+
/// - Basic(Int64): interpreted directly as microseconds
216+
/// - Basic(LocalDateTime): converted to UTC micros
217+
/// - Basic(OffsetDateTime): micros since epoch
218+
/// Otherwise returns unavailable.
219+
fn is_supported_ordinal_type(t: &ValueType) -> bool {
220+
matches!(
221+
t,
222+
ValueType::Basic(BasicValueType::Int64)
223+
| ValueType::Basic(BasicValueType::LocalDateTime)
224+
| ValueType::Basic(BasicValueType::OffsetDateTime)
225+
)
226+
}
227+
228+
fn value_to_ordinal(value: &Value) -> Ordinal {
229+
match value {
230+
Value::Null => Ordinal::unavailable(),
231+
Value::Basic(basic) => match basic {
232+
crate::base::value::BasicValue::Int64(v) => Ordinal(Some(*v)),
233+
crate::base::value::BasicValue::LocalDateTime(dt) => {
234+
Ordinal(Some(dt.and_utc().timestamp_micros()))
235+
}
236+
crate::base::value::BasicValue::OffsetDateTime(dt) => {
237+
Ordinal(Some(dt.timestamp_micros()))
238+
}
239+
_ => Ordinal::unavailable(),
240+
},
241+
_ => Ordinal::unavailable(),
242+
}
243+
}
244+
179245
#[async_trait]
180246
impl SourceExecutor for Executor {
181247
async fn list(
182248
&self,
183-
_options: &SourceExecutorListOptions,
249+
options: &SourceExecutorListOptions,
184250
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
185251
let stream = try_stream! {
186252
// Build query to select primary key columns
187-
let pk_columns: Vec<String> = self.table_schema.primary_key_columns
253+
let pk_columns: Vec<String> = self
254+
.table_schema
255+
.primary_key_columns
188256
.iter()
189257
.map(|col| format!("\"{}\"", col.name))
190258
.collect();
191259

192-
let mut query = format!("SELECT {} FROM \"{}\"", pk_columns.join(", "), self.table_name);
260+
let mut select_parts = pk_columns.clone();
261+
let mut ordinal_col_index: Option<usize> = None;
262+
if options.include_ordinal
263+
&& let Some(ref ordinal_col) = self.ordinal_column
264+
{
265+
// Only append ordinal column if present.
266+
select_parts.push(format!("\"{}\"", ordinal_col));
267+
ordinal_col_index = Some(select_parts.len() - 1);
268+
}
269+
270+
let mut query = format!(
271+
"SELECT {} FROM \"{}\"",
272+
select_parts.join(", "),
273+
self.table_name
274+
);
193275

194276
// Add ordering by ordinal column if specified
195277
if let Some(ref ordinal_col) = self.ordinal_column {
@@ -206,14 +288,31 @@ impl SourceExecutor for Executor {
206288
.map(|(i, pk_col)| convert_pg_value_to_value(&row, pk_col, i))
207289
.collect::<Result<Vec<_>>>()?;
208290
if parts.iter().any(|v| v.is_null()) {
209-
Err(anyhow::anyhow!("Composite primary key contains NULL component"))?;
291+
Err(anyhow::anyhow!(
292+
"Composite primary key contains NULL component"
293+
))?;
210294
}
211295
let key = KeyValue::from_values(parts.iter())?;
212296

297+
// Compute ordinal if requested
298+
let ordinal = if options.include_ordinal {
299+
if let (Some(col_idx), Some(ord_schema)) = (
300+
ordinal_col_index,
301+
self.table_schema.ordinal_field_schema.as_ref(),
302+
) {
303+
let val = convert_pg_value_to_value(&row, ord_schema, col_idx)?;
304+
Some(value_to_ordinal(&val))
305+
} else {
306+
Some(Ordinal::unavailable())
307+
}
308+
} else {
309+
None
310+
};
311+
213312
yield vec![PartialSourceRowMetadata {
214313
key,
215314
key_aux_info: serde_json::Value::Null,
216-
ordinal: Some(Ordinal::unavailable()),
315+
ordinal,
217316
content_version_fp: None,
218317
}];
219318
}
@@ -228,18 +327,33 @@ impl SourceExecutor for Executor {
228327
options: &SourceExecutorGetOptions,
229328
) -> Result<PartialSourceRowData> {
230329
let mut qb = sqlx::QueryBuilder::new("SELECT ");
231-
if self.table_schema.value_columns.is_empty() {
232-
qb.push("1");
233-
} else {
234-
qb.push(
330+
let mut selected_columns: Vec<String> = Vec::new();
331+
332+
if options.include_value {
333+
selected_columns.extend(
235334
self.table_schema
236335
.value_columns
237336
.iter()
238-
.map(|col| format!("\"{}\"", col.name))
239-
.collect::<Vec<String>>()
240-
.join(", "),
337+
.map(|col| format!("\"{}\"", col.name)),
241338
);
242339
}
340+
341+
let ordinal_col_name = self.ordinal_column.as_ref();
342+
if options.include_ordinal {
343+
if let Some(ord_col) = ordinal_col_name {
344+
// Append ordinal column if not already provided by included value columns,
345+
// or when value columns are not selected at all
346+
if self.table_schema.ordinal_field_idx.is_none() || !options.include_value {
347+
selected_columns.push(format!("\"{}\"", ord_col));
348+
}
349+
}
350+
}
351+
352+
if selected_columns.is_empty() {
353+
qb.push("1");
354+
} else {
355+
qb.push(selected_columns.join(", "));
356+
}
243357
qb.push(" FROM \"");
244358
qb.push(&self.table_name);
245359
qb.push("\" WHERE ");
@@ -271,30 +385,12 @@ impl SourceExecutor for Executor {
271385

272386
let row_opt = qb.build().fetch_optional(&self.db_pool).await?;
273387

274-
let ordinal = if options.include_ordinal {
275-
Some(Ordinal::unavailable())
276-
} else {
277-
None
278-
};
279-
280388
let value = if options.include_value {
281-
match row_opt {
389+
match &row_opt {
282390
Some(row) => {
283-
let mut fields = Vec::new();
284-
for value_col in &self.table_schema.value_columns {
285-
// Find the column by name in the result row to get the correct index
286-
let col_index = row
287-
.columns()
288-
.iter()
289-
.position(|col| col.name() == value_col.name.as_str())
290-
.ok_or_else(|| {
291-
anyhow::anyhow!(
292-
"Column '{}' not found in result row",
293-
value_col.name
294-
)
295-
})?;
296-
297-
let value = convert_pg_value_to_value(&row, value_col, col_index)?;
391+
let mut fields = Vec::with_capacity(self.table_schema.value_columns.len());
392+
for (i, value_col) in self.table_schema.value_columns.iter().enumerate() {
393+
let value = convert_pg_value_to_value(&row, value_col, i)?;
298394
fields.push(value);
299395
}
300396
Some(SourceValue::Existence(FieldValues { fields }))
@@ -305,6 +401,32 @@ impl SourceExecutor for Executor {
305401
None
306402
};
307403

404+
let ordinal = if options.include_ordinal {
405+
match (
406+
&row_opt,
407+
self.table_schema.ordinal_field_schema.as_ref(),
408+
ordinal_col_name,
409+
) {
410+
(Some(row), Some(ord_schema), Some(_ord_col_name)) => {
411+
// Determine index without scanning the row metadata.
412+
let col_index = if options.include_value {
413+
match self.table_schema.ordinal_field_idx {
414+
Some(idx) => idx,
415+
None => self.table_schema.value_columns.len(),
416+
}
417+
} else {
418+
// Only ordinal was selected
419+
0
420+
};
421+
let val = convert_pg_value_to_value(&row, ord_schema, col_index)?;
422+
Some(value_to_ordinal(&val))
423+
}
424+
_ => Some(Ordinal::unavailable()),
425+
}
426+
} else {
427+
None
428+
};
429+
308430
Ok(PartialSourceRowData {
309431
value,
310432
ordinal,
@@ -330,8 +452,13 @@ impl SourceFactoryBase for Factory {
330452
) -> Result<EnrichedValueType> {
331453
// Fetch table schema to build dynamic output schema
332454
let db_pool = get_db_pool(spec.database.as_ref(), &context.auth_registry).await?;
333-
let table_schema =
334-
fetch_table_schema(&db_pool, &spec.table_name, &spec.included_columns).await?;
455+
let table_schema = fetch_table_schema(
456+
&db_pool,
457+
&spec.table_name,
458+
&spec.included_columns,
459+
&spec.ordinal_column,
460+
)
461+
.await?;
335462

336463
let mut struct_schema = StructSchema::default();
337464
let mut schema_builder = StructSchemaBuilder::new(&mut struct_schema);
@@ -387,8 +514,13 @@ impl SourceFactoryBase for Factory {
387514
let db_pool = get_db_pool(spec.database.as_ref(), &context.auth_registry).await?;
388515

389516
// Fetch table schema for dynamic type handling
390-
let table_schema =
391-
fetch_table_schema(&db_pool, &spec.table_name, &spec.included_columns).await?;
517+
let table_schema = fetch_table_schema(
518+
&db_pool,
519+
&spec.table_name,
520+
&spec.included_columns,
521+
&spec.ordinal_column,
522+
)
523+
.await?;
392524

393525
let executor = Executor {
394526
db_pool,

0 commit comments

Comments
 (0)