forked from marcboeker/go-duckdb
-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy patharrow.go
More file actions
303 lines (257 loc) · 6.87 KB
/
arrow.go
File metadata and controls
303 lines (257 loc) · 6.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
//go:build duckdb_arrow
package duckdb
/*
#include <stdlib.h>
#include <stdint.h>
#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE
#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4
// ArrowSchema is the type-description struct of the Arrow C data interface.
// It is declared locally (behind the ARROW_C_DATA_INTERFACE guard) so the cgo
// preamble does not depend on an external Arrow header.
struct ArrowSchema {
// Array type description
const char* format;
const char* name;
const char* metadata;
// Bitfield; presumably a combination of the ARROW_FLAG_ values defined above.
int64_t flags;
// Number of entries in children.
int64_t n_children;
struct ArrowSchema** children;
struct ArrowSchema* dictionary;
// Release callback
void (*release)(struct ArrowSchema*);
// Opaque producer-specific data
void* private_data;
};
// ArrowArray is the data-carrying struct of the Arrow C data interface.
// It is declared locally (behind the ARROW_C_DATA_INTERFACE guard) so the cgo
// preamble does not depend on an external Arrow header.
struct ArrowArray {
// Array data description
int64_t length;
int64_t null_count;
int64_t offset;
// Number of entries in buffers.
int64_t n_buffers;
// Number of entries in children.
int64_t n_children;
const void** buffers;
struct ArrowArray** children;
struct ArrowArray* dictionary;
// Release callback
void (*release)(struct ArrowArray*);
// Opaque producer-specific data
void* private_data;
};
// ArrowArrayStream is the stream handle whose size RegisterView passes to
// calloc (via C.sizeof_struct_ArrowArrayStream) before exporting a reader.
// NOTE(review): the callback prototypes below are abbreviated relative to the
// Arrow C stream interface, which declares int-returning get_schema/get_next
// with an extra output parameter and a get_last_error that returns a string.
// Each member is still a single pointer, so the struct size should be
// unaffected, but these members must not be called through this declaration
// from C without correcting the prototypes first.
struct ArrowArrayStream {
void (*get_schema)(struct ArrowArrayStream*);
void (*get_next)(struct ArrowArrayStream*);
void (*get_last_error)(struct ArrowArrayStream*);
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};
#endif // ARROW_C_DATA_INTERFACE
*/
import "C"
import (
"context"
"database/sql/driver"
"errors"
"fmt"
"sync"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/cdata"
"github.com/duckdb/duckdb-go/v2/arrowmapping"
"github.com/duckdb/duckdb-go/v2/mapping"
)
// Arrow exposes DuckDB's Apache Arrow interface on top of an existing DuckDB
// driver connection. Use NewArrowFromConn to create one.
// https://duckdb.org/docs/api/c/api#arrow-interface
type Arrow struct {
// conn is the driver connection that queries run on and views are registered with.
conn *Conn
}
// NewArrowFromConn returns a new Arrow from a DuckDB driver connection.
//
// driverConn must hold a non-nil *Conn created by this driver; any other value
// (including a nil interface or a typed-nil *Conn) yields an error instead of a
// panic. A connection that is already closed yields errClosedCon.
func NewArrowFromConn(driverConn driver.Conn) (*Arrow, error) {
	conn, ok := driverConn.(*Conn)
	// A typed-nil *Conn satisfies the assertion, so it has to be rejected
	// explicitly before conn.closed is dereferenced.
	if !ok || conn == nil {
		return nil, errors.New("not a duckdb driver connection")
	}
	if conn.closed {
		return nil, errClosedCon
	}

	return &Arrow{conn: conn}, nil
}
// QueryContext prepares statements, executes them, and returns an Apache Arrow array.RecordReader as a result of the last
// executed statement. Arguments are bound to the last statement.
//
// It returns errClosedCon if the underlying connection is closed. On any
// failure the driver rows obtained for the query are closed before returning,
// and a close error is joined onto the returned error. On success the returned
// reader owns the rows and closes them on its final Release.
func (a *Arrow) QueryContext(ctx context.Context, query string, args ...any) (array.RecordReader, error) {
	if a.conn.closed {
		return nil, errClosedCon
	}

	r, err := a.conn.QueryContext(ctx, query, a.anyArgsToNamedArgs(args))
	if err != nil {
		if r != nil {
			err = errors.Join(err, r.Close())
		}
		return nil, err
	}

	reader, err := recordReaderFromRows(ctx, r)
	if err != nil {
		// The reader was never created, so nothing else will close the rows;
		// close them here to avoid leaking the result.
		if r != nil {
			err = errors.Join(err, r.Close())
		}
		return nil, err
	}

	return reader, nil
}
// errArrowScan is returned by RegisterView when arrowmapping.ArrowScan reports mapping.StateError.
var errArrowScan = errors.New("could not register arrow view due to arrow scan API failure")
// RegisterView registers an Arrow record reader as a view with the given name in DuckDB.
//
// The reader is exported into a C-allocated ArrowArrayStream, which is then
// passed to DuckDB's arrow scan together with name.
//
// The returned release function must be called to release the memory once the view is no longer needed.
// Calling release more than once is safe; only the first call has an effect.
//
// It returns errClosedCon if the connection is closed, an error if the stream
// cannot be allocated, and errArrowScan if the scan registration fails. In all
// error cases the returned release function is nil and nothing is left for the
// caller to free.
func (a *Arrow) RegisterView(reader array.RecordReader, name string) (release func(), err error) {
	if a.conn.closed {
		return nil, errClosedCon
	}

	// cgo only guarantees a non-nil result for C.malloc; C.calloc can return
	// NULL, and exporting into a NULL stream would crash.
	stream := C.calloc(1, C.sizeof_struct_ArrowArrayStream)
	if stream == nil {
		return nil, errors.New("could not allocate arrow array stream")
	}

	// Guard the teardown so a repeated release() cannot touch or free the
	// stream a second time.
	var once sync.Once
	release = func() {
		once.Do(func() {
			cdata.ReleaseCArrowArrayStream((*cdata.CArrowArrayStream)(stream))
			C.free(stream)
		})
	}

	cdata.ExportRecordReader(reader, (*cdata.CArrowArrayStream)(stream))

	arrowStream := arrowmapping.ArrowStream{
		Ptr: unsafe.Pointer(stream),
	}
	if arrowmapping.ArrowScan(a.conn.conn, name, arrowStream) == mapping.StateError {
		release()
		return nil, errArrowScan
	}

	return release, nil
}
// anyArgsToNamedArgs copies args into a []driver.Value and passes that slice to
// argsToNamedArgs. It returns nil when args is empty.
func (a *Arrow) anyArgsToNamedArgs(args []any) []driver.NamedValue {
	n := len(args)
	if n == 0 {
		return nil
	}

	// []any cannot be converted to []driver.Value directly, so copy element by element.
	converted := make([]driver.Value, 0, n)
	for idx := 0; idx < n; idx++ {
		converted = append(converted, args[idx])
	}

	return argsToNamedArgs(converted)
}
// Compile-time check that *recordReader implements array.RecordReader.
var _ array.RecordReader = (*recordReader)(nil)
// recordReader serves a DuckDB result as Arrow record batches, converting one
// fetched data chunk per successful call to Next.
type recordReader struct {
// ctx is checked for cancellation on each Next call; it is kept on the struct
// because Next takes no context argument.
ctx context.Context
// res is the DuckDB result that data chunks are fetched from.
res mapping.Result
// opts are the Arrow options obtained from the result; destroyed on the final Release.
opts arrowmapping.ArrowOptions
// schema is the Arrow schema built from the result's column names and logical types.
schema *arrow.Schema
// rows is the driver rows the reader was created from; closed on the final Release.
rows *rows
mu sync.Mutex // guards refCount, closed, current and err
// refCount is the number of outstanding references; the reader is torn down when it drops to zero.
refCount int64
closed bool // tracks if the reader has been closed/released
// current is the batch produced by the most recent successful Next, or nil.
current arrow.RecordBatch
// err is the error reported by Err; it is set by Next on failure and by the final Release.
err error
}
// recordReaderFromRows wraps an unread DuckDB result set in an Arrow array.RecordReader.
//
// from must be this driver's *rows value, its statement and connection must
// both be present and open (otherwise errClosedCon is returned), and no rows
// may have been read from it yet. The Arrow schema is built from the result's
// column names and logical types. The returned reader starts with a reference
// count of one.
func recordReaderFromRows(ctx context.Context, from driver.Rows) (array.RecordReader, error) {
	src, isRows := from.(*rows)
	if !isRows {
		return nil, fmt.Errorf("not a duckdb rows")
	}

	// Both the statement and its connection must still be usable.
	stmt := src.stmt
	if stmt == nil || stmt.closed {
		return nil, errClosedCon
	}
	if owner := stmt.conn; owner == nil || owner.closed {
		return nil, errClosedCon
	}

	if src.rowCount != 0 {
		return nil, fmt.Errorf("cannot convert duckdb rows to arrow reader after reading has started")
	}

	// Arrow options come from the result itself.
	opts := arrowmapping.ResultGetArrowOptions(&src.res)

	// Collect per-column names and logical types for the schema.
	colCount := mapping.ColumnCount(&src.res)
	colNames := make([]string, colCount)
	colTypes := make([]mapping.LogicalType, colCount)
	for i := range colCount {
		colNames[i] = mapping.ColumnName(&src.res, i)
		colTypes[i] = mapping.ColumnLogicalType(&src.res, i)
	}
	// The logical types are only needed while building the schema.
	defer func() {
		for i := range colTypes {
			mapping.DestroyLogicalType(&colTypes[i])
		}
	}()

	sch, errData := arrowmapping.NewArrowSchema(opts, colTypes, colNames)
	if err := errorDataError(errData); err != nil {
		wrapped := fmt.Errorf("failed to create arrow schema: %w", err)
		// No reader will take ownership of the options, so destroy them here.
		arrowmapping.DestroyArrowOptions(&opts)
		return nil, wrapped
	}

	reader := &recordReader{
		ctx:      ctx,
		res:      src.res,
		opts:     opts,
		schema:   sch,
		rows:     src,
		refCount: 1,
	}
	return reader, nil
}
// Retain adds one reference to the reader. It does nothing once the reader has
// recorded an error or has been closed.
func (r *recordReader) Retain() {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Only a healthy, still-open reader may gain references.
	if r.err == nil && !r.closed {
		r.refCount++
	}
}
// Release drops one reference to the reader. When the last reference is
// dropped the reader is marked closed, the current batch (if any) is released,
// the Arrow options are destroyed and the underlying rows are closed.
//
// Calls made after the reference count has reached zero are ignored. An error
// recorded earlier by Next is preserved; an error from closing the rows is
// stored if there was none before, or joined onto the existing one.
func (r *recordReader) Release() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.refCount <= 0 {
		return // Do not release if refCount is already zero.
	}

	r.refCount--
	if r.refCount != 0 {
		return // Do not release if there are still references.
	}

	// If this is the last reference, we need to clean up.
	r.closed = true
	if r.current != nil {
		r.current.Release()
		r.current = nil
	}
	arrowmapping.DestroyArrowOptions(&r.opts)

	// Do not let a successful Close wipe out an error recorded during iteration.
	closeErr := r.rows.Close()
	switch {
	case r.err == nil:
		r.err = closeErr
	case closeErr != nil:
		r.err = errors.Join(r.err, closeErr)
	}
}
// Schema returns the Arrow schema stored on the reader. It is read without
// taking the mutex; in the visible code the field is only assigned when the
// reader is constructed.
func (r *recordReader) Schema() *arrow.Schema {
return r.schema
}
// Next advances the reader to the next record batch and reports whether one is
// available through RecordBatch.
//
// The batch from the previous call is released first. Next returns false when
// the reader is closed (recording an error), when an error was recorded
// earlier, when the context is done (recording its error), when no further
// chunk can be fetched, or when the chunk cannot be converted to an Arrow
// batch (recording the wrapped error).
func (r *recordReader) Next() bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Drop the batch handed out by the previous call.
	if prev := r.current; prev != nil {
		prev.Release()
		r.current = nil
	}

	switch {
	case r.closed:
		r.err = errors.New("arrow reader has been closed")
		return false
	case r.err != nil:
		return false
	}

	// Non-blocking cancellation check before touching the result.
	select {
	case <-r.ctx.Done():
		r.err = r.ctx.Err()
		return false
	default:
	}

	chunk := mapping.FetchChunk(r.res)
	if chunk.Ptr == nil {
		return false
	}
	defer mapping.DestroyDataChunk(&chunk)

	batch, errData := arrowmapping.DataChunkToArrowArray(r.opts, r.schema, chunk)
	if err := errorDataError(errData); err != nil {
		r.err = fmt.Errorf("failed to create arrow array: %w", err)
		return false
	}

	r.current = batch
	return true
}
// Record returns the current record batch. It delegates to RecordBatch and
// therefore returns the same value.
func (r *recordReader) Record() arrow.RecordBatch {
return r.RecordBatch()
}
// RecordBatch returns the batch produced by the most recent successful Next.
// It returns nil when the reader has recorded an error or when there is no
// current batch.
func (r *recordReader) RecordBatch() arrow.RecordBatch {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.err == nil {
		return r.current
	}
	return nil
}
// Err returns the error currently recorded on the reader, or nil if there is
// none. The value is read while holding the reader's mutex.
func (r *recordReader) Err() error {
r.mu.Lock()
defer r.mu.Unlock()
return r.err
}