Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 236 additions & 0 deletions c/include/arrow-adbc/adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,102 @@ struct ArrowArrayStream {
#endif // ARROW_C_STREAM_INTERFACE
#endif // ARROW_FLAG_DICTIONARY_ORDERED

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
#define ARROW_C_DEVICE_DATA_INTERFACE

// Device type for the allocated memory
typedef int32_t ArrowDeviceType;

// CPU device, same as using ArrowArray directly
#define ARROW_DEVICE_CPU 1
// CUDA GPU Device
#define ARROW_DEVICE_CUDA 2
// Pinned CUDA CPU memory by cudaMallocHost
#define ARROW_DEVICE_CUDA_HOST 3
// OpenCL Device
#define ARROW_DEVICE_OPENCL 4
// Vulkan buffer for next-gen graphics
#define ARROW_DEVICE_VULKAN 7
// Metal for Apple GPU
#define ARROW_DEVICE_METAL 8
// Verilog simulator buffer
#define ARROW_DEVICE_VPI 9
// ROCm GPUs for AMD GPUs
#define ARROW_DEVICE_ROCM 10
// Pinned ROCm CPU memory allocated by hipMallocHost
#define ARROW_DEVICE_ROCM_HOST 11
// Reserved for extension
//
// used to quickly test extension devices, semantics
// can differ based on implementation
#define ARROW_DEVICE_EXT_DEV 12
// CUDA managed/unified memory allocated by cudaMallocManaged
#define ARROW_DEVICE_CUDA_MANAGED 13
// Unified shared memory allocated on a oneAPI
// non-partitioned device.
//
// A call to the oneAPI runtime is required to determine the
// device type, the USM allocation type and the sycl context
// that it is bound to.
#define ARROW_DEVICE_ONEAPI 14
// GPU support for next-gen WebGPU standard
#define ARROW_DEVICE_WEBGPU 15
// Qualcomm Hexagon DSP
#define ARROW_DEVICE_HEXAGON 16

struct ArrowDeviceArray {
struct ArrowArray array;
int64_t device_id;
ArrowDeviceType device_type;
void* sync_event;

// reserved bytes for future expansion
int64_t reserved[3];
};

#endif // ARROW_C_DEVICE_DATA_INTERFACE

#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
#define ARROW_C_ASYNC_STREAM_INTERFACE

struct ArrowAsyncTask {
int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);

void* private_data;
};

struct ArrowAsyncProducer {
ArrowDeviceType device_type;

void (*request)(struct ArrowAsyncProducer* self, int64_t n);
void (*cancel)(struct ArrowAsyncProducer* self);

void (*release)(struct ArrowAsyncProducer* self);
const char* additional_metadata;
void* private_data;
};

struct ArrowAsyncDeviceStreamHandler {
// consumer-specific handlers
int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
struct ArrowSchema* stream_schema);
int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
struct ArrowAsyncTask* task, const char* metadata);
void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
const char* message, const char* metadata);

// release callback
void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

// must be populated before calling any callbacks
struct ArrowAsyncProducer* producer;

// opaque handler-specific data
void* private_data;
};

#endif // ARROW_C_ASYNC_STREAM_INTERFACE

//! @endcond

/// @}
Expand Down Expand Up @@ -423,6 +519,14 @@ const struct AdbcError* AdbcErrorFromArrayStream(struct ArrowArrayStream* stream
/// \since ADBC API revision 1.1.0
#define ADBC_VERSION_1_1_0 1001000

/// \brief ADBC revision 1.2.0.
///
/// When passed to an AdbcDriverInitFunc(), the driver parameter must
/// point to an AdbcDriver.
///
/// \since ADBC API revision 1.2.0
#define ADBC_VERSION_1_2_0 1002000

/// \brief Canonical option value for enabling an option.
///
/// For use as the value in SetOption calls.
Expand Down Expand Up @@ -1135,6 +1239,44 @@ struct ADBC_EXPORT AdbcDriver {
struct AdbcError*);

/// @}

/// \defgroup adbc-1.2.0 ADBC API Revision 1.2.0
///
/// Functions added in ADBC 1.2.0. For backwards compatibility,
/// these members must not be accessed unless the version passed to
/// the AdbcDriverInitFunc is greater than or equal to
/// ADBC_VERSION_1_2_0.
///
/// For an earlier driver being loaded by a 1.2.0 driver manager: the
/// 1.2.0 manager will allocate the new, expanded AdbcDriver struct
/// and attempt to have the driver initialize it with
/// ADBC_VERSION_1_2_0. This must return an error, after which the
/// driver will try again with ADBC_VERSION_1_1_0, and so on. The
/// driver must not access the new fields, which will carry undefined
/// values.
///
/// For a 1.2.0 driver being loaded by an earlier driver manager: the
/// earlier manager will allocate the old AdbcDriver struct and attempt
/// to have the driver initialize it with the earlier version. The driver
/// must not access the new fields, and should initialize the old fields.
///
/// @{

AdbcStatusCode (*StatementNextResultSet)(struct AdbcStatement*,
struct ArrowArrayStream*, int64_t*,
struct AdbcError*);
AdbcStatusCode (*StatementExecuteQueryAsync)(struct AdbcStatement*,
struct ArrowAsyncDeviceStreamHandler*,
int64_t*, struct AdbcError*);
AdbcStatusCode (*StatementNextResultSetAsync)(struct AdbcStatement*,
struct ArrowAsyncDeviceStreamHandler*,
int64_t*, struct AdbcError*);
AdbcStatusCode (*ConnectionReadPartitionAsync)(struct AdbcConnection*, const uint8_t*,
size_t,
struct ArrowAsyncDeviceStreamHandler*,
struct AdbcError*);

/// @}
};

