Skip to content

Commit 48a38a8

Browse files
Support streaming write of files duckdb ext (#3227)
1 parent 228bf14 commit 48a38a8

File tree

11 files changed

+305
-163
lines changed

11 files changed

+305
-163
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

duckdb-vortex/src/include/vortex_common.hpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,40 @@
66

77
#include <duckdb/common/unique_ptr.hpp>
88

9+
struct DType {
10+
explicit DType(vx_dtype *dtype): dtype(dtype) {}
11+
12+
static duckdb::unique_ptr<DType> FromDuckDBTable(
13+
const std::vector<duckdb_logical_type> &column_types,
14+
const std::vector<unsigned char> &column_nullable,
15+
const std::vector<const char *> &column_names
16+
) {
17+
D_ASSERT(column_names.size() == column_nullable.size());
18+
D_ASSERT(column_names.size() == column_types.size());
19+
20+
vx_error *error = nullptr;
21+
auto dtype = vx_duckdb_logical_type_to_dtype(
22+
column_types.data(),
23+
column_nullable.data(),
24+
column_names.data(),
25+
column_names.size(),
26+
&error
27+
);
28+
HandleError(error);
29+
30+
return duckdb::make_uniq<DType>(dtype);
31+
}
32+
33+
34+
~DType() {
35+
if (dtype != nullptr) {
36+
vx_dtype_free(dtype);
37+
}
38+
}
39+
40+
vx_dtype *dtype;
41+
};
42+
943
struct VortexConversionCache {
1044
explicit VortexConversionCache(const unsigned long cache_id) : cache(vx_conversion_cache_create(cache_id)) {
1145
}
@@ -32,6 +66,10 @@ struct VortexFileReader {
3266
return duckdb::make_uniq<VortexFileReader>(file);
3367
}
3468

69+
struct DType DType() {
70+
return ::DType(vx_file_dtype(file));
71+
}
72+
3573
vx_file_reader *file;
3674
};
3775

@@ -45,6 +83,13 @@ struct VortexArray {
4583
}
4684
}
4785

