Skip to content

Commit 238c76d

Browse files
committed
refactor(odbc): enhance column binding and buffer settings handling
This commit refines the column binding logic in the OdbcBridge by removing unnecessary parameters and improving the handling of maximum column sizes. It also updates the OdbcConnectOptions to assert that the max_column_size is greater than zero, ensuring valid configurations. Additionally, tests are modified to accommodate the new function signatures, enhancing the robustness of data handling in ODBC connections.
1 parent fc62d61 commit 238c76d

File tree

3 files changed

+63
-74
lines changed

3 files changed

+63
-74
lines changed

sqlx-core/src/odbc/connection/odbc_bridge.rs

Lines changed: 38 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,16 @@ use std::sync::Arc;
1616
// Bulk fetch implementation using columnar buffers instead of row-by-row fetching
1717
// This provides significant performance improvements by fetching rows in batches
1818
// and avoiding the slow `next_row()` method from odbc-api
19-
19+
#[derive(Debug)]
2020
struct ColumnBinding {
2121
column: OdbcColumn,
2222
buffer_desc: BufferDesc,
2323
}
2424

25-
fn build_bindings<C>(
25+
fn build_bindings<C: Cursor>(
2626
cursor: &mut C,
27-
buffer_settings: &OdbcBufferSettings,
28-
) -> Result<Vec<ColumnBinding>, Error>
29-
where
30-
C: ResultSetMetadata,
31-
{
27+
max_column_size: usize,
28+
) -> Result<Vec<ColumnBinding>, Error> {
3229
let column_count = cursor.num_result_cols().unwrap_or(0);
3330
let mut bindings = Vec::with_capacity(column_count as usize);
3431
for index in 1..=column_count {
@@ -37,18 +34,18 @@ where
3734
.col_nullability(index as u16)
3835
.unwrap_or(Nullability::Unknown)
3936
.could_be_nullable();
40-
let buffer_desc = map_buffer_desc(
41-
cursor,
42-
index as u16,
43-
&column.type_info,
44-
nullable,
45-
buffer_settings,
46-
)?;
37+
let buffer_desc = map_buffer_desc(&column.type_info, nullable, max_column_size)?;
4738
bindings.push(ColumnBinding {
4839
column,
4940
buffer_desc,
5041
});
5142
}
43+
dbg!(&bindings);
44+
log::trace!(
45+
"built {} ODBC batch column bindings: {:?}",
46+
bindings.len(),
47+
bindings
48+
);
5249
Ok(bindings)
5350
}
5451

@@ -131,14 +128,15 @@ fn to_param(arg: OdbcArgumentValue) -> Box<dyn odbc_api::parameter::InputParamet
131128
}
132129
}
133130

