
Commit 2d4aef5 (parent: 498e75f)

aws: compression: Implement compression of Apache Arrow Parquet

Signed-off-by: Hiroshi Hatake <[email protected]>

File tree: 2 files changed, +135 -0 lines

src/aws/compression/arrow/compress.c (120 additions, 0 deletions)
@@ -8,6 +8,9 @@
  */

 #include <arrow-glib/arrow-glib.h>
+#ifdef FLB_HAVE_ARROW_PARQUET
+#include <parquet-glib/parquet-glib.h>
+#endif
 #include <inttypes.h>

 /*
@@ -145,3 +148,120 @@ int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_s
     g_bytes_unref(bytes);
     return 0;
 }
+
+#ifdef FLB_HAVE_ARROW_PARQUET
+static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
+{
+    GArrowResizableBuffer *buffer;
+    GArrowBufferOutputStream *sink;
+    GParquetArrowFileWriter *writer;
+    GArrowSchema *schema;
+    GError *error = NULL;
+    gboolean success;
+
+    buffer = garrow_resizable_buffer_new(0, &error);
+    if (buffer == NULL) {
+        g_error_free(error);
+        return NULL;
+    }
+
+    sink = garrow_buffer_output_stream_new(buffer);
+    if (sink == NULL) {
+        g_object_unref(buffer);
+        return NULL;
+    }
+
+    schema = garrow_table_get_schema(table);
+    if (schema == NULL) {
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        return NULL;
+    }
+
+    /* Create a new Parquet file writer */
+    writer = gparquet_arrow_file_writer_new_arrow(schema,
+                                                  GARROW_OUTPUT_STREAM(sink),
+                                                  NULL, /* Arrow writer properties */
+                                                  &error);
+    g_object_unref(schema);
+    if (writer == NULL) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        return NULL;
+    }
+
+    /* Write the entire table to the Parquet file buffer */
+    success = gparquet_arrow_file_writer_write_table(writer, table, 0, &error);
+    if (!success) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        g_object_unref(writer);
+        return NULL;
+    }
+
+    /* Close the writer to finalize the Parquet file metadata */
+    success = gparquet_arrow_file_writer_close(writer, &error);
+    if (!success) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        g_object_unref(writer);
+        return NULL;
+    }
+
+    g_object_unref(sink);
+    g_object_unref(writer);
+    return buffer;
+}
+
+
+int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size)
+{
+    GArrowTable *table;
+    GArrowResizableBuffer *buffer;
+    GBytes *bytes;
+    gconstpointer ptr;
+    gsize len;
+    uint8_t *buf;
+
+    table = parse_json((uint8_t *) json, size);
+    if (table == NULL) {
+        return -1;
+    }
+
+    buffer = table_to_parquet_buffer(table);
+    g_object_unref(table);
+    if (buffer == NULL) {
+        return -1;
+    }
+
+    bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
+    if (bytes == NULL) {
+        g_object_unref(buffer);
+        return -1;
+    }
+
+    ptr = g_bytes_get_data(bytes, &len);
+    if (ptr == NULL) {
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return -1;
+    }
+
+    buf = malloc(len);
+    if (buf == NULL) {
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return -1;
+    }
+    memcpy(buf, ptr, len);
+    *out_buf = (void *) buf;
+    *out_size = len;
+
+    g_object_unref(buffer);
+    g_bytes_unref(bytes);
+    return 0;
+}
+#endif
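As a sanity check on the write path above, a reader-side sketch can round-trip the finalized buffer: wrap it in an input stream, open it with parquet-glib's file reader, and report the decoded row count. This is illustrative only and not part of the commit; it assumes parquet-glib's reader API (gparquet_arrow_file_reader_new_arrow, gparquet_arrow_file_reader_read_table). A GArrowResizableBuffer is a GArrowBuffer, so the result of table_to_parquet_buffer() could be passed via GARROW_BUFFER().

#include <stdio.h>
#include <arrow-glib/arrow-glib.h>
#include <parquet-glib/parquet-glib.h>

/* Round-trip check (illustrative sketch, not part of the commit):
 * read the finalized Parquet buffer back and print the row count. */
static int verify_parquet_buffer(GArrowBuffer *buffer)
{
    GError *error = NULL;
    GArrowBufferInputStream *input;
    GParquetArrowFileReader *reader;
    GArrowTable *table;

    input = garrow_buffer_input_stream_new(buffer);
    if (input == NULL) {
        return -1;
    }

    reader = gparquet_arrow_file_reader_new_arrow(
        GARROW_SEEKABLE_INPUT_STREAM(input), &error);
    if (reader == NULL) {
        g_error_free(error);
        g_object_unref(input);
        return -1;
    }

    table = gparquet_arrow_file_reader_read_table(reader, &error);
    if (table == NULL) {
        g_error_free(error);
        g_object_unref(reader);
        g_object_unref(input);
        return -1;
    }

    printf("decoded %" G_GUINT64_FORMAT " rows\n",
           garrow_table_get_n_rows(table));

    g_object_unref(table);
    g_object_unref(reader);
    g_object_unref(input);
    return 0;
}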

src/aws/compression/arrow/compress.h (15 additions, 0 deletions)
@@ -11,3 +11,18 @@
  */

 int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size);
+
+#ifdef FLB_HAVE_ARROW_PARQUET
+/*
+ * This function converts the out_s3 buffer into Apache Parquet format.
+ *
+ * `json` is a string that contains (concatenated) JSON objects.
+ *
+ * `size` is the length of the json data (excluding the trailing
+ * null-terminator character).
+ *
+ * Return 0 on success (with `out_buf` and `out_size` updated),
+ * and -1 on failure.
+ */
+int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size);
+#endif
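For reference, a minimal hypothetical caller against this contract (the harness below is not part of the commit; it assumes FLB_HAVE_ARROW_PARQUET is defined at build time). The output buffer is allocated with malloc() by the callee, so the caller owns it and must free() it:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "compress.h"

int main(void)
{
    /* Concatenated JSON objects, as the header comment describes */
    const char *json = "{\"level\": \"info\"}\n{\"level\": \"warn\"}\n";
    void *out_buf = NULL;
    size_t out_size = 0;

    if (out_s3_compress_parquet((void *) json, strlen(json),
                                &out_buf, &out_size) != 0) {
        fprintf(stderr, "Parquet conversion failed\n");
        return 1;
    }

    printf("Parquet payload: %zu bytes\n", out_size);
    free(out_buf); /* callee allocates with malloc(); caller frees */
    return 0;
}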
