Skip to content

Commit fc43b6e

Browse files
committed
feat(odbc): implement unbuffered mode for ODBC connections
This commit introduces an unbuffered mode in OdbcBufferSettings, allowing for more memory-efficient row processing by fetching rows one at a time. The implementation includes updates to the OdbcBridge and related functions to handle both buffered and unbuffered modes effectively. Additionally, tests are added to verify the correct functionality of the unbuffered mode.
1 parent 6fa10ae commit fc43b6e

File tree

4 files changed

+310
-50
lines changed

4 files changed

+310
-50
lines changed

sqlx-core/src/odbc/connection/odbc_bridge.rs

Lines changed: 208 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,21 +133,36 @@ fn handle_cursor<C>(mut cursor: C, tx: &ExecuteSender, buffer_settings: OdbcBuff
133133
where
134134
C: Cursor + ResultSetMetadata,
135135
{
136-
let bindings = match build_bindings(&mut cursor, &buffer_settings) {
137-
Ok(b) => b,
138-
Err(e) => {
139-
send_error(tx, e);
140-
return;
141-
}
142-
};
136+
match buffer_settings {
137+
OdbcBufferSettings::Buffered { .. } => {
138+
let bindings = match build_bindings(&mut cursor, &buffer_settings) {
139+
Ok(b) => b,
140+
Err(e) => {
141+
send_error(tx, e);
142+
return;
143+
}
144+
};
143145

144-
match stream_rows(cursor, bindings, tx, buffer_settings) {
145-
Ok(true) => {
146-
let _ = send_done(tx, 0);
146+
match stream_rows(cursor, bindings, tx, buffer_settings) {
147+
Ok(true) => {
148+
let _ = send_done(tx, 0);
149+
}
150+
Ok(false) => {}
151+
Err(e) => {
152+
send_error(tx, e);
153+
}
154+
}
147155
}
148-
Ok(false) => {}
149-
Err(e) => {
150-
send_error(tx, e);
156+
OdbcBufferSettings::Unbuffered => {
157+
match stream_rows(cursor, Vec::new(), tx, buffer_settings) {
158+
Ok(true) => {
159+
let _ = send_done(tx, 0);
160+
}
161+
Ok(false) => {}
162+
Err(e) => {
163+
send_error(tx, e);
164+
}
165+
}
151166
}
152167
}
153168
}
@@ -193,15 +208,22 @@ where
193208
let data_type = type_info.data_type();
194209

195210
// Helper function to determine buffer length with fallback
211+
let max_column_size = match buffer_settings {
212+
OdbcBufferSettings::Buffered {
213+
max_column_size, ..
214+
} => *max_column_size,
215+
OdbcBufferSettings::Unbuffered => 4096, // Default value for unbuffered mode
216+
};
217+
196218
let buffer_length = |length: Option<std::num::NonZeroUsize>| {
197219
if let Some(length) = length {
198220
if length.get() < 255 {
199221
length.get()
200222
} else {
201-
buffer_settings.max_column_size
223+
max_column_size
202224
}
203225
} else {
204-
buffer_settings.max_column_size
226+
max_column_size
205227
}
206228
};
207229

@@ -240,10 +262,10 @@ where
240262
},
241263
// Fallback cases
242264
DataType::Unknown => BufferDesc::Text {
243-
max_str_len: buffer_settings.max_column_size,
265+
max_str_len: max_column_size,
244266
},
245267
DataType::Decimal { .. } | DataType::Numeric { .. } => BufferDesc::Text {
246-
max_str_len: buffer_settings.max_column_size,
268+
max_str_len: max_column_size,
247269
},
248270
};
249271

@@ -256,11 +278,31 @@ fn stream_rows<C>(
256278
tx: &ExecuteSender,
257279
buffer_settings: OdbcBufferSettings,
258280
) -> Result<bool, Error>
281+
where
282+
C: Cursor + ResultSetMetadata,
283+
{
284+
match buffer_settings {
285+
OdbcBufferSettings::Buffered { batch_size, .. } => {
286+
stream_rows_buffered(cursor, bindings, tx, batch_size)
287+
}
288+
OdbcBufferSettings::Unbuffered => {
289+
// For unbuffered mode, we don't need bindings since we build columns dynamically
290+
stream_rows_unbuffered(cursor, tx)
291+
}
292+
}
293+
}
294+
295+
fn stream_rows_buffered<C>(
296+
cursor: C,
297+
bindings: Vec<ColumnBinding>,
298+
tx: &ExecuteSender,
299+
batch_size: usize,
300+
) -> Result<bool, Error>
259301
where
260302
C: Cursor + ResultSetMetadata,
261303
{
262304
let buffer_descriptions: Vec<_> = bindings.iter().map(|b| b.buffer_desc).collect();
263-
let buffer = ColumnarAnyBuffer::from_descs(buffer_settings.batch_size, buffer_descriptions);
305+
let buffer = ColumnarAnyBuffer::from_descs(batch_size, buffer_descriptions);
264306
let mut row_set_cursor = cursor.bind_buffer(buffer)?;
265307

266308
let mut receiver_open = true;
@@ -303,6 +345,52 @@ where
303345
Ok(receiver_open)
304346
}
305347

348+
fn stream_rows_unbuffered<C>(mut cursor: C, tx: &ExecuteSender) -> Result<bool, Error>
349+
where
350+
C: Cursor + ResultSetMetadata,
351+
{
352+
let mut receiver_open = true;
353+
354+
// For unbuffered mode, we need to build column information for each row
355+
let column_count = cursor.num_result_cols().unwrap_or(0);
356+
let mut columns = Vec::with_capacity(column_count as usize);
357+
358+
for index in 1..=column_count {
359+
columns.push(create_column(&mut cursor, index as u16));
360+
}
361+
362+
let col_arc: Arc<[OdbcColumn]> = Arc::from(columns);
363+
364+
while let Some(mut cursor_row) = cursor.next_row()? {
365+
// Create a single-row batch for this row
366+
let column_data: Vec<_> = (0..column_count)
367+
.map(|col_index| {
368+
let column = &col_arc[col_index as usize];
369+
// Convert CursorRow data to ColumnData format
370+
// Column indices are 1-based in odbc-api
371+
create_column_data_from_cursor_row(&mut cursor_row, (col_index + 1) as u16, column)
372+
})
373+
.collect();
374+
375+
let odbc_batch = Arc::new(OdbcBatch {
376+
columns: Arc::clone(&col_arc),
377+
column_data,
378+
});
379+
380+
let row = OdbcRow {
381+
row_index: 0, // Single row in this batch
382+
batch: Arc::clone(&odbc_batch),
383+
};
384+
385+
if send_row(tx, row).is_err() {
386+
receiver_open = false;
387+
break;
388+
}
389+
}
390+
391+
Ok(receiver_open)
392+
}
393+
306394
fn create_column_data(slice: AnySlice<'_>, column: &OdbcColumn) -> Arc<ColumnData> {
307395
use crate::odbc::value::convert_any_slice_to_value_vec;
308396

@@ -313,3 +401,105 @@ fn create_column_data(slice: AnySlice<'_>, column: &OdbcColumn) -> Arc<ColumnDat
313401
nulls,
314402
})
315403
}
404+
405+
fn create_column_data_from_cursor_row(
406+
cursor_row: &mut odbc_api::CursorRow<'_>,
407+
column_index: u16,
408+
column: &OdbcColumn,
409+
) -> Arc<ColumnData> {
410+
use crate::odbc::value::OdbcValueVec;
411+
use odbc_api::DataType;
412+
413+
let data_type = column.type_info.data_type();
414+
415+
// Create single-element vectors for this row
416+
let (values, nulls) = match data_type {
417+
DataType::TinyInt => {
418+
let mut value = 0i8;
419+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
420+
(OdbcValueVec::TinyInt(vec![value]), vec![is_null])
421+
}
422+
DataType::SmallInt => {
423+
let mut value = 0i16;
424+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
425+
(OdbcValueVec::SmallInt(vec![value]), vec![is_null])
426+
}
427+
DataType::Integer => {
428+
let mut value = 0i32;
429+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
430+
(OdbcValueVec::Integer(vec![value]), vec![is_null])
431+
}
432+
DataType::BigInt => {
433+
let mut value = 0i64;
434+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
435+
(OdbcValueVec::BigInt(vec![value]), vec![is_null])
436+
}
437+
DataType::Real => {
438+
let mut value = 0.0f32;
439+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
440+
(OdbcValueVec::Real(vec![value]), vec![is_null])
441+
}
442+
DataType::Float { .. } | DataType::Double => {
443+
let mut value = 0.0f64;
444+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
445+
(OdbcValueVec::Double(vec![value]), vec![is_null])
446+
}
447+
DataType::Bit => {
448+
let mut value = odbc_api::Bit(0);
449+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
450+
(OdbcValueVec::Bit(vec![value]), vec![is_null])
451+
}
452+
DataType::Date => {
453+
let mut value = odbc_api::sys::Date::default();
454+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
455+
(OdbcValueVec::Date(vec![value]), vec![is_null])
456+
}
457+
DataType::Time { .. } => {
458+
let mut value = odbc_api::sys::Time::default();
459+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
460+
(OdbcValueVec::Time(vec![value]), vec![is_null])
461+
}
462+
DataType::Timestamp { .. } => {
463+
let mut value = odbc_api::sys::Timestamp::default();
464+
let is_null = cursor_row.get_data(column_index, &mut value).is_err();
465+
(OdbcValueVec::Timestamp(vec![value]), vec![is_null])
466+
}
467+
DataType::Char { .. }
468+
| DataType::Varchar { .. }
469+
| DataType::LongVarchar { .. }
470+
| DataType::WChar { .. }
471+
| DataType::WVarchar { .. }
472+
| DataType::WLongVarchar { .. }
473+
| DataType::Binary { .. }
474+
| DataType::Varbinary { .. }
475+
| DataType::LongVarbinary { .. }
476+
| DataType::Other { .. }
477+
| DataType::Unknown
478+
| DataType::Decimal { .. }
479+
| DataType::Numeric { .. } => {
480+
// For text and binary data, use get_text
481+
let mut buf = Vec::new();
482+
match cursor_row.get_text(column_index, &mut buf) {
483+
Ok(true) => {
484+
// Successfully got text, convert to string
485+
let text = String::from_utf8_lossy(&buf).to_string();
486+
(OdbcValueVec::Text(vec![Some(text)]), vec![false])
487+
}
488+
Ok(false) => {
489+
// NULL value
490+
(OdbcValueVec::Text(vec![None]), vec![true])
491+
}
492+
Err(_) => {
493+
// Error, treat as NULL
494+
(OdbcValueVec::Text(vec![None]), vec![true])
495+
}
496+
}
497+
}
498+
};
499+
500+
Arc::new(ColumnData {
501+
values,
502+
type_info: column.type_info.clone(),
503+
nulls,
504+
})
505+
}

sqlx-core/src/odbc/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
//! let mut opts = OdbcConnectOptions::from_str("DSN=MyDataSource")?;
3333
//!
3434
//! // Configure for high-throughput scenarios
35-
//! opts.buffer_settings(OdbcBufferSettings {
35+
//! opts.buffer_settings(OdbcBufferSettings::Buffered {
3636
//! batch_size: 256, // Fetch 256 rows at once
3737
//! max_column_size: 2048, // Limit text columns to 2048 chars
3838
//! });

0 commit comments

Comments
 (0)