
Commit 6161d2f

xuliangsfa-assistant authored and committed
[Adapters][Salesforce] another set of improvements (#5694)

* alias database to data_space in project configs
* update dbt_project.yml: example usage for the data_space config; update examples to showcase data tests support
* add data_transform_run_timeout in profiles.yml
* allow configuring statement options from config; update profiles.yml as an example
* provide a default for data_transform_run_timeout, since a null YAML value is otherwise converted to the string "null" and causes issues downstream
* include a select * example, but this won't work; see the e2e/main.go example in salesforce adbc
* prepare to use adapter.load_dataframe to support salesforce seed
* fix a syntax error
* disable this model
* bump the sf version
* nit

GitOrigin-RevId: 708429f6135f17a35d710e4a75ca4b346f355f0a
1 parent 731a98c

File tree: 15 files changed, +163 −31 lines

crates/dbt-fusion-adapter/src/base_adapter.rs

Lines changed: 11 additions & 3 deletions

@@ -423,9 +423,17 @@ pub trait BaseAdapter: fmt::Display + fmt::Debug + AdapterTyping + Send + Sync {
     ) -> Result<Value, MinijinjaError>;
 
     /// load_dataframe
-    fn load_dataframe(&self, _state: &State, _args: &[Value]) -> Result<Value, MinijinjaError> {
-        unimplemented!("only available with BigQuery adapter")
-    }
+    #[allow(clippy::too_many_arguments)]
+    fn load_dataframe(
+        &self,
+        _state: &State,
+        _database: &str,
+        _schema: &str,
+        _table_name: &str,
+        _agate_table: Arc<AgateTable>,
+        _file_path: &str,
+        _field_delimiter: &str,
+    ) -> Result<Value, MinijinjaError>;
 
     /// upload_file
     fn upload_file(&self, _state: &State, _args: &[Value]) -> Result<Value, MinijinjaError> {
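
Note the trailing semicolon in the new signature: `load_dataframe` loses its default body and becomes a required trait item, so every `BaseAdapter` implementor must now define it. A self-contained toy sketch of that distinction (hypothetical trait and type, not the real ones):

// Toy illustration: removing a default body turns a trait method
// into a required item that every implementor must define.
trait Loader {
    // Defaulted: implementors may omit this.
    fn load_defaulted(&self) -> String {
        String::from("default")
    }

    // Required: no body, so implementors must define it.
    fn load_required(&self) -> String;
}

struct ToyAdapter;

impl Loader for ToyAdapter {
    fn load_required(&self) -> String {
        String::from("adapter-specific")
    }
}

fn main() {
    let a = ToyAdapter;
    println!("{} / {}", a.load_defaulted(), a.load_required());
}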

crates/dbt-fusion-adapter/src/bridge_adapter.rs

Lines changed: 20 additions & 4 deletions

@@ -1644,12 +1644,28 @@ impl BaseAdapter for BridgeAdapter {
     }
 
     #[tracing::instrument(skip(self, state), level = "trace")]
-    fn load_dataframe(&self, state: &State, args: &[Value]) -> Result<Value, MinijinjaError> {
+    fn load_dataframe(
+        &self,
+        state: &State,
+        database: &str,
+        schema: &str,
+        table_name: &str,
+        agate_table: Arc<AgateTable>,
+        file_path: &str,
+        field_delimiter: &str,
+    ) -> Result<Value, MinijinjaError> {
         let mut conn = self.borrow_tlocal_connection(node_id_from_state(state))?;
         let query_ctx = query_ctx_from_state(state)?.with_desc("load_dataframe");
-        let result = self
-            .typed_adapter
-            .load_dataframe(&query_ctx, conn.as_mut(), args)?;
+        let result = self.typed_adapter.load_dataframe(
+            &query_ctx,
+            conn.as_mut(),
+            database,
+            schema,
+            table_name,
+            agate_table,
+            file_path,
+            field_delimiter,
+        )?;
         Ok(result)
     }
 }

crates/dbt-fusion-adapter/src/funcs.rs

Lines changed: 39 additions & 1 deletion

@@ -147,7 +147,45 @@ pub fn dispatch_adapter_calls(
         "update_columns" => adapter.update_columns(state, args),
         "update_table_description" => adapter.update_table_description(state, args),
         "alter_table_add_columns" => adapter.alter_table_add_columns(state, args),
-        "load_dataframe" => adapter.load_dataframe(state, args),
+        "load_dataframe" => {
+            let iter = ArgsIter::new(
+                name,
+                &[
+                    "database",
+                    "schema",
+                    "table_name",
+                    "field_path",
+                    "agate_table",
+                    "field_delimiter",
+                ],
+                args,
+            );
+            let database = iter.next_arg::<&str>()?;
+            let schema = iter.next_arg::<&str>()?;
+            let table_name = iter.next_arg::<&str>()?;
+            let file_path = iter.next_arg::<&str>()?;
+            let agate_table = iter
+                .next_arg::<&Value>()?
+                .downcast_object::<AgateTable>()
+                .ok_or_else(|| {
+                    MinijinjaError::new(
+                        MinijinjaErrorKind::InvalidOperation,
+                        "agate_table must be an agate.Table",
+                    )
+                })?;
+            let field_delimiter = iter.next_arg::<&str>()?;
+            iter.finish()?;
+
+            adapter.load_dataframe(
+                state,
+                database,
+                schema,
+                table_name,
+                agate_table,
+                file_path,
+                field_delimiter,
+            )
+        }
         "upload_file" => adapter.upload_file(state, args),
         "get_bq_table" => adapter.get_bq_table(state, args),
         "describe_relation" => adapter.describe_relation(state, args),

crates/dbt-fusion-adapter/src/parse/adapter.rs

Lines changed: 10 additions & 1 deletion

@@ -592,7 +592,16 @@ impl BaseAdapter for ParseAdapter {
         Ok(none_value())
     }
 
-    fn load_dataframe(&self, _state: &State, _args: &[Value]) -> Result<Value, MinijinjaError> {
+    fn load_dataframe(
+        &self,
+        _state: &State,
+        _database: &str,
+        _schema: &str,
+        _table_name: &str,
+        _agate_table: Arc<AgateTable>,
+        _file_path: &str,
+        _field_delimiter: &str,
+    ) -> Result<Value, MinijinjaError> {
         Ok(none_value())
     }
crates/dbt-fusion-adapter/src/sql_engine.rs

Lines changed: 1 addition & 2 deletions

@@ -4,6 +4,7 @@ use crate::base_adapter::{AdapterFactory, backend_of};
 use crate::config::AdapterConfig;
 use crate::errors::{AdapterError, AdapterErrorKind, AdapterResult};
 use crate::query_comment::{EMPTY_CONFIG, QueryCommentConfig};
+use crate::record_and_replay::{RecordEngine, ReplayEngine};
 use crate::sql_types::{NaiveTypeFormatterImpl, TypeFormatter};
 use crate::stmt_splitter::StmtSplitter;
 
@@ -32,8 +33,6 @@ use std::sync::RwLock;
 use std::sync::{Arc, LazyLock};
 use std::{thread, time::Duration};
 
-use super::record_and_replay::{RecordEngine, ReplayEngine};
-
 type Options = Vec<(String, OptionValue)>;
 
 /// Naive statement splitter used in the MockAdapter

crates/dbt-fusion-adapter/src/typed_adapter.rs

Lines changed: 32 additions & 3 deletions

@@ -27,6 +27,7 @@ use dbt_schemas::schemas::project::ModelConfig;
 use dbt_schemas::schemas::relations::base::{BaseRelation, ComponentName};
 use dbt_schemas::schemas::relations::relation_configs::BaseRelationConfig;
 use dbt_schemas::schemas::{CommonAttributes, InternalDbtNodeAttributes};
+use dbt_xdbc::salesforce::DATA_TRANSFORM_RUN_TIMEOUT;
 use dbt_xdbc::{Connection, QueryCtx};
 use minijinja::{State, Value, args};
 
@@ -106,12 +107,34 @@ pub trait TypedBaseAdapter: fmt::Debug + Send + Sync + AdapterTyping {
         } else {
             engine.split_statements(&sql, dialect)
         };
-        let options = options
+
+        // Configure options
+        let mut options = options
             .unwrap_or_default()
             .into_iter()
             .map(|(key, value)| (key, OptionValue::String(value)))
             .collect::<Vec<_>>();
 
+        // Configure warehouse specific options
+        #[allow(clippy::single_match)]
+        match self.adapter_type() {
+            AdapterType::Salesforce => {
+                if let Some(timeout) = engine.config("data_transform_run_timeout") {
+                    let timeout = timeout.parse::<i64>().map_err(|e| {
+                        AdapterError::new(
+                            AdapterErrorKind::Configuration,
+                            format!("data_transform_run_timeout must be an integer string: {e}"),
+                        )
+                    })?;
+                    options.push((
+                        DATA_TRANSFORM_RUN_TIMEOUT.to_string(),
+                        OptionValue::Int(timeout),
+                    ));
+                }
+            }
+            _ => {}
+        }
+
         let mut last_batch = None;
         for statement in statements {
             last_batch = Some(execute_query_with_retry(
 
@@ -627,13 +650,19 @@ pub trait TypedBaseAdapter: fmt::Debug + Send + Sync + AdapterTyping {
     }
 
     /// load dataframe only used by bigquery adapter
+    #[allow(clippy::too_many_arguments)]
     fn load_dataframe(
         &self,
         _query_ctx: &QueryCtx,
         _conn: &'_ mut dyn Connection,
-        _args: &[Value],
+        _database: &str,
+        _schema: &str,
+        _table_name: &str,
+        _agate_table: Arc<AgateTable>,
+        _file_path: &str,
+        _field_delimiter: &str,
     ) -> AdapterResult<Value> {
-        unimplemented!("only available with BigQuery adapter")
+        unimplemented!("only available with BigQuery or Salesforce adapter")
     }
 
     /// alter_table_add_columns
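
The Salesforce arm above turns a string-typed engine config value into a typed ADBC statement option, failing with a configuration error when the value is not an integer string. The parsing step in isolation, as a runnable sketch with a simplified error type:

// Standalone sketch of the timeout parsing above: a string config
// value becomes an i64, or a descriptive error otherwise.
fn parse_timeout(raw: &str) -> Result<i64, String> {
    raw.parse::<i64>()
        .map_err(|e| format!("data_transform_run_timeout must be an integer string: {e}"))
}

fn main() {
    assert_eq!(parse_timeout("180000"), Ok(180_000));
    assert!(parse_timeout("3 mins").is_err());
    assert!(parse_timeout("").is_err());
}
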
Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+{% macro salesforce__persist_docs(relation, model, for_relation, for_columns) -%}
+-- noop
+{% endmacro %}
Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+
+{% macro salesforce__create_csv_table(model, agate_table) %}
+-- noop
+{% endmacro %}
+
+{% macro salesforce__reset_csv_table(model, full_refresh, old_relation, agate_table) %}
+{{ adapter.drop_relation(old_relation) }}
+{% endmacro %}
+
+{% macro salesforce__load_csv_rows(model, agate_table) %}
+{%- set delimiter = model['config'].get('delimiter', ',') -%}
+{# only comma delimiter is supported for salesforce #}
+{{ adapter.load_dataframe(
+    model['database'],
+    model['schema'],
+    model['alias'],
+    model['project_root'] | string ~ model['original_file_path'] | string,
+    agate_table,
+    ',',
+) }}
+{% endmacro %}

crates/dbt-schemas/src/schemas/profiles.rs

Lines changed: 7 additions & 1 deletion

@@ -213,7 +213,7 @@ impl DbConfig {
         ],
         DbConfig::Databricks(_) => &["host", "http_path", "schema"],
         // TODO: Salesforce connection keys
-        DbConfig::Salesforce(_) => &[],
+        DbConfig::Salesforce(_) => &["login_url", "database", "data_transform_run_timeout"],
         // TODO: Trino and Datafusion connection keys
         DbConfig::Trino(_) => &[],
         DbConfig::Datafusion(_) => &[],
 
@@ -736,12 +736,18 @@ pub struct SalesforceDbConfig {
     pub private_key_path: Option<PathBuf>,
     pub login_url: Option<String>,
     pub username: Option<String>,
+    #[serde(default = "default_data_transform_run_timeout")]
+    pub data_transform_run_timeout: Option<i64>,
 }
 
 fn default_salesforce_database() -> Option<String> {
     Some("default".to_string())
 }
 
+fn default_data_transform_run_timeout() -> Option<i64> {
+    Some(180000) // 3 mins
+}
+
 #[derive(Serialize, JsonSchema)]
 #[serde(untagged)]
 #[serde(rename_all = "snake_case")]
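
The `#[serde(default = ...)]` attribute addresses the null-handling issue called out in the commit message: when a profile omits data_transform_run_timeout, the field deserializes to Some(180000) rather than None. A minimal runnable sketch of the pattern, assuming the serde and serde_yaml crates:

use serde::Deserialize;

// When the key is absent, serde calls the default fn, so the field
// is Some(180000) instead of None.
#[derive(Debug, Deserialize)]
struct SalesforceConfig {
    #[serde(default = "default_data_transform_run_timeout")]
    data_transform_run_timeout: Option<i64>,
}

fn default_data_transform_run_timeout() -> Option<i64> {
    Some(180_000) // 3 mins, in milliseconds
}

fn main() {
    let cfg: SalesforceConfig = serde_yaml::from_str("{}").unwrap();
    assert_eq!(cfg.data_transform_run_timeout, Some(180_000));
}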

crates/dbt-schemas/src/schemas/project/configs/data_test_config.rs

Lines changed: 1 addition & 1 deletion

@@ -25,7 +25,7 @@ use crate::schemas::serde::{StringOrArrayOfStrings, bool_or_string_bool, u64_or_
 pub struct ProjectDataTestConfig {
     #[serde(rename = "+alias")]
     pub alias: Option<String>,
-    #[serde(rename = "+database", alias = "+project")]
+    #[serde(rename = "+database", alias = "+project", alias = "+data_space")]
     pub database: Option<String>,
     #[serde(default, rename = "+enabled", deserialize_with = "bool_or_string_bool")]
     pub enabled: Option<bool>,
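
With serde's alias attribute, one field accepts several config keys, so `+data_space` in a project file lands in `database` just as `+database` or `+project` would. A runnable sketch of the aliasing, assuming the serde and serde_yaml crates:

use serde::Deserialize;

// `+database`, `+project`, and `+data_space` all deserialize into
// the single `database` field.
#[derive(Debug, Deserialize)]
struct TestConfig {
    #[serde(rename = "+database", alias = "+project", alias = "+data_space")]
    database: Option<String>,
}

fn main() {
    let cfg: TestConfig = serde_yaml::from_str("+data_space: my_space").unwrap();
    assert_eq!(cfg.database.as_deref(), Some("my_space"));
}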