134-
fn handle_cursor<C>(mut cursor: C, tx: &ExecuteSender, buffer_settings: OdbcBufferSettings)
135-
where
136-
C: Cursor + ResultSetMetadata,
137-
{
131+
fn handle_cursor<C: Cursor + ResultSetMetadata>(
132+
mut cursor: C,
133+
tx: &ExecuteSender,
134+
buffer_settings: OdbcBufferSettings,
135+
) {
138136
match buffer_settings.max_column_size {
139-
Some(_) => {
137+
Some(max_column_size) => {
140138
// Buffered mode - use batch fetching with columnar buffers
141-
let bindings = match build_bindings(&mut cursor, &buffer_settings) {
139+
let bindings = match build_bindings(&mut cursor, max_column_size) {
142140
Ok(b) => b,
143141
Err(e) => {
144142
send_error(tx, e);
@@ -197,34 +195,17 @@ where
197195
}
198196
}
199197

200-
fn map_buffer_desc<C>(
201-
_cursor: &mut C,
202-
_column_index: u16,
198+
fn map_buffer_desc(
203199
type_info: &OdbcTypeInfo,
204200
nullable: bool,
205-
buffer_settings: &OdbcBufferSettings,
206-
) -> Result<BufferDesc, Error>
207-
where
208-
C: ResultSetMetadata,
209-
{
201+
max_column_size: usize,
202+
) -> Result<BufferDesc, Error> {
210203
use odbc_api::DataType;
211204

205+
// Some drivers report datatype lengths that are smaller than the actual data,
206+
// so we cannot use it to build the BufferDesc.
212207
let data_type = type_info.data_type();
213-
214-
// Helper function to determine buffer length with fallback
215-
let max_column_size = buffer_settings.max_column_size.unwrap_or(4096);
216-
217-
let buffer_length = |length: Option<std::num::NonZeroUsize>| {
218-
if let Some(length) = length {
219-
if length.get() < 255 {
220-
length.get()
221-
} else {
222-
max_column_size
223-
}
224-
} else {
225-
max_column_size
226-
}
227-
};
208+
let max_str_len = max_column_size;
228209

229210
let buffer_desc = match data_type {
230211
// Integer types - all map to I64
@@ -241,31 +222,22 @@ where
241222
DataType::Time { .. } => BufferDesc::Time { nullable },
242223
DataType::Timestamp { .. } => BufferDesc::Timestamp { nullable },
243224
// Binary types
244-
DataType::Binary { length }
245-
| DataType::Varbinary { length }
246-
| DataType::LongVarbinary { length } => BufferDesc::Binary {
247-
length: buffer_length(length),
248-
},
225+
DataType::Binary { .. } | DataType::Varbinary { .. } | DataType::LongVarbinary { .. } => {
226+
BufferDesc::Binary {
227+
length: max_column_size,
228+
}
229+
}
249230
// Text types
250-
DataType::Char { length }
251-
| DataType::WChar { length }
252-
| DataType::Varchar { length }
253-
| DataType::WVarchar { length }
254-
| DataType::LongVarchar { length }
255-
| DataType::WLongVarchar { length }
256-
| DataType::Other {
257-
column_size: length,
258-
..
259-
} => BufferDesc::Text {
260-
max_str_len: buffer_length(length),
261-
},
231+
DataType::Char { .. }
232+
| DataType::WChar { .. }
233+
| DataType::Varchar { .. }
234+
| DataType::WVarchar { .. }
235+
| DataType::LongVarchar { .. }
236+
| DataType::WLongVarchar { .. }
237+
| DataType::Other { .. } => BufferDesc::Text { max_str_len },
262238
// Fallback cases
263-
DataType::Unknown => BufferDesc::Text {
264-
max_str_len: max_column_size,
265-
},
266-
DataType::Decimal { .. } | DataType::Numeric { .. } => BufferDesc::Text {
267-
max_str_len: max_column_size,
268-
},
239+
DataType::Unknown => BufferDesc::Text { max_str_len },
240+
DataType::Decimal { .. } | DataType::Numeric { .. } => BufferDesc::Text { max_str_len },
269241
};
270242

271243
Ok(buffer_desc)

sqlx-core/src/odbc/options/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,13 @@ impl OdbcConnectOptions {
140140
///
141141
/// - When set to `Some(value)`: Enables buffered mode with batch fetching
142142
/// - When set to `None`: Enables unbuffered mode with row-by-row processing
143+
///
144+
/// # Panics
145+
/// Panics if `max_column_size` is less than 0.
143146
pub fn max_column_size(&mut self, max_column_size: Option<usize>) -> &mut Self {
147+
if let Some(size) = max_column_size {
148+
assert!(size > 0, "max_column_size must be greater than 0");
149+
}
144150
self.buffer_settings.max_column_size = max_column_size;
145151
self
146152
}

tests/odbc/odbc.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ async fn test_with_buffer_settings<F, T>(
440440
test_fn: F,
441441
) -> anyhow::Result<()>
442442
where
443-
F: Fn(OdbcConnection) -> T,
443+
F: Fn(OdbcConnection, OdbcBufferSettings) -> T,
444444
T: std::future::Future<Output = anyhow::Result<()>>,
445445
{
446446
use sqlx_oldapi::odbc::OdbcConnectOptions;
@@ -451,7 +451,7 @@ where
451451
opts.buffer_settings(buf_settings);
452452

453453
let conn = OdbcConnection::connect_with(&opts).await?;
454-
test_fn(conn).await?;
454+
test_fn(conn, buf_settings).await?;
455455
}
456456
Ok(())
457457
}
@@ -472,7 +472,7 @@ async fn it_handles_binary_data() -> anyhow::Result<()> {
472472
},
473473
];
474474

475-
test_with_buffer_settings(&buffer_settings, |mut conn| async move {
475+
test_with_buffer_settings(&buffer_settings, |mut conn, buf_settings| async move {
476476
let stmt = conn.prepare("SELECT ? AS binary_data").await?;
477477
let row = stmt
478478
.query_as::<(Vec<u8>,)>()
@@ -482,7 +482,10 @@ async fn it_handles_binary_data() -> anyhow::Result<()> {
482482
.expect("query failed")
483483
.expect("row expected");
484484

485-
assert_eq!(row.0, binary_data);
485+
assert_eq!(
486+
row.0, binary_data,
487+
"failed to query {binary_data:?} with buffer settings {buf_settings:?}"
488+
);
486489
Ok(())
487490
})
488491
.await
@@ -504,7 +507,7 @@ async fn it_handles_text_as_utf8_binary() -> anyhow::Result<()> {
504507
},
505508
];
506509

507-
test_with_buffer_settings(&buffer_settings, |mut conn| async move {
510+
test_with_buffer_settings(&buffer_settings, |mut conn, buf_settings| async move {
508511
let stmt = conn.prepare("SELECT ? AS text_data").await?;
509512
let row = stmt
510513
.query_as::<(Vec<u8>,)>()
@@ -514,7 +517,11 @@ async fn it_handles_text_as_utf8_binary() -> anyhow::Result<()> {
514517
.expect("query failed")
515518
.expect("row expected");
516519

517-
assert_eq!(row.0, text.as_bytes());
520+
assert_eq!(
521+
row.0,
522+
text.as_bytes(),
523+
"failed to query {text} with buffer settings {buf_settings:?}"
524+
);
518525
Ok(())
519526
})
520527
.await
@@ -1178,7 +1185,7 @@ async fn it_works_with_buffered_and_unbuffered_mode() -> anyhow::Result<()> {
11781185
},
11791186
];
11801187

1181-
test_with_buffer_settings(&buffer_settings, |mut conn| async move {
1188+
test_with_buffer_settings(&buffer_settings, |mut conn, buf_settings| async move {
11821189
let select = (0..count)
11831190
.map(|i| format!("SELECT {i} AS n, '{}' as aas", "a".repeat(i)))
11841191
.collect::<Vec<_>>()
@@ -1207,7 +1214,11 @@ async fn it_works_with_buffered_and_unbuffered_mode() -> anyhow::Result<()> {
12071214
.to_owned()
12081215
.try_decode::<String>()
12091216
.expect("SELECT aas should be a string");
1210-
assert_eq!(aas, "a".repeat(i));
1217+
assert_eq!(
1218+
aas,
1219+
"a".repeat(i),
1220+
"failed to query {i} with buffer settings {buf_settings:?}"
1221+
);
12111222
}
12121223
Ok(())
12131224
})

0 commit comments

Comments
 (0)