Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions rust/core/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
fn get_info(
&self,
codes: Option<HashSet<options::InfoCode>>,
) -> Result<impl RecordBatchReader + Send>;
) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;

/// Get a hierarchical view of all catalogs, database schemas, tables, and
/// columns.
Expand Down Expand Up @@ -233,7 +233,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
table_name: Option<&str>,
table_type: Option<Vec<&str>>,
column_name: Option<&str>,
) -> Result<impl RecordBatchReader + Send>;
) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;

/// Get the Arrow schema of a table.
///
Expand All @@ -258,7 +258,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
/// Field Name | Field Type
/// ---------------|--------------
/// table_type | utf8 not null
fn get_table_types(&self) -> Result<impl RecordBatchReader + Send>;
fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;

/// Get the names of statistics specific to this driver.
///
Expand All @@ -273,7 +273,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
///
/// # Since
/// ADBC API revision 1.1.0
fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send>;
fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;

/// Get statistics about the data distribution of table(s).
///
Expand Down Expand Up @@ -339,7 +339,7 @@ pub trait Connection: Optionable<Option = OptionConnection> {
db_schema: Option<&str>,
table_name: Option<&str>,
approximate: bool,
) -> Result<impl RecordBatchReader + Send>;
) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;

/// Commit any pending transactions. Only used if autocommit is disabled.
///
Expand All @@ -358,7 +358,10 @@ pub trait Connection: Optionable<Option = OptionConnection> {
/// # Arguments
///
/// - `partition` - The partition descriptor.
fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader + Send>;
fn read_partition(
&self,
partition: impl AsRef<[u8]>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
}

/// A handle to an ADBC statement.
Expand Down Expand Up @@ -391,10 +394,7 @@ pub trait Statement: Optionable<Option = OptionStatement> {
/// Execute a statement and get the results.
///
/// This invalidates any prior result sets.
// TODO(alexandreyc): is the Send bound absolutely necessary? same question
// for all methods that return an impl RecordBatchReader
// See: https://github.com/apache/arrow-adbc/pull/1725#discussion_r1567748242
fn execute(&mut self) -> Result<impl RecordBatchReader + Send>;
fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;

/// Execute a statement that doesn’t have a result set and get the number
/// of affected rows.
Expand Down
23 changes: 13 additions & 10 deletions rust/driver/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ impl Connection for DataFusionConnection {
fn get_info(
&self,
codes: Option<std::collections::HashSet<adbc_core::options::InfoCode>>,
) -> Result<impl RecordBatchReader + Send> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let mut get_info_builder = GetInfoBuilder::new();

codes.unwrap().into_iter().for_each(|f| match f {
Expand All @@ -755,7 +755,7 @@ impl Connection for DataFusionConnection {

let batch = get_info_builder.finish()?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn get_objects(
Expand All @@ -766,10 +766,10 @@ impl Connection for DataFusionConnection {
_table_name: Option<&str>,
_table_type: Option<Vec<&str>>,
_column_name: Option<&str>,
) -> Result<impl RecordBatchReader + Send> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let batch = GetObjectsBuilder::new().build(&self.runtime, &self.ctx, &depth)?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn get_table_schema(
Expand All @@ -781,11 +781,11 @@ impl Connection for DataFusionConnection {
todo!()
}

fn get_table_types(&self) -> Result<SingleBatchReader> {
fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
todo!()
}

fn get_statistic_names(&self) -> Result<SingleBatchReader> {
fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
todo!()
}

Expand All @@ -795,7 +795,7 @@ impl Connection for DataFusionConnection {
_db_schema: Option<&str>,
_table_name: Option<&str>,
_approximate: bool,
) -> Result<SingleBatchReader> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
todo!()
}

Expand All @@ -807,7 +807,10 @@ impl Connection for DataFusionConnection {
todo!()
}

fn read_partition(&self, _partition: impl AsRef<[u8]>) -> Result<SingleBatchReader> {
fn read_partition(
&self,
_partition: impl AsRef<[u8]>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
todo!()
}
}
Expand Down Expand Up @@ -901,7 +904,7 @@ impl Statement for DataFusionStatement {
todo!()
}

fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send>> {
self.runtime.block_on(async {
let df = if self.sql_query.is_some() {
self.ctx
Expand All @@ -916,7 +919,7 @@ impl Statement for DataFusionStatement {
self.ctx.execute_logical_plan(plan).await.unwrap()
};

Ok(DataFusionReader::new(df).await)
Ok(Box::new(DataFusionReader::new(df).await) as Box<dyn RecordBatchReader + Send>)
})
}

Expand Down
34 changes: 20 additions & 14 deletions rust/driver/dummy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ impl Connection for DummyConnection {
Ok(())
}

fn get_info(&self, _codes: Option<HashSet<InfoCode>>) -> Result<impl RecordBatchReader> {
fn get_info(
&self,
_codes: Option<HashSet<InfoCode>>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let string_value_array = StringArray::from(vec!["MyVendorName"]);
let bool_value_array = BooleanArray::from(vec![true]);
let int64_value_array = Int64Array::from(vec![42]);
Expand Down Expand Up @@ -407,7 +410,7 @@ impl Connection for DummyConnection {
vec![Arc::new(name_array), Arc::new(value_array)],
)?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn get_objects(
Expand All @@ -418,7 +421,7 @@ impl Connection for DummyConnection {
_table_name: Option<&str>,
_table_type: Option<Vec<&str>>,
_column_name: Option<&str>,
) -> Result<impl RecordBatchReader> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let constraint_column_usage_array_inner = StructArray::from(vec![
(
Arc::new(Field::new("fk_catalog", DataType::Utf8, true)),
Expand Down Expand Up @@ -645,7 +648,7 @@ impl Connection for DummyConnection {
],
)?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn get_statistics(
Expand All @@ -654,7 +657,7 @@ impl Connection for DummyConnection {
_db_schema: Option<&str>,
_table_name: Option<&str>,
_approximate: bool,
) -> Result<impl RecordBatchReader> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let statistic_value_int64_array = Int64Array::from(Vec::<i64>::new());
let statistic_value_uint64_array = UInt64Array::from(vec![42]);
let statistic_value_float64_array = Float64Array::from(Vec::<f64>::new());
Expand Down Expand Up @@ -759,18 +762,18 @@ impl Connection for DummyConnection {
)?;

let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let name_array = StringArray::from(vec!["sum", "min", "max"]);
let key_array = Int16Array::from(vec![0, 1, 2]);
let batch = RecordBatch::try_new(
schemas::GET_STATISTIC_NAMES_SCHEMA.clone(),
vec![Arc::new(name_array), Arc::new(key_array)],
)?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn get_table_schema(
Expand All @@ -792,17 +795,20 @@ impl Connection for DummyConnection {
}
}

fn get_table_types(&self) -> Result<impl RecordBatchReader> {
fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let array = Arc::new(StringArray::from(vec!["table", "view"]));
let batch = RecordBatch::try_new(schemas::GET_TABLE_TYPES_SCHEMA.clone(), vec![array])?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn read_partition(&self, _partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader> {
fn read_partition(
&self,
_partition: impl AsRef<[u8]>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let batch = get_table_data();
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn rollback(&mut self) -> Result<()> {
Expand Down Expand Up @@ -852,11 +858,11 @@ impl Statement for DummyStatement {
Ok(())
}

fn execute(&mut self) -> Result<impl RecordBatchReader> {
fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
maybe_panic("StatementExecuteQuery");
let batch = get_table_data();
let reader = SingleBatchReader::new(batch);
Ok(reader)
Ok(Box::new(reader))
}

fn execute_partitions(&mut self) -> Result<PartitionedResult> {
Expand Down
18 changes: 12 additions & 6 deletions rust/driver/snowflake/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ impl adbc_core::Connection for Connection {
self.0.cancel()
}

fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl RecordBatchReader + Send> {
fn get_info(
&self,
codes: Option<HashSet<InfoCode>>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.get_info(codes)
}

Expand All @@ -86,7 +89,7 @@ impl adbc_core::Connection for Connection {
table_name: Option<&str>,
table_type: Option<Vec<&str>>,
column_name: Option<&str>,
) -> Result<impl RecordBatchReader + Send> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.get_objects(
depth,
catalog,
Expand All @@ -106,11 +109,11 @@ impl adbc_core::Connection for Connection {
self.0.get_table_schema(catalog, db_schema, table_name)
}

fn get_table_types(&self) -> Result<impl RecordBatchReader + Send> {
fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.get_table_types()
}

fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send> {
fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.get_statistic_names()
}

Expand All @@ -120,7 +123,7 @@ impl adbc_core::Connection for Connection {
db_schema: Option<&str>,
table_name: Option<&str>,
approximate: bool,
) -> Result<impl RecordBatchReader + Send> {
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0
.get_statistics(catalog, db_schema, table_name, approximate)
}
Expand All @@ -133,7 +136,10 @@ impl adbc_core::Connection for Connection {
self.0.rollback()
}

fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl RecordBatchReader + Send> {
fn read_partition(
&self,
partition: impl AsRef<[u8]>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.read_partition(partition)
}
}
2 changes: 1 addition & 1 deletion rust/driver/snowflake/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl adbc_core::Statement for Statement {
self.0.bind_stream(reader)
}

fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.execute()
}

Expand Down
Loading
Loading