Skip to content

Commit 65829db

Browse files
committed
feat(odbc): introduce OdbcBufferSettings for performance tuning
This commit adds OdbcBufferSettings to configure buffer settings for ODBC connections, allowing users to optimize memory usage and performance during data fetching. The new settings include batch size and maximum column size, with corresponding methods in OdbcConnectOptions for easy configuration.
1 parent 9ba2926 commit 65829db

File tree

4 files changed

+168
-26
lines changed

4 files changed

+168
-26
lines changed

sqlx-core/src/odbc/connection/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::connection::Connection;
22
use crate::error::Error;
33
use crate::odbc::{
4-
Odbc, OdbcArguments, OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow, OdbcTypeInfo,
4+
Odbc, OdbcArguments, OdbcBufferSettings, OdbcColumn, OdbcConnectOptions, OdbcQueryResult,
5+
OdbcRow, OdbcTypeInfo,
56
};
67
use crate::transaction::Transaction;
78
use either::Either;
@@ -67,12 +68,14 @@ pub(super) fn decode_column_name<T: ColumnNameDecode>(name: T, index: u16) -> St
6768
pub struct OdbcConnection {
6869
pub(crate) conn: SharedConnection<'static>,
6970
pub(crate) stmt_cache: HashMap<Arc<str>, SharedPreparedStatement>,
71+
pub(crate) buffer_settings: OdbcBufferSettings,
7072
}
7173

7274
impl std::fmt::Debug for OdbcConnection {
7375
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
7476
f.debug_struct("OdbcConnection")
7577
.field("conn", &self.conn)
78+
.field("buffer_settings", &self.buffer_settings)
7679
.finish()
7780
}
7881
}
@@ -108,6 +111,7 @@ impl OdbcConnection {
108111
Ok(Self {
109112
conn: shared_conn,
110113
stmt_cache: HashMap::new(),
114+
buffer_settings: options.buffer_settings,
111115
})
112116
}
113117

@@ -162,9 +166,10 @@ impl OdbcConnection {
162166
};
163167

164168
let conn = Arc::clone(&self.conn);
169+
let buffer_settings = self.buffer_settings;
165170
sqlx_rt::spawn(sqlx_rt::spawn_blocking(move || {
166171
let mut conn = conn.lock().expect("failed to lock connection");
167-
if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx) {
172+
if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx, buffer_settings) {
168173
let _ = tx.send(Err(e));
169174
}
170175
}));

sqlx-core/src/odbc/connection/odbc_bridge.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,30 @@
11
use super::decode_column_name;
22
use crate::error::Error;
33
use crate::odbc::{
4-
connection::MaybePrepared, ColumnData, OdbcArgumentValue, OdbcArguments, OdbcBatch, OdbcColumn,
5-
OdbcQueryResult, OdbcRow, OdbcTypeInfo,
4+
connection::MaybePrepared, ColumnData, OdbcArgumentValue, OdbcArguments, OdbcBatch,
5+
OdbcBufferSettings, OdbcColumn, OdbcQueryResult, OdbcRow, OdbcTypeInfo,
66
};
77
use either::Either;
88
use flume::{SendError, Sender};
99
use odbc_api::buffers::{AnySlice, BufferDesc, ColumnarAnyBuffer};
1010
use odbc_api::handles::{AsStatementRef, Nullability, Statement};
1111
use odbc_api::DataType;
1212
use odbc_api::{Cursor, IntoParameter, ResultSetMetadata};
13-
use std::cmp::min;
1413
use std::sync::Arc;
1514

1615
// Bulk fetch implementation using columnar buffers instead of row-by-row fetching
1716
// This provides significant performance improvements by fetching rows in batches
1817
// and avoiding the slow `next_row()` method from odbc-api
19-
const BATCH_SIZE: usize = 128;
20-
const DEFAULT_TEXT_LEN: usize = 512;
21-
const DEFAULT_BINARY_LEN: usize = 1024;
22-
const DEFAULT_NUMERIC_TEXT_LEN: usize = 128;
23-
const MIN_TEXT_LEN: usize = 1024;
24-
const MAX_TEXT_LEN: usize = 4096;
2518

2619
struct ColumnBinding {
2720
column: OdbcColumn,
2821
buffer_desc: BufferDesc,
2922
}
3023

