Skip to content

Commit 1d82fde

Browse files
committed
simplify conversion from pg->coco values, no detour
1 parent 1012af2 commit 1d82fde

File tree

1 file changed

+50
-112
lines changed

1 file changed

+50
-112
lines changed

src/ops/sources/postgres.rs

Lines changed: 50 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::ops::sdk::*;
33
use crate::fields_value;
44
use crate::ops::shared::postgres::get_db_pool;
55
use crate::settings::DatabaseConnectionSpec;
6+
use sqlx::postgres::types::PgInterval;
67
use sqlx::{Column, PgPool, Row};
78

89
#[derive(Debug, Deserialize)]
@@ -135,86 +136,51 @@ async fn fetch_table_schema(
135136
})
136137
}
137138

138-
/// Convert a PostgreSQL row value to a serde_json::Value based on the column type
139-
fn convert_pg_value_to_json(
139+
/// Convert a PostgreSQL row value directly into Value (Basic or Null)
140+
fn convert_pg_value_to_value(
140141
row: &sqlx::postgres::PgRow,
141142
column: &ColumnInfo,
142143
col_index: usize,
143-
) -> Result<serde_json::Value> {
144-
// Check for null values in a type-agnostic way based on the actual column type
145-
let is_null = match &column.value_type {
146-
BasicValueType::Uuid => row.try_get::<Option<uuid::Uuid>, _>(col_index)?.is_none(),
147-
BasicValueType::Int64 => row.try_get::<Option<i64>, _>(col_index)?.is_none(),
148-
BasicValueType::Bool => row.try_get::<Option<bool>, _>(col_index)?.is_none(),
149-
BasicValueType::Float32 => row.try_get::<Option<f32>, _>(col_index)?.is_none(),
150-
BasicValueType::Float64 => row.try_get::<Option<f64>, _>(col_index)?.is_none(),
151-
_ => row.try_get::<Option<String>, _>(col_index)?.is_none(), // Default to string check
152-
};
153-
154-
if is_null {
155-
return Ok(serde_json::Value::Null);
156-
}
157-
158-
match &column.value_type {
159-
BasicValueType::Bytes => {
160-
let bytes: Vec<u8> = row.try_get(col_index)?;
161-
Ok(serde_json::to_value(bytes)?)
162-
}
163-
BasicValueType::Str => {
164-
let s: String = row.try_get(col_index)?;
165-
Ok(serde_json::Value::String(s))
166-
}
167-
BasicValueType::Bool => {
168-
let b: bool = row.try_get(col_index)?;
169-
Ok(serde_json::Value::Bool(b))
170-
}
171-
BasicValueType::Int64 => {
172-
let i: i64 = row.try_get(col_index)?;
173-
Ok(serde_json::Value::Number(i.into()))
174-
}
175-
BasicValueType::Float32 => {
176-
let f: f32 = row.try_get(col_index)?;
177-
Ok(serde_json::to_value(f)?)
178-
}
179-
BasicValueType::Float64 => {
180-
let f: f64 = row.try_get(col_index)?;
181-
Ok(serde_json::to_value(f)?)
182-
}
183-
BasicValueType::Uuid => {
184-
let uuid: uuid::Uuid = row.try_get(col_index)?;
185-
Ok(serde_json::Value::String(uuid.to_string()))
144+
) -> Result<Value> {
145+
let value = match &column.value_type {
146+
BasicValueType::Bytes => Value::from(row.try_get::<Option<Vec<u8>>, _>(col_index)?),
147+
BasicValueType::Str => Value::from(row.try_get::<Option<String>, _>(col_index)?),
148+
BasicValueType::Bool => Value::from(row.try_get::<Option<bool>, _>(col_index)?),
149+
BasicValueType::Int64 => Value::from(row.try_get::<Option<i64>, _>(col_index)?),
150+
BasicValueType::Float32 => Value::from(row.try_get::<Option<f32>, _>(col_index)?),
151+
BasicValueType::Float64 => Value::from(row.try_get::<Option<f64>, _>(col_index)?),
152+
BasicValueType::Range => {
153+
Value::from(row.try_get::<Option<serde_json::Value>, _>(col_index)?)
186154
}
155+
BasicValueType::Uuid => Value::from(row.try_get::<Option<uuid::Uuid>, _>(col_index)?),
187156
BasicValueType::Date => {
188-
let date: chrono::NaiveDate = row.try_get(col_index)?;
189-
Ok(serde_json::Value::String(date.to_string()))
157+
Value::from(row.try_get::<Option<chrono::NaiveDate>, _>(col_index)?)
190158
}
191159
BasicValueType::Time => {
192-
let time: chrono::NaiveTime = row.try_get(col_index)?;
193-
Ok(serde_json::Value::String(time.to_string()))
160+
Value::from(row.try_get::<Option<chrono::NaiveTime>, _>(col_index)?)
194161
}
195162
BasicValueType::LocalDateTime => {
196-
let dt: chrono::NaiveDateTime = row.try_get(col_index)?;
197-
Ok(serde_json::Value::String(dt.to_string()))
163+
Value::from(row.try_get::<Option<chrono::NaiveDateTime>, _>(col_index)?)
198164
}
199165
BasicValueType::OffsetDateTime => {
200-
let dt: chrono::DateTime<chrono::Utc> = row.try_get(col_index)?;
201-
Ok(serde_json::Value::String(dt.to_rfc3339()))
166+
Value::from(row.try_get::<Option<chrono::DateTime<chrono::FixedOffset>>, _>(col_index)?)
202167
}
203168
BasicValueType::TimeDelta => {
204-
// PostgreSQL interval to string representation
205-
let interval: String = row.try_get(col_index)?;
206-
Ok(serde_json::Value::String(interval))
169+
let opt_iv = row.try_get::<Option<PgInterval>, _>(col_index)?;
170+
let opt_dur = opt_iv.map(|iv| {
171+
let approx_days = iv.days as i64 + (iv.months as i64) * 30;
172+
chrono::Duration::microseconds(iv.microseconds)
173+
+ chrono::Duration::days(approx_days)
174+
});
175+
Value::from(opt_dur)
207176
}
208177
BasicValueType::Json => {
209-
let json: serde_json::Value = row.try_get(col_index)?;
210-
Ok(json)
178+
Value::from(row.try_get::<Option<serde_json::Value>, _>(col_index)?)
211179
}
212-
_ => {
213-
// Fallback: convert to string
214-
let s: String = row.try_get(col_index)?;
215-
Ok(serde_json::Value::String(s))
216-
}
217-
}
180+
// Fallback: treat as JSON
181+
_ => Value::from(row.try_get::<Option<serde_json::Value>, _>(col_index)?),
182+
};
183+
Ok(value)
218184
}
219185

220186
#[async_trait]
@@ -243,47 +209,27 @@ impl SourceExecutor for Executor {
243209
let key = if self.table_schema.primary_key_columns.len() == 1 {
244210
// Single primary key - extract directly
245211
let pk_col = &self.table_schema.primary_key_columns[0];
246-
let json_value = convert_pg_value_to_json(&row, pk_col, 0)?;
247-
248-
match &pk_col.value_type {
249-
BasicValueType::Str => KeyValue::Str(json_value.as_str().unwrap_or("").to_string().into()),
250-
BasicValueType::Int64 => KeyValue::Int64(json_value.as_i64().unwrap_or(0)),
251-
BasicValueType::Uuid => {
252-
let uuid_str = json_value.as_str().unwrap_or("");
253-
KeyValue::Uuid(uuid::Uuid::parse_str(uuid_str).unwrap_or_default())
254-
},
255-
_ => {
256-
// For other types, convert to string representation
257-
KeyValue::Str(json_value.to_string().into())
258-
}
212+
let v = convert_pg_value_to_value(&row, pk_col, 0)?;
213+
if v.is_null() {
214+
Err(anyhow::anyhow!(
215+
"Primary key value is NULL for column `{}`",
216+
pk_col.name
217+
))?;
259218
}
219+
v.into_key()?
260220
} else {
261-
// Composite primary key - create a struct with individual KeyValue fields
262-
let mut key_values = Vec::new();
263-
for (i, pk_col) in self.table_schema.primary_key_columns.iter().enumerate() {
264-
let json_value = convert_pg_value_to_json(&row, pk_col, i)?;
265-
266-
// Convert each primary key column to appropriate KeyValue type
267-
let key_value = match &pk_col.value_type {
268-
BasicValueType::Str => KeyValue::Str(json_value.as_str().unwrap_or("").to_string().into()),
269-
BasicValueType::Int64 => KeyValue::Int64(json_value.as_i64().unwrap_or(0)),
270-
BasicValueType::Uuid => {
271-
let uuid_str = json_value.as_str().unwrap_or("");
272-
KeyValue::Uuid(uuid::Uuid::parse_str(uuid_str).unwrap_or_default())
273-
},
274-
BasicValueType::Bool => KeyValue::Bool(json_value.as_bool().unwrap_or(false)),
275-
BasicValueType::Date => {
276-
let date_str = json_value.as_str().unwrap_or("");
277-
KeyValue::Date(chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d").unwrap_or_default())
278-
},
279-
_ => {
280-
// For other types, convert to string representation
281-
KeyValue::Str(json_value.to_string().into())
282-
}
283-
};
284-
key_values.push(key_value);
221+
// Composite primary key - combine each part
222+
let parts = self
223+
.table_schema
224+
.primary_key_columns
225+
.iter()
226+
.enumerate()
227+
.map(|(i, pk_col)| convert_pg_value_to_value(&row, pk_col, i))
228+
.collect::<Result<Vec<_>>>()?;
229+
if parts.iter().any(|v| v.is_null()) {
230+
Err(anyhow::anyhow!("Composite primary key contains NULL component"))?;
285231
}
286-
KeyValue::Struct(key_values)
232+
KeyValue::from_values(parts.iter())?
287233
};
288234

289235
yield vec![PartialSourceRowMetadata {
@@ -479,16 +425,8 @@ impl SourceExecutor for Executor {
479425
)
480426
})?;
481427

482-
let json_value = convert_pg_value_to_json(&row, value_col, col_index)?;
483-
484-
// Convert each column value to a field, similar to Google Drive
485-
match json_value {
486-
serde_json::Value::String(s) => fields.push(s.into()),
487-
serde_json::Value::Number(n) => fields.push(n.to_string().into()),
488-
serde_json::Value::Bool(b) => fields.push(b.to_string().into()),
489-
serde_json::Value::Null => fields.push("".to_string().into()),
490-
_ => fields.push(json_value.to_string().into()),
491-
}
428+
let value = convert_pg_value_to_value(&row, value_col, col_index)?;
429+
fields.push(value);
492430
}
493431
Some(SourceValue::Existence(FieldValues { fields }))
494432
}

0 commit comments

Comments
 (0)