Skip to content

Commit c3912dc

Browse files
committed
refactor: create a consolidated get_db_pool() shared by source/target
1 parent e7a1b20 commit c3912dc

File tree

6 files changed

+27
-41
lines changed

6 files changed

+27
-41
lines changed

src/ops/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod factory_bases;
66
mod functions;
77
mod sources;
88
mod targets;
9+
mod shared;
910

1011
mod registration;
1112
pub(crate) use registration::*;

src/ops/sdk.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ pub(crate) use crate::prelude::*;
22

33
use crate::builder::plan::AnalyzedFieldReference;
44
use crate::builder::plan::AnalyzedLocalFieldReference;
5-
use std::collections::BTreeMap;
65

76
pub use super::factory_bases::*;
87
pub use super::interface::*;

src/ops/shared/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod postgres;

src/ops/shared/postgres.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use crate::prelude::*;
2+
3+
use crate::settings::DatabaseConnectionSpec;
4+
use sqlx::PgPool;
5+
6+
pub async fn get_db_pool(
7+
db_ref: Option<&spec::AuthEntryReference<DatabaseConnectionSpec>>,
8+
auth_registry: &AuthRegistry,
9+
) -> Result<PgPool> {
10+
let lib_context = get_lib_context()?;
11+
let db_conn_spec = db_ref
12+
.as_ref()
13+
.map(|db_ref| auth_registry.get(db_ref))
14+
.transpose()?;
15+
let db_pool = match db_conn_spec {
16+
Some(db_conn_spec) => lib_context.db_pools.get_pool(&db_conn_spec).await?,
17+
None => lib_context.require_builtin_db_pool()?.clone(),
18+
};
19+
Ok(db_pool)
20+
}

src/ops/sources/postgres.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::ops::sdk::*;
22

33
use crate::fields_value;
4+
use crate::ops::shared::postgres::get_db_pool;
45
use crate::settings::DatabaseConnectionSpec;
56
use sqlx::{Column, PgPool, Row};
67

@@ -33,8 +34,6 @@ struct PostgresTableSchema {
3334
struct Executor {
3435
db_pool: PgPool,
3536
table_name: String,
36-
#[allow(dead_code)] // Used indirectly during schema discovery, kept for completeness
37-
included_columns: Option<Vec<String>>,
3837
ordinal_column: Option<String>,
3938
table_schema: PostgresTableSchema,
4039
}
@@ -538,7 +537,7 @@ impl SourceFactoryBase for Factory {
538537
context: &FlowInstanceContext,
539538
) -> Result<EnrichedValueType> {
540539
// Fetch table schema to build dynamic output schema
541-
let db_pool = create_db_pool(spec.database.as_ref(), &context.auth_registry).await?;
540+
let db_pool = get_db_pool(spec.database.as_ref(), &context.auth_registry).await?;
542541
let table_schema =
543542
fetch_table_schema(&db_pool, &spec.table_name, &spec.included_columns).await?;
544543

@@ -613,7 +612,7 @@ impl SourceFactoryBase for Factory {
613612
spec: Spec,
614613
context: Arc<FlowInstanceContext>,
615614
) -> Result<Box<dyn SourceExecutor>> {
616-
let db_pool = create_db_pool(spec.database.as_ref(), &context.auth_registry).await?;
615+
let db_pool = get_db_pool(spec.database.as_ref(), &context.auth_registry).await?;
617616

618617
// Fetch table schema for dynamic type handling
619618
let table_schema =
@@ -622,7 +621,6 @@ impl SourceFactoryBase for Factory {
622621
let executor = Executor {
623622
db_pool,
624623
table_name: spec.table_name.clone(),
625-
included_columns: spec.included_columns.clone(),
626624
ordinal_column: spec.ordinal_column.clone(),
627625
table_schema,
628626
};
@@ -643,20 +641,3 @@ impl SourceFactoryBase for Factory {
643641
Ok(Box::new(executor))
644642
}
645643
}
646-
647-
/// Create a PostgreSQL connection pool (reused from postgres target)
648-
async fn create_db_pool(
649-
db_ref: Option<&spec::AuthEntryReference<DatabaseConnectionSpec>>,
650-
auth_registry: &AuthRegistry,
651-
) -> Result<PgPool> {
652-
let lib_context = crate::lib_context::get_lib_context()?;
653-
let db_conn_spec = db_ref
654-
.as_ref()
655-
.map(|db_ref| auth_registry.get(db_ref))
656-
.transpose()?;
657-
let db_pool = match db_conn_spec {
658-
Some(db_conn_spec) => lib_context.db_pools.get_pool(&db_conn_spec).await?,
659-
None => lib_context.require_builtin_db_pool()?.clone(),
660-
};
661-
Ok(db_pool)
662-
}

src/ops/targets/postgres.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use crate::prelude::*;
1+
use crate::ops::sdk::*;
22

33
use super::shared::table_columns::{
44
TableColumnsSchema, TableMainSetupAction, TableUpsertionAction, check_table_compatibility,
55
};
66
use crate::base::spec::{self, *};
7-
use crate::ops::sdk::*;
7+
use crate::ops::shared::postgres::get_db_pool;
88
use crate::settings::DatabaseConnectionSpec;
99
use async_trait::async_trait;
1010
use indexmap::{IndexMap, IndexSet};
@@ -634,22 +634,6 @@ impl SetupChange {
634634
}
635635
}
636636

637-
async fn get_db_pool(
638-
db_ref: Option<&spec::AuthEntryReference<DatabaseConnectionSpec>>,
639-
auth_registry: &AuthRegistry,
640-
) -> Result<PgPool> {
641-
let lib_context = get_lib_context()?;
642-
let db_conn_spec = db_ref
643-
.as_ref()
644-
.map(|db_ref| auth_registry.get(db_ref))
645-
.transpose()?;
646-
let db_pool = match db_conn_spec {
647-
Some(db_conn_spec) => lib_context.db_pools.get_pool(&db_conn_spec).await?,
648-
None => lib_context.require_builtin_db_pool()?.clone(),
649-
};
650-
Ok(db_pool)
651-
}
652-
653637
#[async_trait]
654638
impl TargetFactoryBase for Factory {
655639
type Spec = Spec;

0 commit comments

Comments
 (0)