Skip to content

Commit 81e5d98

Browse files
felipecrvfa-assistant
authored andcommitted
dbt-adapter: Hide the dependency on try_format_type behind a dyn trait (#5783)
* dbt-adapter: Hide the dependency on try_format_type behind a dyn trait This means concrete adapter implementations don't depend directly on `sdf_frontend` for the type formatting function anymore. - Introduce the `TypeFormatter` trait to dbt-fusion-adapter and provide an implementation for it in dbt-adapter - Inject the `TypeFormatter` from the factory implementation all the way to the `SqlEngine` - Remove `convert_type_inner` and implement `convert_type` fully by delegating to the `TypeFormatter` trait - Introduce the `ReplayAdapter` trait that is implemented by `DbtReplayAdapter` - Change `is_replay() -> bool` to `as_replay() -> Option<&dyn ReplayAdapter>` - Add a `NaiveTypeFormatterImpl` so it can be passed in the construction of `ParseAdapter` * cargo fmt GitOrigin-RevId: 8faadd690d6f037501267bcbab834c15a1bd47f3
1 parent 0dcf5b8 commit 81e5d98

File tree

10 files changed

+150
-15
lines changed

10 files changed

+150
-15
lines changed

crates/dbt-fusion-adapter/src/bridge_adapter.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,8 @@ impl BaseAdapter for BridgeAdapter {
500500
#[tracing::instrument(skip(self, state), level = "trace")]
501501
fn rename_relation(&self, state: &State, args: &[Value]) -> Result<Value, MinijinjaError> {
502502
self.cache_renamed(state, args)?;
503-
if self.typed_adapter.is_replay() {
503+
if self.typed_adapter.as_replay().is_some() {
504+
// TODO: move this logic to the [ReplayAdapter]
504505
let iter = ArgsIter::new(
505506
current_function_name!(),
506507
&["from_relation", "to_relation"],
@@ -714,7 +715,8 @@ impl BaseAdapter for BridgeAdapter {
714715
identifier: &str,
715716
) -> Result<Value, MinijinjaError> {
716717
// Skip cache in replay mode
717-
if !self.typed_adapter.is_replay() {
718+
let is_replay = self.typed_adapter.as_replay().is_some();
719+
if !is_replay {
718720
let temp_relation = relation_object::create_relation(
719721
self.typed_adapter.adapter_type(),
720722
database.to_string(),
@@ -811,7 +813,8 @@ impl BaseAdapter for BridgeAdapter {
811813
let relation = parser.get::<Value>("relation")?;
812814
let relation = downcast_value_to_dyn_base_relation(&relation)?;
813815

814-
if self.typed_adapter.is_replay() {
816+
if self.typed_adapter.as_replay().is_some() {
817+
// TODO: move this logic to the [ReplayAdapter]
815818
return match self.typed_adapter.get_columns_in_relation(state, relation) {
816819
Ok(result) => Ok(Value::from(result)),
817820
Err(e) => Err(MinijinjaError::new(
@@ -899,7 +902,8 @@ impl BaseAdapter for BridgeAdapter {
899902
let schema = parser.get::<String>("schema")?;
900903

901904
// Replay fast-path: consult trace-derived cache if available
902-
if self.typed_adapter.is_replay() {
905+
if self.typed_adapter.as_replay().is_some() {
906+
// TODO: move this logic to the [ReplayAdapter]
903907
if let Some(exists) = self
904908
.typed_adapter
905909
.schema_exists_from_trace(&database, &schema)

crates/dbt-fusion-adapter/src/parse/adapter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::parse::relation::EmptyRelation;
99
use crate::query_comment::QueryCommentConfig;
1010
use crate::relation_object::{RelationObject, create_relation};
1111
use crate::response::AdapterResponse;
12+
use crate::sql_types::TypeFormatter;
1213
use crate::stmt_splitter::NaiveStmtSplitter;
1314
use crate::typed_adapter::TypedBaseAdapter;
1415
use crate::{AdapterResult, SqlEngine};
@@ -98,6 +99,7 @@ impl ParseAdapter {
9899
adapter_type: AdapterType,
99100
config: dbt_serde_yaml::Mapping,
100101
package_quoting: DbtQuoting,
102+
type_formatter: Box<dyn TypeFormatter>,
101103
token: CancellationToken,
102104
) -> Self {
103105
let backend = backend_of(adapter_type);
@@ -114,6 +116,7 @@ impl ParseAdapter {
114116
adapter_factory,
115117
stmt_splitter,
116118
query_comment,
119+
type_formatter,
117120
token.clone(),
118121
);
119122

crates/dbt-fusion-adapter/src/record_and_replay.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::config::AdapterConfig;
33
use crate::errors::AdapterResult;
44
use crate::query_comment::QueryCommentConfig;
55
use crate::sql_engine::SqlEngine;
6+
use crate::sql_types::TypeFormatter;
67
use crate::stmt_splitter::StmtSplitter;
78

89
use adbc_core::error::{Error as AdbcError, Result as AdbcResult, Status as AdbcStatus};
@@ -134,6 +135,10 @@ impl RecordEngine {
134135
self.0.engine.query_comment()
135136
}
136137

138+
pub(crate) fn type_formatter(&self) -> &dyn TypeFormatter {
139+
self.0.engine.type_formatter()
140+
}
141+
137142
pub fn cancellation_token(&self) -> CancellationToken {
138143
self.0.engine.cancellation_token()
139144
}
@@ -368,6 +373,7 @@ struct ReplayEngineInner {
368373
adapter_factory: Arc<dyn AdapterFactory>,
369374
stmt_splitter: Arc<dyn StmtSplitter>,
370375
query_comment: QueryCommentConfig,
376+
type_formatter: Box<dyn TypeFormatter>,
371377
/// Global CLI cancellation token
372378
cancellation_token: CancellationToken,
373379
}
@@ -382,13 +388,15 @@ impl ReplayEngineInner {
382388
pub struct ReplayEngine(Arc<ReplayEngineInner>);
383389

384390
impl ReplayEngine {
391+
#[allow(clippy::too_many_arguments)]
385392
pub fn new(
386393
backend: Backend,
387394
path: PathBuf,
388395
config: AdapterConfig,
389396
adapter_factory: Arc<dyn AdapterFactory>,
390397
stmt_splitter: Arc<dyn StmtSplitter>,
391398
query_comment: QueryCommentConfig,
399+
type_formatter: Box<dyn TypeFormatter>,
392400
token: CancellationToken,
393401
) -> Self {
394402
let inner = ReplayEngineInner {
@@ -398,6 +406,7 @@ impl ReplayEngine {
398406
adapter_factory,
399407
stmt_splitter,
400408
query_comment,
409+
type_formatter,
401410
cancellation_token: token,
402411
};
403412
ReplayEngine(Arc::new(inner))
@@ -428,6 +437,10 @@ impl ReplayEngine {
428437
&self.0.query_comment
429438
}
430439

440+
pub(crate) fn type_formatter(&self) -> &dyn TypeFormatter {
441+
self.0.type_formatter.as_ref()
442+
}
443+
431444
pub fn cancellation_token(&self) -> CancellationToken {
432445
self.0.cancellation_token.clone()
433446
}

crates/dbt-fusion-adapter/src/sql_engine.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::base_adapter::{AdapterFactory, backend_of};
44
use crate::config::AdapterConfig;
55
use crate::errors::{AdapterError, AdapterErrorKind, AdapterResult};
66
use crate::query_comment::{EMPTY_CONFIG, QueryCommentConfig};
7+
use crate::sql_types::{NaiveTypeFormatterImpl, TypeFormatter};
78
use crate::stmt_splitter::StmtSplitter;
89

910
use adbc_core::options::{OptionStatement, OptionValue};
@@ -42,6 +43,13 @@ type Options = Vec<(String, OptionValue)>;
4243
static NAIVE_STMT_SPLITTER: LazyLock<Arc<dyn StmtSplitter>> =
4344
LazyLock::new(|| Arc::new(crate::stmt_splitter::NaiveStmtSplitter));
4445

46+
/// Naive type formatter used in the MockAdapter
47+
///
48+
/// IMPORTANT: not suitable for production use. DEFAULTS TO SNOWFLAKE ALSO.
49+
/// TODO: remove when the full formatter is available to this crate.
50+
static NAIVE_TYPE_FORMATTER: LazyLock<Box<dyn TypeFormatter>> =
51+
LazyLock::new(|| Box::new(NaiveTypeFormatterImpl::new(AdapterType::Snowflake)));
52+
4553
#[derive(Default)]
4654
struct IdentityHasher {
4755
hash: u64,
@@ -96,6 +104,8 @@ pub struct ActualEngine {
96104
splitter: Arc<dyn StmtSplitter>,
97105
/// Query comment config
98106
query_comment: QueryCommentConfig,
107+
/// Type formatter for the dilect this engine is for
108+
pub type_formatter: Box<dyn TypeFormatter>,
99109
/// Global CLI cancellation token
100110
cancellation_token: CancellationToken,
101111
}
@@ -107,6 +117,7 @@ impl ActualEngine {
107117
adapter_factory: Arc<dyn AdapterFactory>,
108118
splitter: Arc<dyn StmtSplitter>,
109119
query_comment: QueryCommentConfig,
120+
type_formatter: Box<dyn TypeFormatter>,
110121
token: CancellationToken,
111122
) -> Self {
112123
let threads = config
@@ -127,7 +138,9 @@ impl ActualEngine {
127138
semaphore: Arc::new(Semaphore::new(permits)),
128139
adapter_factory,
129140
splitter,
141+
type_formatter,
130142
query_comment,
143+
131144
cancellation_token: token,
132145
}
133146
}
@@ -216,6 +229,7 @@ impl SqlEngine {
216229
adapter_factory: Arc<dyn AdapterFactory>,
217230
stmt_splitter: Arc<dyn StmtSplitter>,
218231
query_comment: QueryCommentConfig,
232+
type_formatter: Box<dyn TypeFormatter>,
219233
token: CancellationToken,
220234
) -> Arc<Self> {
221235
let engine = ActualEngine::new(
@@ -224,19 +238,22 @@ impl SqlEngine {
224238
adapter_factory,
225239
stmt_splitter,
226240
query_comment,
241+
type_formatter,
227242
token,
228243
);
229244
Arc::new(SqlEngine::Warehouse(Arc::new(engine)))
230245
}
231246

232247
/// Create a new [`SqlEngine::Replay`] based on the given path and adapter type.
248+
#[allow(clippy::too_many_arguments)]
233249
pub fn new_for_replaying(
234250
backend: Backend,
235251
path: PathBuf,
236252
config: AdapterConfig,
237253
adapter_factory: Arc<dyn AdapterFactory>,
238254
stmt_splitter: Arc<dyn StmtSplitter>,
239255
query_comment: QueryCommentConfig,
256+
type_formatter: Box<dyn TypeFormatter>,
240257
token: CancellationToken,
241258
) -> Arc<Self> {
242259
let engine = ReplayEngine::new(
@@ -246,6 +263,7 @@ impl SqlEngine {
246263
adapter_factory,
247264
stmt_splitter,
248265
query_comment,
266+
type_formatter,
249267
token,
250268
);
251269
Arc::new(SqlEngine::Replay(engine))
@@ -267,6 +285,15 @@ impl SqlEngine {
267285
}
268286
}
269287

288+
pub fn type_formatter(&self) -> &dyn TypeFormatter {
289+
match self {
290+
SqlEngine::Warehouse(engine) => engine.type_formatter.as_ref(),
291+
SqlEngine::Record(engine) => engine.type_formatter(),
292+
SqlEngine::Replay(engine) => engine.type_formatter(),
293+
SqlEngine::Mock(_adapter_type) => NAIVE_TYPE_FORMATTER.as_ref(),
294+
}
295+
}
296+
270297
/// Split SQL statements using the provided dialect
271298
///
272299
/// This method handles the splitting of SQL statements based on the dialect's rules.

crates/dbt-fusion-adapter/src/sql_types.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,59 @@
11
use std::borrow::Cow;
22

3+
use crate::AdapterResult;
4+
use crate::base_adapter::backend_of;
35
use crate::errors::{AdapterError, AdapterErrorKind};
46
use arrow_schema::DataType;
57
use dbt_common::adapter::AdapterType;
8+
use dbt_xdbc::sql::types::SqlType;
9+
10+
pub trait TypeFormatter: Send + Sync {
11+
/// Picks a SQL type for a given Arrow DataType and renders it as SQL.
12+
///
13+
/// The implementation is dialect-specific.
14+
fn format_arrow_type_as_sql(&self, data_type: &DataType, out: &mut String)
15+
-> AdapterResult<()>;
16+
17+
/// Renders a given SqlType as SQL.
18+
///
19+
/// The implementation is dialect-specific.
20+
fn format_sql_type(&self, sql_type: SqlType, out: &mut String) -> AdapterResult<()>;
21+
}
22+
23+
pub struct NaiveTypeFormatterImpl(AdapterType, dbt_xdbc::Backend);
24+
25+
impl NaiveTypeFormatterImpl {
26+
pub fn new(adapter_type: AdapterType) -> Self {
27+
let backend = backend_of(adapter_type);
28+
Self(adapter_type, backend)
29+
}
30+
}
31+
32+
impl TypeFormatter for NaiveTypeFormatterImpl {
33+
fn format_arrow_type_as_sql(
34+
&self,
35+
data_type: &DataType,
36+
out: &mut String,
37+
) -> AdapterResult<()> {
38+
let adapter_type = self.0;
39+
let hint: SqlTypeHint = data_type.try_into()?;
40+
// TODO: handle has_decimal_places correctly
41+
let has_decimal_places = false;
42+
let res = sql_type_hint_to_str(hint, has_decimal_places, adapter_type);
43+
out.push_str(res.as_ref());
44+
Ok(())
45+
}
46+
47+
fn format_sql_type(&self, sql_type: SqlType, out: &mut String) -> AdapterResult<()> {
48+
let backend = self.1;
49+
sql_type.write(backend, out).map_err(|e| {
50+
AdapterError::new(
51+
AdapterErrorKind::NotSupported,
52+
format!("Failed to convert SQL type {sql_type:?}. Error: {e}"),
53+
)
54+
})
55+
}
56+
}
657

758
pub enum SqlTypeHint {
859
Integer,

crates/dbt-fusion-adapter/src/typed_adapter.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,16 @@ pub trait TypedBaseAdapter: fmt::Debug + Send + Sync + AdapterTyping {
364364
}
365365
}
366366

367-
fn convert_type_inner(&self, _state: &State, data_type: &DataType) -> AdapterResult<String>;
368-
369367
/// Convert type.
370368
fn convert_type(
371369
&self,
372370
state: &State,
373371
table: Arc<AgateTable>,
374372
col_idx: i64,
375373
) -> AdapterResult<String> {
374+
// XXX: Core uses the flattened agate table types. Here we use the original arrow
375+
// schema containing the original table types including nested types. This might
376+
// be what Core developers expected to get from Python agate types as well. (?)
376377
let schema = table.original_record_batch().schema();
377378
let data_type = schema.field(col_idx as usize).data_type();
378379

@@ -387,7 +388,16 @@ pub trait TypedBaseAdapter: fmt::Debug + Send + Sync + AdapterTyping {
387388
data_type
388389
};
389390

390-
self.convert_type_inner(state, data_type)
391+
if let Some(replay_adapter) = self.as_replay() {
392+
// XXX: isn't the point of replay adapter to compare what it does against the actual code?
393+
return replay_adapter.replay_convert_type(state, data_type);
394+
}
395+
396+
let mut out = String::new();
397+
self.engine()
398+
.type_formatter()
399+
.format_arrow_type_as_sql(data_type, &mut out)?;
400+
Ok(out)
391401
}
392402

393403
/// Expand the to_relation table's column types to match the schema of from_relation
@@ -948,9 +958,9 @@ pub trait TypedBaseAdapter: fmt::Debug + Send + Sync + AdapterTyping {
948958
}
949959
}
950960

951-
/// Convenience to check if this [TypedBaseAdapter] implementer is used for replaying recordings
952-
fn is_replay(&self) -> bool {
953-
false
961+
/// This adapter as the replay adapter if it is one, None otherwise.
962+
fn as_replay(&self) -> Option<&dyn ReplayAdapter> {
963+
None
954964
}
955965

956966
/// Optional fast-path for replay adapters: return schema existence from the trace
@@ -959,3 +969,10 @@ pub trait TypedBaseAdapter: fmt::Debug + Send + Sync + AdapterTyping {
959969
None
960970
}
961971
}
972+
973+
/// Abstract interface for the concrete replay adapter implementation.
974+
///
975+
/// NOTE: this is a growing interface that is currently growing.
976+
pub trait ReplayAdapter: TypedBaseAdapter {
977+
fn replay_convert_type(&self, state: &State, data_type: &DataType) -> AdapterResult<String>;
978+
}

crates/dbt-jinja-utils/src/environment_builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ mod tests {
490490
use dbt_common::adapter::AdapterType;
491491
use dbt_common::cancellation::never_cancels;
492492
use dbt_fusion_adapter::ParseAdapter;
493+
use dbt_fusion_adapter::sql_types::NaiveTypeFormatterImpl;
493494
use dbt_schemas::schemas::relations::DEFAULT_DBT_QUOTING;
494495
use minijinja::{
495496
constants::MACRO_DISPATCH_ORDER, context, dispatch_object::THREAD_LOCAL_DEPENDENCIES,
@@ -603,6 +604,7 @@ all okay!");
603604
AdapterType::Postgres,
604605
dbt_serde_yaml::Mapping::default(),
605606
DEFAULT_DBT_QUOTING,
607+
Box::new(NaiveTypeFormatterImpl::new(AdapterType::Postgres)),
606608
never_cancels(),
607609
);
608610
let builder: JinjaEnvBuilder = JinjaEnvBuilder::new()
@@ -689,6 +691,7 @@ all okay!");
689691
AdapterType::Postgres,
690692
dbt_serde_yaml::Mapping::default(),
691693
DEFAULT_DBT_QUOTING,
694+
Box::new(NaiveTypeFormatterImpl::new(AdapterType::Postgres)),
692695
never_cancels(),
693696
);
694697
let builder: JinjaEnvBuilder = JinjaEnvBuilder::new()
@@ -750,6 +753,7 @@ all okay!");
750753
AdapterType::Postgres,
751754
dbt_serde_yaml::Mapping::default(),
752755
DEFAULT_DBT_QUOTING,
756+
Box::new(NaiveTypeFormatterImpl::new(AdapterType::Postgres)),
753757
never_cancels(),
754758
);
755759
let env = JinjaEnvBuilder::new()
@@ -864,6 +868,7 @@ all okay!");
864868
AdapterType::Postgres,
865869
dbt_serde_yaml::Mapping::default(),
866870
DEFAULT_DBT_QUOTING,
871+
Box::new(NaiveTypeFormatterImpl::new(AdapterType::Postgres)),
867872
never_cancels(),
868873
);
869874

0 commit comments

Comments
 (0)