86+
static duckdb::unique_ptr<VortexArray> FromDuckDBChunk(DType &dtype, duckdb::DataChunk &chunk) {
87+
vx_error *error;
88+
auto array = vx_duckdb_chunk_to_array(reinterpret_cast<duckdb_data_chunk>(&chunk), dtype.dtype, &error);
89+
HandleError(error);
90+
return duckdb::make_uniq<VortexArray>(array);
91+
}
92+
4893
idx_t ToDuckDBVector(idx_t current_row, duckdb_data_chunk output, const VortexConversionCache *cache) const {
4994
vx_error *error;
5095
auto idx = vx_array_to_duckdb_chunk(array, current_row, output, cache->cache, &error);
@@ -75,3 +120,40 @@ struct VortexArrayStream {
75120

76121
vx_array_stream *array_stream;
77122
};
123+
124+
struct ArrayStreamSink {
125+
explicit ArrayStreamSink(vx_array_sink *sink, duckdb::unique_ptr<DType> dtype) : sink(sink), dtype(std::move(dtype)) {
126+
}
127+
128+
static duckdb::unique_ptr<ArrayStreamSink> Create(std::string file_path, duckdb::unique_ptr<DType> &&dtype) {
129+
vx_error *error = nullptr;
130+
auto sink = vx_array_sink_open_file(file_path.c_str(), dtype->dtype, &error);
131+
HandleError(error);
132+
133+
return duckdb::make_uniq<ArrayStreamSink>(sink, std::move(dtype));
134+
}
135+
136+
void PushChunk(duckdb::DataChunk &chunk) {
137+
vx_error *error = nullptr;
138+
auto array = VortexArray::FromDuckDBChunk(*dtype, chunk);
139+
vx_array_sink_push(sink, array->array, &error);
140+
HandleError(error);
141+
}
142+
143+
void Close() {
144+
vx_error *error;
145+
vx_array_sink_close(sink, &error);
146+
HandleError(error);
147+
148+
this->sink = nullptr;
149+
}
150+
151+
~ArrayStreamSink() {
152+
// "should dctor a sink, before closing it
153+
D_ASSERT(sink == nullptr);
154+
}
155+
156+
157+
vx_array_sink *sink;
158+
duckdb::unique_ptr<DType> dtype;
159+
};

duckdb-vortex/src/vortex_scan.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,17 @@ void CreateFilterExpression(google::protobuf::Arena &arena, vector<std::string>
129129
}
130130

131131
/// Extracts schema information from a Vortex file's data type.
132-
static void ExtractVortexSchema(const vx_dtype *file_dtype, vector<LogicalType> &column_types,
132+
static void ExtractVortexSchema(DType &file_dtype, vector<LogicalType> &column_types,
133133
vector<string> &column_names) {
134-
uint32_t field_count = vx_dtype_field_count(file_dtype);
134+
uint32_t field_count = vx_dtype_field_count(file_dtype.dtype);
135135
for (uint32_t idx = 0; idx < field_count; idx++) {
136136
char name_buffer[512];
137137
int name_len = 0;
138138

139-
vx_dtype_field_name(file_dtype, idx, name_buffer, &name_len);
139+
vx_dtype_field_name(file_dtype.dtype, idx, name_buffer, &name_len);
140140
std::string field_name(name_buffer, name_len);
141141

142-
vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype, idx);
142+
vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype.dtype, idx);
143143
vx_error *error = nullptr;
144144
auto duckdb_type = vx_dtype_to_duckdb_logical_type(field_dtype, &error);
145145
HandleError(error);
@@ -179,8 +179,8 @@ static unique_ptr<VortexFileReader> OpenFile(const std::string &filename, vector
179179
}
180180

181181
// This Ptr is owned by the file
182-
const vx_dtype *file_dtype = vx_file_dtype(file->file);
183-
if (vx_dtype_get(file_dtype) != DTYPE_STRUCT) {
182+
auto file_dtype = file->DType();
183+
if (vx_dtype_get(file_dtype.dtype) != DTYPE_STRUCT) {
184184
vx_file_reader_free(file->file);
185185
throw FatalException("Vortex file does not contain a struct array as a top-level dtype");
186186
}

duckdb-vortex/src/vortex_write.cpp

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include "duckdb/function/copy_function.hpp"
1111
#include "duckdb/parser/constraints/not_null_constraint.hpp"
1212

13+
// TODO(joe): enable multi-threaded writes, see `VortexWriteSink`.
14+
1315
namespace duckdb {
1416

1517
struct VortexWriteBindData : public TableFunctionData {
@@ -21,9 +23,7 @@ struct VortexWriteBindData : public TableFunctionData {
2123
};
2224

2325
struct VortexWriteGlobalData : public GlobalFunctionData {
24-
std::string file_name;
25-
std::unique_ptr<VortexFileReader> file;
26-
unique_ptr<VortexArray> array;
26+
unique_ptr<ArrayStreamSink> sink;
2727
};
2828

2929
struct VortexWriteLocalData : public LocalFunctionData {};
@@ -33,16 +33,12 @@ void VortexWriteSink(ExecutionContext &context, FunctionData &bind_data, GlobalF
3333
auto &global_state = gstate.Cast<VortexWriteGlobalData>();
3434
auto bind = bind_data.Cast<VortexWriteBindData>();
3535

36-
auto chunk = DataChunk();
37-
chunk.Initialize(Allocator::Get(context.client), bind.sql_types);
38-
3936
for (auto i = 0u; i < input.ColumnCount(); i++) {
4037
input.data[i].Flatten(input.size());
4138
}
42-
43-
auto new_array = vx_array_append_duckdb_chunk(
44-
global_state.array->array, reinterpret_cast<duckdb_data_chunk>(&input), bind.column_nullable.data());
45-
global_state.array = make_uniq<VortexArray>(new_array);
39+
// TODO(joe): go to a model of combining local chunked into arrays of a specific size
40+
// before push each of these larger chunks into the global_state
41+
global_state.sink->PushChunk(input);
4642
}
4743

4844
std::vector<idx_t> TableNullability(ClientContext &context, const string &catalog_name, const string &schema,
@@ -92,7 +88,6 @@ void RegisterVortexWriteFunction(DatabaseInstance &instance) {
9288
const string &file_path) -> unique_ptr<GlobalFunctionData> {
9389
auto &bind = bind_data.Cast<VortexWriteBindData>();
9490
auto gstate = make_uniq<VortexWriteGlobalData>();
95-
gstate->file_name = file_path;
9691

9792
auto column_names = std::vector<const char *>();
9893
for (const auto &col_id : bind.column_names) {
@@ -104,12 +99,8 @@ void RegisterVortexWriteFunction(DatabaseInstance &instance) {
10499
column_types.push_back(reinterpret_cast<duckdb_logical_type>(&col_type));
105100
}
106101

107-
vx_error *error = nullptr;
108-
auto array = vx_array_create_empty_from_duckdb_table(column_types.data(), bind.column_nullable.data(),
109-
column_names.data(), column_names.size(), &error);
110-
HandleError(error);
111-
112-
gstate->array = make_uniq<VortexArray>(array);
102+
auto dtype = DType::FromDuckDBTable(column_types, bind.column_nullable, column_names);
103+
gstate->sink = ArrayStreamSink::Create(file_path, std::move(dtype));
113104
return std::move(gstate);
114105
};
115106
function.copy_to_initialize_local = [](ExecutionContext &context,
@@ -119,9 +110,7 @@ void RegisterVortexWriteFunction(DatabaseInstance &instance) {
119110
function.copy_to_sink = VortexWriteSink;
120111
function.copy_to_finalize = [](ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate) {
121112
auto &global_state = gstate.Cast<VortexWriteGlobalData>();
122-
vx_error *error;
123-
vx_file_write_array(global_state.file_name.c_str(), global_state.array->array, &error);
124-
HandleError(error);
113+
global_state.sink->Close();
125114
};
126115
function.execution_mode = [](bool preserve_insertion_order,
127116
bool supports_batch_index) -> CopyFunctionExecutionMode {

vortex-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ paste = { workspace = true }
2727
prost = { workspace = true }
2828
simplelog = { workspace = true }
2929
tokio = { workspace = true, features = ["rt-multi-thread"] }
30+
tokio-stream = { workspace = true }
3031
url = { workspace = true, features = [] }
3132
vortex = { workspace = true, features = ["object_store", "proto"] }
3233
vortex-duckdb = { workspace = true, optional = true }

vortex-ffi/cinclude/vortex.h

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ typedef struct vx_dtype vx_dtype;
9494
*/
9595
typedef struct vx_array vx_array;
9696

97+
/**
98+
* The `sink` interface is used to collect array chunks and place them into a resource
99+
* (e.g. an array stream or file (`vx_array_sink_open_file`)).
100+
*/
101+
typedef struct vx_array_sink vx_array_sink;
102+
97103
/**
98104
* FFI-exposed stream interface.
99105
*/
@@ -284,6 +290,17 @@ duckdb_logical_type vx_dtype_to_duckdb_logical_type(struct vx_dtype *dtype,
284290
struct vx_error **error);
285291
#endif
286292

293+
#if defined(ENABLE_DUCKDB_FFI)
294+
/**
295+
* Converts a DuckDB type into a vortex type
296+
*/
297+
struct vx_dtype *vx_duckdb_logical_type_to_dtype(const duckdb_logical_type *column_types,
298+
const unsigned char *column_nullable,
299+
const char *const *column_names,
300+
int column_count,
301+
struct vx_error **error);
302+
#endif
303+
287304
#if defined(ENABLE_DUCKDB_FFI)
288305
/**
289306
* Back a single chunk of the array as a duckdb data chunk.
@@ -300,22 +317,11 @@ unsigned int vx_array_to_duckdb_chunk(struct vx_array *stream,
300317

301318
#if defined(ENABLE_DUCKDB_FFI)
302319
/**
303-
* Returns an empty vortex array constructed from three arrays of len `len`, the (types, null, names).
304-
*/
305-
struct vx_array *vx_array_create_empty_from_duckdb_table(const duckdb_logical_type *type_array,
306-
const unsigned char *nullable,
307-
const char *const *names,
308-
int len,
309-
struct vx_error **error);
310-
#endif
311-
312-
#if defined(ENABLE_DUCKDB_FFI)
313-
/**
314-
* Requires a vortex array, a duckdb data chunk and a nullable array (equal to len(chunk.columns)).
320+
* Pushed a single duckdb chunk into a file sink.
315321
*/
316-
struct vx_array *vx_array_append_duckdb_chunk(struct vx_array *array,
317-
duckdb_data_chunk chunk,
318-
const unsigned char *nullable);
322+
struct vx_array *vx_duckdb_chunk_to_array(duckdb_data_chunk chunk,
323+
struct vx_dtype *dtype,
324+
struct vx_error **error);
319325
#endif
320326

321327
#if defined(ENABLE_DUCKDB_FFI)
@@ -345,19 +351,14 @@ void vx_error_free(struct vx_error *error);
345351
struct vx_file_reader *vx_file_open_reader(const struct vx_file_open_options *options,
346352
struct vx_error **error);
347353

348-
void vx_file_write_array(const char *path, struct vx_array *ffi_array, struct vx_error **error);
349-
350354
struct vx_file_statistics *vx_file_extract_statistics(struct vx_file_reader *file);
351355

352356
void vx_file_statistics_free(struct vx_file_statistics *stat);
353357

354358
/**
355-
* Get a readonly pointer to the DType of the data inside of the file.
356-
*
357-
* The pointer's lifetime is tied to the lifetime of the underlying file, so it should not be
358-
* dereferenced after the file has been freed.
359+
* Get the DType of the data inside of the file.
359360
*/
360-
const struct vx_dtype *vx_file_dtype(const struct vx_file_reader *file);
361+
struct vx_dtype *vx_file_dtype(const struct vx_file_reader *file);
361362

362363
/**
363364
* Build a new `vx_array_stream` that return a series of `vx_array`s scan over a `vx_file`.
@@ -382,6 +383,27 @@ void vx_file_reader_free(struct vx_file_reader *file);
382383
*/
383384
void vx_init_logging(enum vx_log_level level);
384385

386+
/**
387+
* Opens a writable array stream, where sink is used to push values into the stream.
388+
* To close the stream close the sink with `vx_array_sink_close`.
389+
*/
390+
struct vx_array_sink *vx_array_sink_open_file(const char *path,
391+
const struct vx_dtype *dtype,
392+
struct vx_error **error);
393+
394+
/**
395+
* Pushed a single array chunk into a file sink.
396+
*/
397+
void vx_array_sink_push(struct vx_array_sink *sink,
398+
const struct vx_array *array,
399+
struct vx_error **error);
400+
401+
/**
402+
* Closes an array sink, must be called to ensure all the values pushed to the sink are written
403+
* to the external resource.
404+
*/
405+
void vx_array_sink_close(struct vx_array_sink *sink, struct vx_error **error);
406+
385407
/**
386408
* Gets the dtype from an array `stream`, if the stream is finished the `DType` is null
387409
*/

0 commit comments

Comments
 (0)