@@ -18,11 +18,11 @@ type ParquetModule struct {
18
18
19
19
type ParquetTable struct {
20
20
mmap mmap.MMap
21
- column map [int ]string
21
+ column map [int ]parquetColumn
22
22
}
23
23
24
24
type ParquetCursor struct {
25
- column map [int ]string
25
+ column map [int ]parquetColumn
26
26
reader * parquet.GenericReader [any ]
27
27
rowBuffer * deque.Deque [map [string ]interface {}]
28
28
rowID int64
@@ -34,6 +34,12 @@ type ParquetCursor struct {
34
34
noMoreRows bool
35
35
}
36
36
37
// parquetColumn holds the schema metadata kept for one parquet field so that
// values can be converted with knowledge of their physical type.
type parquetColumn struct {
	// Name is the field name as reported by the parquet schema; it is the key
	// used to look the value up in a decoded row.
	Name string
	// Type is the string form of the field's parquet type (for example
	// "BYTE_ARRAY" or "group").
	Type string
	// SubFields is populated only for group fields and is keyed by sub-field
	// name. Only one level is recorded: the entries stored here do not carry
	// their own SubFields. It is nil for non-group fields.
	SubFields map[string]parquetColumn
}
42
+
37
43
// rowToRequestPerBatch is a fixed batch size of 16 rows.
// NOTE(review): presumably the number of rows read from the parquet reader
// into the cursor's row buffer per refill; its use is not visible here, so
// confirm against the cursor code.
const rowToRequestPerBatch = 16
38
44
39
45
func (m * ParquetModule ) Create (c * sqlite3.SQLiteConn , args []string ) (sqlite3.VTab , error ) {
@@ -69,7 +75,7 @@ func (m *ParquetModule) Connect(c *sqlite3.SQLiteConn, args []string) (sqlite3.V
69
75
}
70
76
71
77
// Open the file
72
- mmap := mmap.MMap {}
78
+ var mmap mmap.MMap
73
79
var err error
74
80
75
81
mmap , err = openMmapedFile (fileName )
@@ -82,7 +88,7 @@ func (m *ParquetModule) Connect(c *sqlite3.SQLiteConn, args []string) (sqlite3.V
82
88
// Read the parquet file
83
89
reader := parquet.NewGenericReader [any ](byteReader )
84
90
85
- column := make (map [int ]string )
91
+ column := make (map [int ]parquetColumn )
86
92
87
93
sqlSchema := strings.Builder {}
88
94
sqlSchema .WriteString ("CREATE TABLE parquet (" )
@@ -106,9 +112,28 @@ func (m *ParquetModule) Connect(c *sqlite3.SQLiteConn, args []string) (sqlite3.V
106
112
default :
107
113
sqlSchema .WriteString ("TEXT" )
108
114
}
115
+
109
116
// Save the column name
110
- column [i ] = field .Name ()
117
+ col := parquetColumn {
118
+ Name : field .Name (),
119
+ Type : field .Type ().String (),
120
+ }
121
+
122
+ // Get subfields if the field is a group
123
+ if field .Type ().String () == "group" {
124
+ col .SubFields = make (map [string ]parquetColumn )
125
+ for _ , subField := range field .Fields () {
126
+ col .SubFields [subField .Name ()] = parquetColumn {
127
+ Name : subField .Name (),
128
+ Type : subField .Type ().String (),
129
+ }
130
+ }
131
+ }
132
+
133
+ // Save the column in the map
134
+ column [i ] = col
111
135
}
136
+
112
137
sqlSchema .WriteString (");" )
113
138
c .DeclareVTab (sqlSchema .String ())
114
139
@@ -118,6 +143,7 @@ func (m *ParquetModule) Connect(c *sqlite3.SQLiteConn, args []string) (sqlite3.V
118
143
func (t * ParquetTable ) Open () (sqlite3.VTabCursor , error ) {
119
144
// Create a new reader
120
145
reader := parquet.NewGenericReader [any ](bytes .NewReader (t .mmap ))
146
+
121
147
return & ParquetCursor {
122
148
column : t .column ,
123
149
reader : reader ,
@@ -195,7 +221,7 @@ func (t *ParquetCursor) Column(context *sqlite3.SQLiteContext, col int) error {
195
221
context .ResultNull ()
196
222
return nil
197
223
}
198
- val , ok := t .rowBuffer .Front ()[colName ]
224
+ val , ok := t .rowBuffer .Front ()[colName . Name ]
199
225
if ! ok {
200
226
context .ResultNull ()
201
227
return nil
@@ -225,10 +251,36 @@ func (t *ParquetCursor) Column(context *sqlite3.SQLiteContext, col int) error {
225
251
case float64 :
226
252
context .ResultDouble (valParsed )
227
253
case string :
228
- context .ResultText (valParsed )
254
+ // parquet-go returns BYTE_ARRAY as string resulting in UTF-8 issues
255
+ // When we detect a column that has a BYTE_ARRAY type, we will convert it to a byte slice
256
+ if colName .Type == "BYTE_ARRAY" || colName .Type == "FIXED_LEN_BYTE_ARRAY" {
257
+ // Convert the string to a byte slice
258
+ context .ResultBlob ([]byte (valParsed ))
259
+ } else {
260
+ context .ResultText (valParsed )
261
+ }
229
262
case []byte :
230
263
context .ResultBlob (valParsed )
231
264
case map [string ]interface {}:
265
+ for key , value := range valParsed {
266
+
267
+ // Get the subfield type if it exists
268
+ subFieldType , ok := colName .SubFields [key ]
269
+ if ! ok {
270
+ continue
271
+ }
272
+
273
+ // Same as the string case, we need to handle BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY
274
+ if subFieldType .Type == "BYTE_ARRAY" || subFieldType .Type == "FIXED_LEN_BYTE_ARRAY" {
275
+ if strValue , ok := value .(string ); ok {
276
+ valParsed [key ] = []byte (strValue )
277
+ } else if byteValue , ok := value .([]byte ); ok {
278
+ valParsed [key ] = byteValue
279
+ } else {
280
+ valParsed [key ] = value
281
+ }
282
+ }
283
+ }
232
284
marshaled , err := json .Marshal (valParsed )
233
285
if err != nil {
234
286
context .ResultNull ()
0 commit comments