forked from marcboeker/go-duckdb
-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathreplacement_scan.go
More file actions
89 lines (75 loc) · 2.54 KB
/
replacement_scan.go
File metadata and controls
89 lines (75 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package duckdb
/*
void replacement_scan_callback(void *, void *, void *);
typedef void (*replacement_scan_callback_t)(void *, void *, void *);
void replacement_scan_delete_callback(void *);
typedef void (*replacement_scan_delete_callback_t)(void *);
*/
import "C"
import (
"runtime"
"runtime/cgo"
"unsafe"
"github.com/duckdb/duckdb-go/v2/mapping"
)
type ReplacementScanCallback func(tableName string) (string, []any, error)
func RegisterReplacementScan(c *Connector, callback ReplacementScanCallback) {
pinnedCallback := pinnedValue[ReplacementScanCallback]{
pinner: &runtime.Pinner{},
value: callback,
}
h := cgo.NewHandle(pinnedCallback)
pinnedCallback.pinner.Pin(&h)
callbackPtr := unsafe.Pointer(C.replacement_scan_callback_t(C.replacement_scan_callback))
deleteCallbackPtr := unsafe.Pointer(C.replacement_scan_delete_callback_t(C.replacement_scan_delete_callback))
mapping.AddReplacementScan(c.db, callbackPtr, unsafe.Pointer(&h), deleteCallbackPtr)
}
//export replacement_scan_delete_callback
func replacement_scan_delete_callback(info unsafe.Pointer) {
h := *(*cgo.Handle)(info)
h.Value().(unpinner).unpin()
h.Delete()
}
//export replacement_scan_callback
func replacement_scan_callback(infoPtr, tableNamePtr, data unsafe.Pointer) {
info := mapping.ReplacementScanInfo{Ptr: infoPtr}
tableName := C.GoString((*C.char)(tableNamePtr))
scanner := getPinned[ReplacementScanCallback](data)
functionName, params, err := scanner(tableName)
if err != nil {
mapping.ReplacementScanSetError(info, err.Error())
return
}
mapping.ReplacementScanSetFunctionName(info, functionName)
for _, param := range params {
switch paramType := param.(type) {
case string:
val := mapping.CreateVarchar(paramType)
mapping.ReplacementScanAddParameter(info, val)
mapping.DestroyValue(&val)
case int64:
val := mapping.CreateInt64(paramType)
mapping.ReplacementScanAddParameter(info, val)
mapping.DestroyValue(&val)
case []string:
// Create values and logical type.
values := make([]mapping.Value, len(paramType))
for i, v := range paramType {
values[i] = mapping.CreateVarchar(v)
}
lt := mapping.CreateLogicalType(mapping.TypeVarchar)
val := mapping.CreateListValue(lt, values)
// Add the parameter.
mapping.ReplacementScanAddParameter(info, val)
// Destroy logical type and values.
mapping.DestroyLogicalType(<)
for _, v := range values {
mapping.DestroyValue(&v)
}
mapping.DestroyValue(&val)
default:
mapping.ReplacementScanSetError(info, "unsupported type for replacement scan")
return
}
}
}