Skip to content

Commit 1012af2

Browse files
committed
directly keep cocoindex types in Executor
1 parent c3912dc commit 1012af2

File tree

2 files changed

+27
-41
lines changed

2 files changed

+27
-41
lines changed

src/ops/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ pub mod registry;
44
// All operations
55
mod factory_bases;
66
mod functions;
7+
mod shared;
78
mod sources;
89
mod targets;
9-
mod shared;
1010

1111
mod registration;
1212
pub(crate) use registration::*;

src/ops/sources/postgres.rs

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct Spec {
2020
#[derive(Debug, Clone)]
2121
struct ColumnInfo {
2222
name: String,
23-
data_type: String,
23+
value_type: BasicValueType,
2424
is_nullable: bool,
2525
is_primary_key: bool,
2626
}
@@ -99,34 +99,34 @@ async fn fetch_table_schema(
9999
let mut value_columns = Vec::new();
100100

101101
for row in rows {
102+
let col_name: String = row.try_get::<String, _>("column_name")?;
103+
let pg_type_str: String = row.try_get::<String, _>("data_type")?;
104+
let is_nullable: bool = row.try_get::<String, _>("is_nullable")? == "YES";
105+
let is_primary_key: bool = row.try_get::<bool, _>("is_primary_key")?;
106+
102107
let column_info = ColumnInfo {
103-
name: row.try_get::<String, _>("column_name")?,
104-
data_type: row.try_get::<String, _>("data_type")?,
105-
is_nullable: row.try_get::<String, _>("is_nullable")? == "YES",
106-
is_primary_key: row.try_get::<bool, _>("is_primary_key")?,
108+
name: col_name,
109+
value_type: map_postgres_type_to_cocoindex(&pg_type_str),
110+
is_nullable,
111+
is_primary_key,
107112
};
108113

109114
// Always include primary key columns
110115
if column_info.is_primary_key {
111116
primary_key_columns.push(column_info.clone());
112117
} else {
113118
// For value columns, check if filtering is enabled
114-
let should_include = match included_columns {
115-
Some(included_list) => included_list.contains(&column_info.name),
116-
None => true, // Include all columns if no filter specified
117-
};
118-
119-
if should_include {
119+
if included_columns
120+
.as_ref()
121+
.map_or(true, |cols| cols.contains(&column_info.name))
122+
{
120123
value_columns.push(column_info);
121124
}
122125
}
123126
}
124127

125128
if primary_key_columns.is_empty() {
126-
return Err(anyhow::anyhow!(
127-
"Table '{}' has no primary key defined",
128-
table_name
129-
));
129+
api_bail!("Table `{table_name}` has no primary key defined");
130130
}
131131

132132
Ok(PostgresTableSchema {
@@ -142,7 +142,7 @@ fn convert_pg_value_to_json(
142142
col_index: usize,
143143
) -> Result<serde_json::Value> {
144144
// Check for null values in a type-agnostic way based on the actual column type
145-
let is_null = match map_postgres_type_to_cocoindex(&column.data_type) {
145+
let is_null = match &column.value_type {
146146
BasicValueType::Uuid => row.try_get::<Option<uuid::Uuid>, _>(col_index)?.is_none(),
147147
BasicValueType::Int64 => row.try_get::<Option<i64>, _>(col_index)?.is_none(),
148148
BasicValueType::Bool => row.try_get::<Option<bool>, _>(col_index)?.is_none(),
@@ -155,7 +155,7 @@ fn convert_pg_value_to_json(
155155
return Ok(serde_json::Value::Null);
156156
}
157157

158-
match map_postgres_type_to_cocoindex(&column.data_type) {
158+
match &column.value_type {
159159
BasicValueType::Bytes => {
160160
let bytes: Vec<u8> = row.try_get(col_index)?;
161161
Ok(serde_json::to_value(bytes)?)
@@ -237,20 +237,15 @@ impl SourceExecutor for Executor {
237237
query.push_str(&format!(" ORDER BY \"{}\"", ordinal_col));
238238
}
239239

240-
info!("Executing query: {}", query);
241-
242240
let mut rows = sqlx::query(&query).fetch(&self.db_pool);
243-
let mut batch: Vec<PartialSourceRowMetadata> = Vec::new();
244-
let batch_size = 1000; // Process in batches
245-
246241
while let Some(row) = rows.try_next().await? {
247242
// Handle both single and composite primary keys
248243
let key = if self.table_schema.primary_key_columns.len() == 1 {
249244
// Single primary key - extract directly
250245
let pk_col = &self.table_schema.primary_key_columns[0];
251246
let json_value = convert_pg_value_to_json(&row, pk_col, 0)?;
252247

253-
match map_postgres_type_to_cocoindex(&pk_col.data_type) {
248+
match &pk_col.value_type {
254249
BasicValueType::Str => KeyValue::Str(json_value.as_str().unwrap_or("").to_string().into()),
255250
BasicValueType::Int64 => KeyValue::Int64(json_value.as_i64().unwrap_or(0)),
256251
BasicValueType::Uuid => {
@@ -269,7 +264,7 @@ impl SourceExecutor for Executor {
269264
let json_value = convert_pg_value_to_json(&row, pk_col, i)?;
270265

271266
// Convert each primary key column to appropriate KeyValue type
272-
let key_value = match map_postgres_type_to_cocoindex(&pk_col.data_type) {
267+
let key_value = match &pk_col.value_type {
273268
BasicValueType::Str => KeyValue::Str(json_value.as_str().unwrap_or("").to_string().into()),
274269
BasicValueType::Int64 => KeyValue::Int64(json_value.as_i64().unwrap_or(0)),
275270
BasicValueType::Uuid => {
@@ -291,21 +286,12 @@ impl SourceExecutor for Executor {
291286
KeyValue::Struct(key_values)
292287
};
293288

294-
batch.push(PartialSourceRowMetadata {
289+
yield vec![PartialSourceRowMetadata {
295290
key,
296291
key_aux_info: serde_json::Value::Null,
297292
ordinal: Some(Ordinal::unavailable()),
298293
content_version_fp: None,
299-
});
300-
301-
if batch.len() >= batch_size {
302-
yield batch;
303-
batch = Vec::new();
304-
}
305-
}
306-
307-
if !batch.is_empty() {
308-
yield batch;
294+
}];
309295
}
310296
};
311297
Ok(stream.boxed())
@@ -326,7 +312,7 @@ impl SourceExecutor for Executor {
326312
.collect();
327313
let simple_query = if self.table_schema.primary_key_columns.len() == 1 {
328314
let pk_col = &self.table_schema.primary_key_columns[0];
329-
let key_condition = match map_postgres_type_to_cocoindex(&pk_col.data_type) {
315+
let key_condition = match &pk_col.value_type {
330316
BasicValueType::Uuid => {
331317
// For UUID keys, extract the UUID value directly
332318
let uuid_val = match key {
@@ -383,7 +369,7 @@ impl SourceExecutor for Executor {
383369
.zip(key_values.iter())
384370
.enumerate()
385371
{
386-
let condition = match map_postgres_type_to_cocoindex(&pk_col.data_type) {
372+
let condition = match &pk_col.value_type {
387373
BasicValueType::Uuid => {
388374
let uuid_val = match key_value {
389375
KeyValue::Uuid(uuid) => uuid,
@@ -550,7 +536,7 @@ impl SourceFactoryBase for Factory {
550536
if table_schema.primary_key_columns.len() == 1 {
551537
// Single primary key - first field is the key
552538
let pk_col = &table_schema.primary_key_columns[0];
553-
let cocoindex_type = map_postgres_type_to_cocoindex(&pk_col.data_type);
539+
let cocoindex_type = pk_col.value_type.clone();
554540
let field_type = if pk_col.is_nullable {
555541
make_output_type(cocoindex_type).with_nullable(true)
556542
} else {
@@ -564,7 +550,7 @@ impl SourceFactoryBase for Factory {
564550
let mut key_builder = StructSchemaBuilder::new(&mut key_struct_schema);
565551

566552
for pk_col in &table_schema.primary_key_columns {
567-
let cocoindex_type = map_postgres_type_to_cocoindex(&pk_col.data_type);
553+
let cocoindex_type = pk_col.value_type.clone();
568554
let field_type = if pk_col.is_nullable {
569555
make_output_type(cocoindex_type).with_nullable(true)
570556
} else {
@@ -583,7 +569,7 @@ impl SourceFactoryBase for Factory {
583569

584570
// Add value columns as fields (these match what get_value() returns)
585571
for value_col in &table_schema.value_columns {
586-
let cocoindex_type = map_postgres_type_to_cocoindex(&value_col.data_type);
572+
let cocoindex_type = value_col.value_type.clone();
587573
let field_type = if value_col.is_nullable {
588574
make_output_type(cocoindex_type).with_nullable(true)
589575
} else {

0 commit comments

Comments
 (0)