Skip to content

Commit ccc56eb

Browse files
chore[duckdb]: set max threads (#5020)
Set max threads on the rust side --------- Signed-off-by: Joe Isaacs <[email protected]>
1 parent fbf696c commit ccc56eb

File tree

7 files changed

+55
-40
lines changed

7 files changed

+55
-40
lines changed

vortex-duckdb/cpp/config.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,28 @@ using namespace duckdb;
1212

1313
extern "C" {
1414

15-
duckdb_state duckdb_vx_get_config_value(duckdb_config config, const char* key, duckdb_value* out_value) {
15+
duckdb_state duckdb_vx_get_config_value(duckdb_config config, const char *key, duckdb_value *out_value) {
1616
if (!config || !key || !out_value) {
1717
return DuckDBError;
1818
}
1919

2020
try {
2121
// Cast the config to DuckDB's internal config type
22-
auto* db_config = reinterpret_cast<DBConfig*>(config);
23-
22+
auto *db_config = reinterpret_cast<DBConfig *>(config);
23+
2424
if (!db_config) {
2525
return DuckDBError;
2626
}
2727

2828
std::string key_str(key);
29-
29+
3030
// First check set_variables (the primary location for config values)
3131
auto set_it = db_config->options.set_variables.find(key_str);
3232
if (set_it != db_config->options.set_variables.end()) {
3333
*out_value = reinterpret_cast<duckdb_value>(new Value(set_it->second));
3434
return DuckDBSuccess;
3535
}
36-
36+
3737
// Then check user_options
3838
auto user_it = db_config->options.user_options.find(key_str);
3939
if (user_it != db_config->options.user_options.end()) {
@@ -43,69 +43,69 @@ duckdb_state duckdb_vx_get_config_value(duckdb_config config, const char* key, d
4343

4444
// Key not found in any map
4545
return DuckDBError;
46-
46+
4747
} catch (...) {
4848
return DuckDBError;
4949
}
5050
}
5151

52-
int duckdb_vx_config_has_key(duckdb_config config, const char* key) {
52+
int duckdb_vx_config_has_key(duckdb_config config, const char *key) {
5353
if (!config || !key) {
5454
return 0;
5555
}
5656

5757
try {
58-
auto* db_config = reinterpret_cast<DBConfig*>(config);
58+
auto *db_config = reinterpret_cast<DBConfig *>(config);
5959
if (!db_config) {
6060
return 0;
6161
}
62-
62+
6363
std::string key_str(key);
64-
64+
6565
// Check if the key exists in set_variables (primary location)
6666
if (db_config->options.set_variables.find(key_str) != db_config->options.set_variables.end()) {
6767
return 1;
6868
}
69-
69+
7070
// Check if the key exists in user_options
7171
if (db_config->options.user_options.find(key_str) != db_config->options.user_options.end()) {
7272
return 1;
7373
}
7474

7575
return 0;
76-
76+
7777
} catch (...) {
7878
return 0;
7979
}
8080
}
8181

82-
char* duckdb_vx_value_to_string(duckdb_value value) {
82+
char *duckdb_vx_value_to_string(duckdb_value value) {
8383
if (!value) {
8484
return nullptr;
8585
}
8686

8787
try {
8888
// Cast the value to DuckDB's internal Value type
89-
auto* ddb_value = reinterpret_cast<Value*>(value);
90-
89+
auto *ddb_value = reinterpret_cast<Value *>(value);
90+
9191
if (!ddb_value) {
9292
return nullptr;
9393
}
9494

9595
// Use the ToString method to get the string representation
9696
std::string str_value = ddb_value->ToString();
97-
97+
9898
// Allocate memory for the C string using malloc (compatible with duckdb_free)
9999
size_t str_len = str_value.length() + 1;
100-
char* result = static_cast<char*>(malloc(str_len));
100+
char *result = static_cast<char *>(malloc(str_len));
101101
if (!result) {
102102
return nullptr;
103103
}
104-
104+
105105
// Copy the string and null terminate
106106
std::memcpy(result, str_value.c_str(), str_len);
107107
return result;
108-
108+
109109
} catch (...) {
110110
return nullptr;
111111
}

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ typedef struct {
140140
bool filter_prune;
141141
bool sampling_pushdown;
142142
bool late_materialization;
143+
idx_t max_threads;
143144
} duckdb_vx_tfunc_vtab_t;
144145

145146
// A single function for configuring the DuckDB table function vtable.

vortex-duckdb/cpp/table_function.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ using namespace duckdb;
1616

1717
namespace vortex {
1818
struct CTableFunctionInfo final : TableFunctionInfo {
19-
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
19+
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab)
20+
: vtab(vtab), max_threads(vtab.max_threads) {
2021
}
2122

2223
duckdb_vx_tfunc_vtab_t vtab;
24+
idx_t max_threads;
2325
};
2426

2527
struct CTableBindData final : TableFunctionData {
@@ -44,13 +46,15 @@ struct CTableBindData final : TableFunctionData {
4446
};
4547

4648
struct CTableGlobalData final : GlobalTableFunctionState {
47-
explicit CTableGlobalData(unique_ptr<vortex::CData> ffi_data_p) : ffi_data(std::move(ffi_data_p)) {
49+
explicit CTableGlobalData(unique_ptr<vortex::CData> ffi_data_p, idx_t max_threads_p)
50+
: ffi_data(std::move(ffi_data_p)), max_threads(max_threads_p) {
4851
}
4952

5053
unique_ptr<vortex::CData> ffi_data;
54+
idx_t max_threads;
5155

5256
idx_t MaxThreads() const override {
53-
return GlobalTableFunctionState::MAX_THREADS;
57+
return max_threads;
5458
}
5559
};
5660

@@ -111,7 +115,8 @@ unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, Table
111115
}
112116

113117
return make_uniq<CTableGlobalData>(
114-
unique_ptr<vortex::CData>(reinterpret_cast<vortex::CData *>(ffi_global_data)));
118+
unique_ptr<vortex::CData>(reinterpret_cast<vortex::CData *>(ffi_global_data)),
119+
bind.info->max_threads);
115120
}
116121

117122
unique_ptr<LocalTableFunctionState> c_init_local(ExecutionContext &context, TableFunctionInitInput &input,
@@ -148,8 +153,8 @@ void c_function(ClientContext &context, TableFunctionInput &input, DataChunk &ou
148153
auto local_data = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
149154

150155
duckdb_vx_error error_out = nullptr;
151-
bind.info->vtab.function(ctx, bind_data, global_data, local_data, reinterpret_cast<duckdb_data_chunk>(&output),
152-
&error_out);
156+
bind.info->vtab.function(ctx, bind_data, global_data, local_data,
157+
reinterpret_cast<duckdb_data_chunk>(&output), &error_out);
153158
if (error_out) {
154159
throw InvalidInputException(IntoErrString(error_out));
155160
}
@@ -255,8 +260,8 @@ virtual_column_map_t c_get_virtual_columns(ClientContext &context, optional_ptr<
255260
}
256261

257262
extern "C" void duckdb_vx_tfunc_virtual_columns_push(duckdb_vx_tfunc_virtual_cols_result ffi_result,
258-
idx_t column_idx, const char *name_str, size_t name_len,
259-
duckdb_logical_type ffi_type) {
263+
idx_t column_idx, const char *name_str, size_t name_len,
264+
duckdb_logical_type ffi_type) {
260265
if (!ffi_result || !name_str || !ffi_type) {
261266
return;
262267
}
@@ -289,7 +294,7 @@ OperatorPartitionData c_get_partition_data(ClientContext &context, TableFunction
289294
InsertionOrderPreservingMap<string> c_to_string(TableFunctionToStringInput &input) {
290295
InsertionOrderPreservingMap<string> result;
291296
auto &bind = input.bind_data->Cast<CTableBindData>();
292-
297+
293298
// Call the Rust side to get custom string representation if available
294299
if (bind.info->vtab.to_string) {
295300
auto map = bind.info->vtab.to_string(bind.ffi_data->DataPtr());
@@ -303,7 +308,7 @@ InsertionOrderPreservingMap<string> c_to_string(TableFunctionToStringInput &inpu
303308
duckdb_vx_string_map_free(map);
304309
}
305310
}
306-
311+
307312
return result;
308313
}
309314

vortex-duckdb/cpp/vector.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,26 @@ namespace vortex {
4747
// This is a complete hack to access the data buffer and pointer of a vector.
4848
class DataVector : public Vector {
4949
public:
50-
inline void SetDataBuffer(buffer_ptr<VectorBuffer> new_buffer) {
51-
buffer = std::move(new_buffer);
52-
};
50+
inline void SetDataBuffer(buffer_ptr<VectorBuffer> new_buffer) {
51+
buffer = std::move(new_buffer);
52+
};
5353

54-
inline void SetDataPtr(data_ptr_t ptr) {
55-
data = ptr;
56-
};
54+
inline void SetDataPtr(data_ptr_t ptr) {
55+
data = ptr;
56+
};
5757
};
5858

5959
} // namespace vortex
6060

61-
extern "C" void duckdb_vx_string_vector_add_vector_data_buffer(duckdb_vector ffi_vector, duckdb_vx_vector_buffer buffer) {
61+
extern "C" void duckdb_vx_string_vector_add_vector_data_buffer(duckdb_vector ffi_vector,
62+
duckdb_vx_vector_buffer buffer) {
6263
auto vector = reinterpret_cast<Vector *>(ffi_vector);
6364
auto data = reinterpret_cast<shared_ptr<vortex::ExternalVectorBuffer> *>(buffer);
6465
StringVector::AddBuffer(*vector, *data);
6566
}
6667

67-
extern "C" void duckdb_vx_vector_set_vector_data_buffer(duckdb_vector ffi_vector, duckdb_vx_vector_buffer buffer) {
68+
extern "C" void duckdb_vx_vector_set_vector_data_buffer(duckdb_vector ffi_vector,
69+
duckdb_vx_vector_buffer buffer) {
6870
auto vector = reinterpret_cast<Vector *>(ffi_vector);
6971
auto dvector = reinterpret_cast<vortex::DataVector *>(vector);
7072
auto data = reinterpret_cast<shared_ptr<vortex::ExternalVectorBuffer> *>(buffer);

vortex-duckdb/cpp/vector_buffer.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ using namespace duckdb;
1212

1313
extern "C" duckdb_vx_vector_buffer duckdb_vx_vector_buffer_create(duckdb_vx_data buffer) {
1414
auto data = reinterpret_cast<vortex::CData *>(buffer);
15-
auto* shared_buffer = new duckdb::shared_ptr<vortex::ExternalVectorBuffer>(
16-
duckdb::make_shared_ptr<vortex::ExternalVectorBuffer>(unique_ptr<vortex::CData>(data))
17-
);
15+
auto *shared_buffer = new duckdb::shared_ptr<vortex::ExternalVectorBuffer>(
16+
duckdb::make_shared_ptr<vortex::ExternalVectorBuffer>(unique_ptr<vortex::CData>(data)));
1817
return reinterpret_cast<duckdb_vx_vector_buffer>(shared_buffer);
1918
}
2019

vortex-duckdb/src/duckdb/table_function/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ pub trait TableFunction: Sized + Debug {
5151
/// - j does not need to leave the table function at all.
5252
const FILTER_PRUNE: bool = false;
5353

54+
/// Maximum number of threads the table function can use.
55+
/// If not specified, DuckDB will use its default (GlobalTableFunctionState::MAX_THREADS).
56+
const MAX_THREADS: u64 = u64::MAX;
57+
5458
/// Returns the parameters of the table function.
5559
fn parameters() -> Vec<LogicalType> {
5660
// By default, we don't have any parameters.
@@ -180,6 +184,7 @@ impl Connection {
180184
filter_prune: T::FILTER_PRUNE,
181185
sampling_pushdown: false,
182186
late_materialization: false,
187+
max_threads: T::MAX_THREADS,
183188
};
184189

185190
duckdb_try!(

vortex-duckdb/src/scan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct VortexBindData {
4040
column_names: Vec<String>,
4141
column_types: Vec<LogicalType>,
4242
runtime: CurrentThreadRuntime,
43+
max_threads: u64,
4344
}
4445

4546
impl Clone for VortexBindData {
@@ -53,6 +54,7 @@ impl Clone for VortexBindData {
5354
column_names: self.column_names.clone(),
5455
column_types: self.column_types.clone(),
5556
runtime: self.runtime.clone(),
57+
max_threads: self.max_threads,
5658
}
5759
}
5860
}
@@ -258,6 +260,7 @@ impl TableFunction for VortexTableFunction {
258260
column_names,
259261
column_types,
260262
runtime,
263+
max_threads: u64::MAX,
261264
})
262265
}
263266

0 commit comments

Comments
 (0)