Skip to content

Commit 2f9141d

Browse files
committed
fix(spark-query): recursively convert iceberg records to clj
1 parent 761c846 commit 2f9141d

File tree

2 files changed

+26
-16
lines changed

2 files changed

+26
-16
lines changed

spark-query/src/spark_query/core.clj

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
TableIdentifier]
2121
[org.apache.iceberg.data
2222
IcebergGenerics
23+
GenericRecord
2324
Record]
2425
[org.apache.iceberg.rest RESTCatalog]
2526
[org.apache.iceberg.aws.s3 S3FileIOProperties]
@@ -33,15 +34,22 @@
3334
(def PORT 8090)
3435
(def NREPL-PORT 7890)
3536

36-
(defn- record->vec
37-
[record]
38-
(let [size (.size record)
39-
columns (->> record
40-
.struct
41-
.fields
42-
(mapv #(.name %)))
43-
values (mapv #(.get record %) (range size))]
44-
(zipmap columns values)))
37+
(defn record->clj
38+
"Converts Iceberg records to Clojure data types"
39+
[x]
40+
(cond
41+
(instance? GenericRecord x)
42+
(let [size (.size x)
43+
columns (->> x
44+
.struct
45+
.fields
46+
(mapv #(.name %)))
47+
values (mapv #(-> x (.get %) record->clj) (range size))]
48+
(zipmap columns values))
49+
50+
(nil? x) x
51+
(and (seqable? x) (not (string? x))) (mapv record->clj x)
52+
:else x))
4553

4654
(defn open-catalog
4755
[]
@@ -81,11 +89,13 @@
8189
catalog)))
8290

8391
(defn load-table
84-
[catalog ns table]
85-
(let [ns-id (Namespace/of (into-array String [ns]))
86-
table-id (TableIdentifier/of ns-id table)
87-
table (.loadTable catalog table-id)]
88-
table))
92+
([ns table]
93+
(load-table (get-catalog) ns table))
94+
([catalog ns table]
95+
(let [ns-id (Namespace/of (into-array String [ns]))
96+
table-id (TableIdentifier/of ns-id table)
97+
table (.loadTable catalog table-id)]
98+
table)))
8999

90100
(defn scan-table
91101
[table]
@@ -134,7 +144,7 @@
134144
(let [table (load-table (get-catalog) ns-in table-in)
135145
rows (scan-table table)
136146
response-body (->> rows
137-
(mapv record->vec)
147+
(mapv record->clj)
138148
json/write-str)]
139149
{:body response-body}))
140150

spark-query/vsn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.6
1+
1.0.7

0 commit comments

Comments
 (0)