31-
fn build_bindings<C>(cursor: &mut C) -> Result<Vec<ColumnBinding>, Error>
24+
fn build_bindings<C>(
25+
cursor: &mut C,
26+
buffer_settings: &OdbcBufferSettings,
27+
) -> Result<Vec<ColumnBinding>, Error>
3228
where
3329
C: ResultSetMetadata,
3430
{
@@ -40,7 +36,13 @@ where
4036
.col_nullability(index as u16)
4137
.unwrap_or(Nullability::Unknown)
4238
.could_be_nullable();
43-
let buffer_desc = map_buffer_desc(cursor, index as u16, &column.type_info, nullable)?;
39+
let buffer_desc = map_buffer_desc(
40+
cursor,
41+
index as u16,
42+
&column.type_info,
43+
nullable,
44+
buffer_settings,
45+
)?;
4446
bindings.push(ColumnBinding {
4547
column,
4648
buffer_desc,
@@ -67,21 +69,22 @@ pub fn execute_sql(
6769
maybe_prepared: MaybePrepared,
6870
args: Option<OdbcArguments>,
6971
tx: &ExecuteSender,
72+
buffer_settings: OdbcBufferSettings,
7073
) -> Result<(), Error> {
7174
let params = prepare_parameters(args);
7275

7376
let affected = match maybe_prepared {
7477
MaybePrepared::Prepared(prepared) => {
7578
let mut prepared = prepared.lock().expect("prepared statement lock");
7679
if let Some(cursor) = prepared.execute(&params[..])? {
77-
handle_cursor(cursor, tx);
80+
handle_cursor(cursor, tx, buffer_settings);
7881
}
7982
extract_rows_affected(&mut *prepared)
8083
}
8184
MaybePrepared::NotPrepared(sql) => {
8285
let mut preallocated = conn.preallocate().map_err(Error::from)?;
8386
if let Some(cursor) = preallocated.execute(&sql, &params[..])? {
84-
handle_cursor(cursor, tx);
87+
handle_cursor(cursor, tx, buffer_settings);
8588
}
8689
extract_rows_affected(&mut preallocated)
8790
}
@@ -127,19 +130,19 @@ fn to_param(arg: OdbcArgumentValue) -> Box<dyn odbc_api::parameter::InputParamet
127130
}
128131
}
129132

130-
fn handle_cursor<C>(mut cursor: C, tx: &ExecuteSender)
133+
fn handle_cursor<C>(mut cursor: C, tx: &ExecuteSender, buffer_settings: OdbcBufferSettings)
131134
where
132135
C: Cursor + ResultSetMetadata,
133136
{
134-
let bindings = match build_bindings(&mut cursor) {
137+
let bindings = match build_bindings(&mut cursor, &buffer_settings) {
135138
Ok(b) => b,
136139
Err(e) => {
137140
send_error(tx, e);
138141
return;
139142
}
140143
};
141144

142-
match stream_rows(cursor, bindings, tx) {
145+
match stream_rows(cursor, bindings, tx, buffer_settings) {
143146
Ok(true) => {
144147
let _ = send_done(tx, 0);
145148
}
@@ -181,6 +184,7 @@ fn map_buffer_desc<C>(
181184
_column_index: u16,
182185
type_info: &OdbcTypeInfo,
183186
nullable: bool,
187+
buffer_settings: &OdbcBufferSettings,
184188
) -> Result<BufferDesc, Error>
185189
where
186190
C: ResultSetMetadata,
@@ -200,9 +204,14 @@ where
200204
| DataType::Varbinary { length }
201205
| DataType::LongVarbinary { length } => BufferDesc::Binary {
202206
length: if let Some(length) = length {
203-
length.get().clamp(MIN_TEXT_LEN, MAX_TEXT_LEN)
207+
if length.get() < 255 {
208+
// Some drivers report 255 for max length
209+
length.get()
210+
} else {
211+
buffer_settings.max_column_size
212+
}
204213
} else {
205-
MAX_TEXT_LEN
214+
buffer_settings.max_column_size
206215
},
207216
},
208217
DataType::Char { length }
@@ -216,16 +225,20 @@ where
216225
..
217226
} => BufferDesc::Text {
218227
max_str_len: if let Some(length) = length {
219-
length.get().clamp(MIN_TEXT_LEN, MAX_TEXT_LEN)
228+
if length.get() < 255 {
229+
length.get()
230+
} else {
231+
buffer_settings.max_column_size
232+
}
220233
} else {
221-
MAX_TEXT_LEN
234+
buffer_settings.max_column_size
222235
},
223236
},
224237
DataType::Unknown => BufferDesc::Text {
225-
max_str_len: MAX_TEXT_LEN,
238+
max_str_len: buffer_settings.max_column_size,
226239
},
227240
DataType::Decimal { .. } | DataType::Numeric { .. } => BufferDesc::Text {
228-
max_str_len: min(DEFAULT_NUMERIC_TEXT_LEN, MAX_TEXT_LEN),
241+
max_str_len: buffer_settings.max_column_size,
229242
},
230243
};
231244

@@ -236,12 +249,13 @@ fn stream_rows<C>(
236249
cursor: C,
237250
bindings: Vec<ColumnBinding>,
238251
tx: &ExecuteSender,
252+
buffer_settings: OdbcBufferSettings,
239253
) -> Result<bool, Error>
240254
where
241255
C: Cursor + ResultSetMetadata,
242256
{
243257
let buffer_descriptions: Vec<_> = bindings.iter().map(|b| b.buffer_desc).collect();
244-
let buffer = ColumnarAnyBuffer::from_descs(BATCH_SIZE, buffer_descriptions);
258+
let buffer = ColumnarAnyBuffer::from_descs(buffer_settings.batch_size, buffer_descriptions);
245259
let mut row_set_cursor = cursor.bind_buffer(buffer)?;
246260

247261
let mut receiver_open = true;

sqlx-core/src/odbc/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,27 @@
2020
//! ```text
2121
//! odbc:DSN=MyDataSource
2222
//! ```
23+
//!
24+
//! ## Buffer Configuration
25+
//!
26+
//! You can configure buffer settings for performance tuning:
27+
//!
28+
//! ```rust
29+
//! use sqlx::odbc::{OdbcConnectOptions, OdbcBufferSettings};
30+
//!
31+
//! let mut opts = OdbcConnectOptions::from_str("DSN=MyDataSource")?;
32+
//!
33+
//! // Configure for high-throughput scenarios
34+
//! opts.buffer_settings(OdbcBufferSettings {
35+
//! batch_size: 256, // Fetch 256 rows at once
36+
//! max_column_size: 2048, // Limit text columns to 2048 chars
37+
//! });
38+
//!
39+
//! // Or configure individual settings
40+
//! opts.batch_size(512)
41+
//! .max_column_size(1024);
42+
//! # Ok::<(), sqlx::Error>(())
43+
//! ```
2344
2445
use crate::executor::Executor;
2546

@@ -41,7 +62,7 @@ pub use arguments::{OdbcArgumentValue, OdbcArguments};
4162
pub use column::OdbcColumn;
4263
pub use connection::OdbcConnection;
4364
pub use database::Odbc;
44-
pub use options::OdbcConnectOptions;
65+
pub use options::{OdbcBufferSettings, OdbcConnectOptions};
4566
pub use query_result::OdbcQueryResult;
4667
pub use row::{OdbcBatch, OdbcRow};
4768
pub use statement::{OdbcStatement, OdbcStatementMetadata};

sqlx-core/src/odbc/options/mod.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,123 @@ use std::time::Duration;
88

99
use crate::odbc::OdbcConnection;
1010

11+
/// Configuration for ODBC buffer settings that control memory usage and performance characteristics.
12+
///
13+
/// These settings affect how SQLx fetches and processes data from ODBC data sources. Careful tuning
14+
/// of these parameters can significantly impact memory usage and query performance.
15+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16+
pub struct OdbcBufferSettings {
17+
/// The number of rows to fetch in each batch during bulk operations.
18+
///
19+
/// **Performance Impact:**
20+
/// - Higher values reduce the number of round-trips to the database but increase memory usage
21+
/// - Lower values reduce memory usage but may increase latency due to more frequent fetches
22+
/// - Typical range: 32-512 rows
23+
///
24+
/// **Memory Impact:**
25+
/// - Each batch allocates buffers for `batch_size * number_of_columns` cells
26+
/// - For wide result sets, this can consume significant memory
27+
///
28+
/// **Default:** 128 rows
29+
pub batch_size: usize,
30+
31+
/// The maximum size (in characters) for text and binary columns when the database doesn't specify a length.
32+
///
33+
/// **Performance Impact:**
34+
/// - Higher values ensure large text fields are fully captured but increase memory allocation
35+
/// - Lower values may truncate data but reduce memory pressure
36+
/// - Affects VARCHAR, NVARCHAR, TEXT, and BLOB column types
37+
///
38+
/// **Memory Impact:**
39+
/// - Directly controls buffer size for variable-length columns
40+
/// - Setting too high can waste memory; setting too low can cause data truncation
41+
/// - Consider your data characteristics when tuning this value
42+
pub max_column_size: usize,
43+
}
44+
45+
impl Default for OdbcBufferSettings {
46+
fn default() -> Self {
47+
Self {
48+
batch_size: 128,
49+
max_column_size: 4096,
50+
}
51+
}
52+
}
53+
1154
#[derive(Clone)]
1255
pub struct OdbcConnectOptions {
1356
pub(crate) conn_str: String,
1457
pub(crate) log_settings: LogSettings,
58+
pub(crate) buffer_settings: OdbcBufferSettings,
1559
}
1660

1761
impl OdbcConnectOptions {
1862
pub fn connection_string(&self) -> &str {
1963
&self.conn_str
2064
}
65+
66+
/// Sets the buffer configuration for this connection.
67+
///
68+
/// The buffer settings control memory usage and performance characteristics
69+
/// when fetching data from ODBC data sources.
70+
///
71+
/// # Example
72+
/// ```rust
73+
/// use sqlx::odbc::{OdbcConnectOptions, OdbcBufferSettings};
74+
///
75+
/// let mut opts = OdbcConnectOptions::from_str("DSN=MyDataSource")?;
76+
/// opts.buffer_settings(OdbcBufferSettings {
77+
/// batch_size: 256,
78+
/// max_column_size: 2048,
79+
/// });
80+
/// # Ok::<(), sqlx::Error>(())
81+
/// ```
82+
pub fn buffer_settings(&mut self, settings: OdbcBufferSettings) -> &mut Self {
83+
self.buffer_settings = settings;
84+
self
85+
}
86+
87+
/// Sets the batch size for bulk fetch operations.
88+
///
89+
/// This controls how many rows are fetched at once during query execution.
90+
/// Higher values can improve performance for large result sets but use more memory.
91+
///
92+
/// # Panics
93+
/// Panics if `batch_size` is 0.
94+
pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
95+
assert!(batch_size > 0, "batch_size must be greater than 0");
96+
self.buffer_settings.batch_size = batch_size;
97+
self
98+
}
99+
100+
/// Sets the maximum column size for text and binary data.
101+
///
102+
/// This controls the buffer size allocated for columns when the database
103+
/// doesn't specify a maximum length. Larger values ensure complete data
104+
/// capture but increase memory usage.
105+
///
106+
/// # Panics
107+
/// Panics if `max_column_size` is less than 1024 or greater than 4096.
108+
pub fn max_column_size(&mut self, max_column_size: usize) -> &mut Self {
109+
assert!(
110+
(1024..=4096).contains(&max_column_size),
111+
"max_column_size must be between 1024 and 4096"
112+
);
113+
self.buffer_settings.max_column_size = max_column_size;
114+
self
115+
}
116+
117+
/// Returns the current buffer settings for this connection.
118+
pub fn buffer_settings_ref(&self) -> &OdbcBufferSettings {
119+
&self.buffer_settings
120+
}
21121
}
22122

23123
impl Debug for OdbcConnectOptions {
24124
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
25125
f.debug_struct("OdbcConnectOptions")
26126
.field("conn_str", &"<redacted>")
127+
.field("buffer_settings", &self.buffer_settings)
27128
.finish()
28129
}
29130
}
@@ -51,6 +152,7 @@ impl FromStr for OdbcConnectOptions {
51152
Ok(Self {
52153
conn_str,
53154
log_settings: LogSettings::default(),
155+
buffer_settings: OdbcBufferSettings::default(),
54156
})
55157
}
56158
}

0 commit comments

Comments
 (0)