Skip to content

Commit 6de5c12

Browse files
committed
feat(rust)!: add '_ bound to RecordBatchReader results
This avoids inadvertently tying the result lifetime to the lifetimes of any input arguments. We discussed more drastic changes like changing the result to have 'static or even using Box<Reader + 'static>, but this opts for the most conservative change. Closes #2694.
1 parent 11205f1 commit 6de5c12

File tree

5 files changed

+47
-29
lines changed

5 files changed

+47
-29
lines changed

rust/core/src/lib.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
164164
fn get_info(
165165
&self,
166166
codes: Option<HashSet<options::InfoCode>>,
167-
) -> Result<impl RecordBatchReader + Send>;
167+
) -> Result<impl RecordBatchReader + Send + '_>;
168168

169169
/// Get a hierarchical view of all catalogs, database schemas, tables, and
170170
/// columns.
@@ -272,7 +272,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
272272
table_name: Option<&str>,
273273
table_type: Option<Vec<&str>>,
274274
column_name: Option<&str>,
275-
) -> Result<impl RecordBatchReader + Send>;
275+
) -> Result<impl RecordBatchReader + Send + '_>;
276276

277277
/// Get the Arrow schema of a table.
278278
///
@@ -297,7 +297,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
297297
/// Field Name | Field Type
298298
/// ---------------|--------------
299299
/// table_type | utf8 not null
300-
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send>;
300+
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send + '_>;
301301

302302
/// Get the names of statistics specific to this driver.
303303
///
@@ -312,7 +312,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
312312
///
313313
/// # Since
314314
/// ADBC API revision 1.1.0
315-
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send>;
315+
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send + '_>;
316316

317317
/// Get statistics about the data distribution of table(s).
318318
///
@@ -378,7 +378,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
378378
db_schema: Option<&str>,
379379
table_name: Option<&str>,
380380
approximate: bool,
381-
) -> Result<impl RecordBatchReader + Send>;
381+
) -> Result<impl RecordBatchReader + Send + '_>;
382382

383383
/// Commit any pending transactions. Only used if autocommit is disabled.
384384
///
@@ -397,7 +397,10 @@ pub trait Connection: Optionable<Option = OptionConnection> {
397397
/// # Arguments
398398
///
399399
/// - `partition` - The partition descriptor.
400-
fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader + Send>;
400+
fn read_partition(
401+
&self,
402+
partition: impl AsRef<[u8]>,
403+
) -> Result<impl RecordBatchReader + Send + '_>;
401404
}
402405

