Commit e7a1b20

fix: fix list() interface and use Postgres instead of PostgresDb
1 parent f4b23f3 commit e7a1b20

File tree

6 files changed: 22 additions & 24 deletions


examples/postgres_embedding/README.md

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ The example uses these environment variables to configure the PostgreSQL source:
 
 The example demonstrates a simple flow:
 
-1. **Read from Source**: Uses `cocoindex.sources.PostgresDb` to read from your existing table
+1. **Read from Source**: Uses `cocoindex.sources.Postgres` to read from your existing table
 2. **Generate Embeddings**: Processes text and creates embeddings using SentenceTransformers
 3. **Store Embeddings**: Exports to the CocoIndex database with automatic table creation
 4. **Search**: Provides interactive semantic search over the stored embeddings

examples/postgres_embedding/main.py

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ def postgres_embedding_flow(
         postgres_source_kwargs["ordinal_column"] = ordinal_column
 
     data_scope["documents"] = flow_builder.add_source(
-        cocoindex.sources.PostgresDb(**postgres_source_kwargs)
+        cocoindex.sources.Postgres(**postgres_source_kwargs)
     )
 
     document_embeddings = data_scope.add_collector()

python/cocoindex/sources.py

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ class AzureBlob(op.SourceSpec):
     account_access_key: TransientAuthEntryReference[str] | None = None
 
 
-class PostgresDb(op.SourceSpec):
+class Postgres(op.SourceSpec):
     """Import data from a PostgreSQL table."""
 
     _op_category = op.OpCategory.SOURCE

src/ops/registration.rs

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
     sources::google_drive::Factory.register(registry)?;
     sources::amazon_s3::Factory.register(registry)?;
     sources::azure_blob::Factory.register(registry)?;
-    sources::postgres_db::Factory.register(registry)?;
+    sources::postgres::Factory.register(registry)?;
 
     functions::parse_json::Factory.register(registry)?;
     functions::split_recursively::register(registry)?;

src/ops/sources/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -4,4 +4,4 @@ pub mod amazon_s3;
 pub mod azure_blob;
 pub mod google_drive;
 pub mod local_file;
-pub mod postgres_db;
+pub mod postgres;

src/ops/sources/postgres_db.rs renamed to src/ops/sources/postgres.rs

Lines changed: 17 additions & 19 deletions
@@ -1,15 +1,8 @@
-use crate::fields_value;
-use async_stream::try_stream;
-use futures::TryStreamExt;
-use log::info;
-use std::sync::Arc;
-
-use crate::base::spec;
 use crate::ops::sdk::*;
+
+use crate::fields_value;
 use crate::settings::DatabaseConnectionSpec;
-use async_trait::async_trait;
-use sqlx::Column;
-use sqlx::{PgPool, Row};
+use sqlx::{Column, PgPool, Row};
 
 #[derive(Debug, Deserialize)]
 pub struct Spec {
@@ -227,11 +220,11 @@ fn convert_pg_value_to_json(
 
 #[async_trait]
 impl SourceExecutor for Executor {
-    fn list<'a>(
-        &'a self,
-        _options: &'a SourceExecutorListOptions,
-    ) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
-        try_stream! {
+    async fn list(
+        &self,
+        _options: &SourceExecutorListOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
+        let stream = try_stream! {
             // Build query to select primary key columns
             let pk_columns: Vec<String> = self.table_schema.primary_key_columns
                 .iter()
@@ -303,6 +296,7 @@ impl SourceExecutor for Executor {
                     key,
                     key_aux_info: serde_json::Value::Null,
                     ordinal: Some(Ordinal::unavailable()),
+                    content_version_fp: None,
                 });
 
                 if batch.len() >= batch_size {
@@ -314,8 +308,8 @@ impl SourceExecutor for Executor {
             if !batch.is_empty() {
                 yield batch;
             }
-        }
-        .boxed()
+        };
+        Ok(stream.boxed())
     }
 
     async fn get_value(
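Taken together, the hunks above are the `list()` interface fix named in the commit title: `list` changes from a synchronous method returning a `BoxStream` directly into an `async fn` that returns the stream wrapped in a `Result`, and each listed row now carries a `content_version_fp` field, which this source leaves as `None`. The sketch below shows that shape in isolation. It uses simplified stand-in types, an in-memory executor, and an anyhow-style `Result` alias, all of which are assumptions for illustration rather than cocoindex's real definitions.

// Sketch only: simplified stand-ins for cocoindex's internal types; the real
// definitions live in the crate's ops SDK. Field types here are assumptions.
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt};

type Result<T> = anyhow::Result<T>;

pub struct SourceExecutorListOptions; // placeholder for the real options type

pub struct PartialSourceRowMetadata {
    pub key: String,                         // simplified key representation
    pub content_version_fp: Option<Vec<u8>>, // the newly threaded-through field
}

#[async_trait]
pub trait SourceExecutor: Send + Sync {
    // New shape: async, and the stream is wrapped in a Result so setup errors
    // surface before any batch is yielded.
    async fn list(
        &self,
        options: &SourceExecutorListOptions,
    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>>;
}

pub struct InMemorySource {
    keys: Vec<String>,
}

#[async_trait]
impl SourceExecutor for InMemorySource {
    async fn list(
        &self,
        _options: &SourceExecutorListOptions,
    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
        // Fallible setup (acquiring a connection, building the query, ...) can
        // use `?` here and report failure through the outer Result.
        let batch: Vec<PartialSourceRowMetadata> = self
            .keys
            .iter()
            .map(|key| PartialSourceRowMetadata {
                key: key.clone(),
                content_version_fp: None, // this toy source has no fingerprint
            })
            .collect();
        // One eager batch; the real executor builds the stream lazily
        // (e.g. with try_stream!) and returns Ok(stream.boxed()).
        let batches: Vec<Result<Vec<PartialSourceRowMetadata>>> = vec![Ok(batch)];
        Ok(futures::stream::iter(batches).boxed())
    }
}

The practical difference is that failures during setup can now be returned from `list()` itself via `?`, instead of having to be smuggled into the first item of the stream.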
@@ -520,7 +514,11 @@ impl SourceExecutor for Executor {
             None
         };
 
-        Ok(PartialSourceRowData { value, ordinal })
+        Ok(PartialSourceRowData {
+            value,
+            ordinal,
+            content_version_fp: None,
+        })
     }
 }
 
@@ -531,7 +529,7 @@ impl SourceFactoryBase for Factory {
     type Spec = Spec;
 
     fn name(&self) -> &str {
-        "PostgresDb"
+        "Postgres"
     }
 
     async fn get_output_schema(