/// \brief The size of the AdbcDriver structure in ADBC 1.0.0.
Expand Down Expand Up @@ -1939,6 +2081,32 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error);

/// \brief Asynchronously read a partition of a query. The results can
/// then be read independently.
///
/// A partition can be retrieved from AdbcPartitions.
///
/// This AdbcConnection must outlive the ArrowAsyncDeviceStreamHandler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to comment on the lifetime of serialized_partition too - does it need to live until the call returns or until the callback finishes?

///
/// \since ADBC API revision 1.2.0
///
/// \param[in] connection The connection to use. This does not have to be
/// the same connection that the partition was created on.
/// \param[in] serialized_partition The serialized partition descriptor.
/// \param[in] serialized_length The length of the serialized partition.
/// \param[in] handler The async device stream handler whose callbacks will
/// be used to deliver results.
/// \param[out] error Error details, if an error occurs.
///
/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not support async
/// execution/partition reading, ADBC_STATUS_OK if the execution has started
/// successfully.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionReadPartitionAsync(
struct AdbcConnection* connection, const uint8_t* serialized_partition,
size_t serialized_length, struct ArrowAsyncDeviceStreamHandler* handler,
struct AdbcError* error);

/// @}

/// \defgroup adbc-connection-transaction Transaction Semantics
Expand Down Expand Up @@ -2013,6 +2181,74 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
int64_t* rows_affected, struct AdbcError* error);

/// \brief Retrieve the next result set from a multi-statement execution
///
/// This invalidates any prior result sets. This AdbcStatement must
/// outlive the returned ArrowArrayStream.
///
/// \since ADBC API revision 1.2.0
///
/// \param[in] statement The statement to retrieve the next result set from.
/// \param[out] out The results. Pass NULL if the client does not
/// expect a result set.
/// \param[out] rows_affected The number of rows affected if known,
/// else -1. Pass NULL if the client does not want this information.
/// \param[out] error An optional location to return an error
/// message if necessary.
///
/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not support
/// multi-result set execution, ADBC_STATUS_OK if next result set is
/// being returned successfully. ADBC_STATUS_NOT_FOUND is returned
/// when there are no more result sets.
ADBC_EXPORT
AdbcStatusCode AdbcStatementNextResultSet(struct AdbcStatement* statement,
Copy link
Contributor

@CurtHagenlocher CurtHagenlocher Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if this also worked for "schema-only" evaluation. (I started down this road last year and my work-in-progress was at main...CurtHagenlocher:arrow-adbc:MoreResults.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have AdbcStatementExecuteSchema for schema-only evaluation, so you're suggesting a NextResultSet function for that one too?

struct ArrowArrayStream* out,
int64_t* rows_affected,
struct AdbcError* error);

/// \brief Execute a statement asynchronously and get the results.
///
/// This will invalidate any prior result sets. This AdbcStatement must
/// outlive the returned ArrowAsyncDeviceStreamHandler.
///
/// \since ADBC API revision 1.2.0
///
/// \param[in] statement The statement to execute.
/// \param[in] handler The async stream handler whose callbacks to use
/// to deliver results.
/// \param[out] rows_affected The number of rows affected if known, else -1. Pass
/// NULL if the client does not want this information.
/// \param[out] error An optional location to return an error message if necessary.
///
/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not support async execution,
/// ADBC_STATUS_OK if the execution has started successfully.
ADBC_EXPORT
AdbcStatusCode AdbcStatementExecuteQueryAsync(
struct AdbcStatement* statement, struct ArrowAsyncDeviceStreamHandler* handler,
int64_t* rows_affected, struct AdbcError* error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would rows_affected be populated? Maybe it should be included in schema metadata or something instead...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it could go in ArrowAsyncProducer's additional_metadata. Or, provide an AdbcAsyncProducerGetMetadata(ArrowAsyncProducer*) like how we have an AdbcErrorFromArrayStream. (Speaking of which, we need an equivalent of that for the async stream.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's a good point. Since it's an asynchronous execution it wouldn't be populated before returning. I guess we can define a canonical metadata key to indicate total rows?


/// \brief Retrieve the next result set from a multi-statement execution
/// asynchronously.
///
/// This will invalidate any prior result sets. This AdbcStatement must
/// outlive the returned ArrowAsyncDeviceStreamHandler.
///
/// \since ADBC API revision 1.2.0
///
/// \param[in] statement The statement to retrieve the next result set from.
/// \param[in] handler The async stream handler whose callbacks to use to deliver results.
/// \param[out] rows_affected The number of rows affected if known, else -1. Pass
/// NULL if the client does not want this information.
/// \param[out] error An optional location to return an error message if necessary.
///
/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not support async execution,
/// ADBC_STATUS_OK if next result set is being returned successfully.
/// ADBC_STATUS_NOT_FOUND is returned when there are no more result sets.
ADBC_EXPORT
AdbcStatusCode AdbcStatementNextResultSetAsync(
struct AdbcStatement* statement, struct ArrowAsyncDeviceStreamHandler* handler,
int64_t* rows_affected, struct AdbcError* error);

/// \brief Get the schema of the result set of a query without
/// executing it.
///
Expand Down
Loading