403406
/// A handle to an ADBC statement.
@@ -433,7 +436,7 @@ pub trait Statement: Optionable<Option = OptionStatement> {
433436
// TODO(alexandreyc): is the Send bound absolutely necessary? same question
434437
// for all methods that return an impl RecordBatchReader
435438
// See: https://github.com/apache/arrow-adbc/pull/1725#discussion_r1567748242
436-
fn execute(&mut self) -> Result<impl RecordBatchReader + Send>;
439+
fn execute(&mut self) -> Result<impl RecordBatchReader + Send + '_>;
437440

438441
/// Execute a statement that doesn’t have a result set and get the number
439442
/// of affected rows.

rust/driver/datafusion/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ impl Connection for DataFusionConnection {
742742
fn get_info(
743743
&self,
744744
codes: Option<std::collections::HashSet<adbc_core::options::InfoCode>>,
745-
) -> Result<impl RecordBatchReader + Send> {
745+
) -> Result<impl RecordBatchReader + Send + '_> {
746746
let mut get_info_builder = GetInfoBuilder::new();
747747

748748
codes.unwrap().into_iter().for_each(|f| match f {
@@ -766,7 +766,7 @@ impl Connection for DataFusionConnection {
766766
_table_name: Option<&str>,
767767
_table_type: Option<Vec<&str>>,
768768
_column_name: Option<&str>,
769-
) -> Result<impl RecordBatchReader + Send> {
769+
) -> Result<impl RecordBatchReader + Send + '_> {
770770
let batch = GetObjectsBuilder::new().build(&self.runtime, &self.ctx, &depth)?;
771771
let reader = SingleBatchReader::new(batch);
772772
Ok(reader)
@@ -901,7 +901,7 @@ impl Statement for DataFusionStatement {
901901
todo!()
902902
}
903903

904-
fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
904+
fn execute(&mut self) -> Result<impl RecordBatchReader + Send + '_> {
905905
self.runtime.block_on(async {
906906
let df = if self.sql_query.is_some() {
907907
self.ctx

rust/driver/dummy/src/lib.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ impl Connection for DummyConnection {
418418
_table_name: Option<&str>,
419419
_table_type: Option<Vec<&str>>,
420420
_column_name: Option<&str>,
421-
) -> Result<impl RecordBatchReader> {
421+
) -> Result<impl RecordBatchReader + Send + '_> {
422422
let constraint_column_usage_array_inner = StructArray::from(vec![
423423
(
424424
Arc::new(Field::new("fk_catalog", DataType::Utf8, true)),
@@ -654,7 +654,7 @@ impl Connection for DummyConnection {
654654
_db_schema: Option<&str>,
655655
_table_name: Option<&str>,
656656
_approximate: bool,
657-
) -> Result<impl RecordBatchReader> {
657+
) -> Result<impl RecordBatchReader + Send + '_> {
658658
let statistic_value_int64_array = Int64Array::from(Vec::<i64>::new());
659659
let statistic_value_uint64_array = UInt64Array::from(vec![42]);
660660
let statistic_value_float64_array = Float64Array::from(Vec::<f64>::new());
@@ -762,7 +762,7 @@ impl Connection for DummyConnection {
762762
Ok(reader)
763763
}
764764

765-
fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
765+
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send + '_> {
766766
let name_array = StringArray::from(vec!["sum", "min", "max"]);
767767
let key_array = Int16Array::from(vec![0, 1, 2]);
768768
let batch = RecordBatch::try_new(
@@ -792,14 +792,17 @@ impl Connection for DummyConnection {
792792
}
793793
}
794794

795-
fn get_table_types(&self) -> Result<impl RecordBatchReader> {
795+
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send + '_> {
796796
let array = Arc::new(StringArray::from(vec!["table", "view"]));
797797
let batch = RecordBatch::try_new(schemas::GET_TABLE_TYPES_SCHEMA.clone(), vec![array])?;
798798
let reader = SingleBatchReader::new(batch);
799799
Ok(reader)
800800
}
801801

802-
fn read_partition(&self, _partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader> {
802+
fn read_partition(
803+
&self,
804+
_partition: impl AsRef<[u8]>,
805+
) -> Result<impl RecordBatchReader + Send + '_> {
803806
let batch = get_table_data();
804807
let reader = SingleBatchReader::new(batch);
805808
Ok(reader)
@@ -852,7 +855,7 @@ impl Statement for DummyStatement {
852855
Ok(())
853856
}
854857

855-
fn execute(&mut self) -> Result<impl RecordBatchReader> {
858+
fn execute(&mut self) -> Result<impl RecordBatchReader + Send + '_> {
856859
maybe_panic("StatementExecuteQuery");
857860
let batch = get_table_data();
858861
let reader = SingleBatchReader::new(batch);

rust/driver/snowflake/src/connection.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ impl adbc_core::Connection for Connection {
7474
self.0.cancel()
7575
}
7676

77-
fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl RecordBatchReader + Send> {
77+
fn get_info(
78+
&self,
79+
codes: Option<HashSet<InfoCode>>,
80+
) -> Result<impl RecordBatchReader + Send + '_> {
7881
self.0.get_info(codes)
7982
}
8083

@@ -86,7 +89,7 @@ impl adbc_core::Connection for Connection {
8689
table_name: Option<&str>,
8790
table_type: Option<Vec<&str>>,
8891
column_name: Option<&str>,
89-
) -> Result<impl RecordBatchReader + Send> {
92+
) -> Result<impl RecordBatchReader + Send + '_> {
9093
self.0.get_objects(
9194
depth,
9295
catalog,
@@ -106,11 +109,11 @@ impl adbc_core::Connection for Connection {
106109
self.0.get_table_schema(catalog, db_schema, table_name)
107110
}
108111

109-
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send> {
112+
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send + '_> {
110113
self.0.get_table_types()
111114
}
112115

113-
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send> {
116+
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send + '_> {
114117
self.0.get_statistic_names()
115118
}
116119

@@ -120,7 +123,7 @@ impl adbc_core::Connection for Connection {
120123
db_schema: Option<&str>,
121124
table_name: Option<&str>,
122125
approximate: bool,
123-
) -> Result<impl RecordBatchReader + Send> {
126+
) -> Result<impl RecordBatchReader + Send + '_> {
124127
self.0
125128
.get_statistics(catalog, db_schema, table_name, approximate)
126129
}
@@ -133,7 +136,10 @@ impl adbc_core::Connection for Connection {
133136
self.0.rollback()
134137
}
135138

136-
fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader + Send> {
139+
fn read_partition(
140+
&self,
141+
partition: impl AsRef<[u8]>,
142+
) -> Result<impl RecordBatchReader + Send + '_> {
137143
self.0.read_partition(partition)
138144
}
139145
}

rust/driver_manager/src/lib.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,7 +1329,10 @@ impl Connection for ManagedConnection {
13291329
check_status(status, error)
13301330
}
13311331

1332-
fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl RecordBatchReader> {
1332+
fn get_info(
1333+
&self,
1334+
codes: Option<HashSet<InfoCode>>,
1335+
) -> Result<impl RecordBatchReader + Send + '_> {
13331336
let mut stream = FFI_ArrowArrayStream::empty();
13341337
let codes: Option<Vec<u32>> =
13351338
codes.map(|codes| codes.iter().map(|code| code.into()).collect());
@@ -1363,7 +1366,7 @@ impl Connection for ManagedConnection {
13631366
table_name: Option<&str>,
13641367
table_type: Option<Vec<&str>>,
13651368
column_name: Option<&str>,
1366-
) -> Result<impl RecordBatchReader> {
1369+
) -> Result<impl RecordBatchReader + Send + '_> {
13671370
let catalog = catalog.map(CString::new).transpose()?;
13681371
let db_schema = db_schema.map(CString::new).transpose()?;
13691372
let table_name = table_name.map(CString::new).transpose()?;
@@ -1424,7 +1427,7 @@ impl Connection for ManagedConnection {
14241427
db_schema: Option<&str>,
14251428
table_name: Option<&str>,
14261429
approximate: bool,
1427-
) -> Result<impl RecordBatchReader> {
1430+
) -> Result<impl RecordBatchReader + Send + '_> {
14281431
if let AdbcVersion::V100 = self.driver_version() {
14291432
return Err(Error::with_message_and_status(
14301433
ERR_STATISTICS_UNSUPPORTED,
@@ -1461,7 +1464,7 @@ impl Connection for ManagedConnection {
14611464
Ok(reader)
14621465
}
14631466

1464-
fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
1467+
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send + '_> {
14651468
if let AdbcVersion::V100 = self.driver_version() {
14661469
return Err(Error::with_message_and_status(
14671470
ERR_STATISTICS_UNSUPPORTED,
@@ -1512,7 +1515,7 @@ impl Connection for ManagedConnection {
15121515
Ok((&schema).try_into()?)
15131516
}
15141517

1515-
fn get_table_types(&self) -> Result<impl RecordBatchReader> {
1518+
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send + '_> {
15161519
let mut stream = FFI_ArrowArrayStream::empty();
15171520
let driver = self.ffi_driver();
15181521
let mut connection = self.inner.connection.lock().unwrap();
@@ -1524,7 +1527,10 @@ impl Connection for ManagedConnection {
15241527
Ok(reader)
15251528
}
15261529

1527-
fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader> {
1530+
fn read_partition(
1531+
&self,
1532+
partition: impl AsRef<[u8]>,
1533+
) -> Result<impl RecordBatchReader + Send + '_> {
15281534
let mut stream = FFI_ArrowArrayStream::empty();
15291535
let driver = self.ffi_driver();
15301536
let mut connection = self.inner.connection.lock().unwrap();
@@ -1651,7 +1657,7 @@ impl Statement for ManagedStatement {
16511657
check_status(status, error)
16521658
}
16531659

1654-
fn execute(&mut self) -> Result<impl RecordBatchReader> {
1660+
fn execute(&mut self) -> Result<impl RecordBatchReader + Send + '_> {
16551661
let driver = self.ffi_driver();
16561662
let mut statement = self.inner.statement.lock().unwrap();
16571663
let mut error = adbc_ffi::FFI_AdbcError::with_driver(driver);

0 commit comments

Comments
 (0)