Skip to content

Commit 5bcd6b9

Browse files
authored
Merge pull request #15 from thalesmg/20250610-read-parquet
feat(spark-query): add `/read-parquet` endpoint
2 parents 03e52dc + e424605 commit 5bcd6b9

File tree

3 files changed

+99
-3
lines changed

3 files changed

+99
-3
lines changed

spark-query/deps.edn

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
org.slf4j/slf4j-api {:mvn/version "2.0.17"}
88
com.taoensso/telemere-slf4j {:mvn/version "1.0.1"}
99

10+
org.apache.parquet/parquet-common {:mvn/version "1.15.2"}
11+
org.apache.parquet/parquet-column {:mvn/version "1.15.2"}
12+
org.apache.parquet/parquet-hadoop {:mvn/version "1.15.2"}
13+
org.apache.parquet/parquet-avro {:mvn/version "1.15.2"}
14+
1015
nrepl/nrepl {:mvn/version "1.3.1"}
1116
cider/cider-nrepl {:mvn/version "0.55.1"}
1217
com.bhauman/rebel-readline-cljs {:mvn/version "0.1.5"}

spark-query/src/spark_query/core.clj

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
[org.apache.iceberg.rest RESTCatalog]
2525
[org.apache.iceberg.aws.s3 S3FileIOProperties]
2626
[org.apache.hadoop.conf Configuration]
27-
[org.apache.spark.sql SparkSession])
27+
[org.apache.spark.sql SparkSession]
28+
(org.apache.parquet.avro AvroParquetReader)
29+
(org.apache.parquet.conf PlainParquetConfiguration)
30+
(org.apache.parquet.io SeekableInputStream InputFile))
2831
(:gen-class))
2932

3033
(def PORT 8090)
@@ -159,10 +162,98 @@
159162
json/write-str)]
160163
{:body response-body}))
161164

165+
(defn avro->json
  "Turns `avro` into Clojure data by taking the string returned by its
  `.toString` and parsing that string as JSON with `json/read-str`."
  [avro]
  (let [json-text (.toString avro)]
    (json/read-str json-text)))
170+
171+
(defn new-seekable-input-stream
  "Wraps the byte array `in-ba` in a Parquet `SeekableInputStream` proxy that
  reads straight from memory.

  The current position is kept in an atom, so `seek`/`getPos` and every read
  share the same cursor.

  Behaviour of the overridden methods:
    * `read []`              - next byte as an int in 0..255, or -1 at end of data.
    * `read [dst]`           - `dst` is a byte[] or a java.nio.ByteBuffer; copies as
                               many bytes as are available (up to the space in
                               `dst`) and returns the count, or -1 at end of data.
    * `read [dst start len]` - same, into `dst` starting at index `start`.
    * `readFully ...`        - fills the whole destination, or throws
                               java.io.EOFException when fewer bytes remain
                               (nothing is consumed in that case).
    * `getPos` / `seek`      - report / move the cursor."
  [in-ba]
  (let [^bytes src in-ba
        size (alength src)
        pos (atom 0)]
    (letfn [(bytes-left []
              ;; Clamp at 0 so a seek past the end simply looks like EOF.
              (max 0 (- size (long @pos))))
            (require-bytes! [wanted]
              (when (> wanted (bytes-left))
                (throw (java.io.EOFException.
                        (str "Requested " wanted " bytes at position " @pos
                             " but only " (bytes-left) " remain")))))
            (copy-to-array! [^bytes dst start len]
              ;; Bulk copy instead of a reflective byte-by-byte loop.
              (when (pos? len)
                (System/arraycopy src (int @pos) dst (int start) (int len))
                (swap! pos + len))
              len)
            (copy-to-buffer! [^java.nio.ByteBuffer dst len]
              ;; `put(byte[], offset, length)` takes the offset into the SOURCE
              ;; array; the buffer tracks its own write position. This also
              ;; works for direct buffers, which have no backing array.
              (when (pos? len)
                (.put dst src (int @pos) (int len))
                (swap! pos + len))
              len)
            (read-some [wanted copy!]
              ;; InputStream contract: 0 when nothing was asked for, -1 at EOF,
              ;; otherwise the number of bytes actually transferred.
              (let [n (min wanted (bytes-left))]
                (cond
                  (zero? wanted) 0
                  (zero? n) -1
                  :else (copy! n))))]
      (proxy [SeekableInputStream] []
        (read
          ([]
           (if (pos? (bytes-left))
             (let [b (aget src (int @pos))]
               (swap! pos inc)
               ;; Mask to present the signed byte as an unsigned int.
               (bit-and (long b) 0xff))
             -1))
          ([dst]
           ;; read(byte[]) and read(ByteBuffer) share this arity.
           (if (bytes? dst)
             (read-some (alength ^bytes dst) #(copy-to-array! dst 0 %))
             (read-some (.remaining ^java.nio.ByteBuffer dst)
                        #(copy-to-buffer! dst %))))
          ([dst start len]
           (read-some len #(copy-to-array! dst start %))))
        (getPos []
          @pos)
        (seek [new-pos]
          (reset! pos new-pos))
        (readFully
          ([dst]
           ;; readFully(byte[]) and readFully(ByteBuffer) share this arity.
           (if (bytes? dst)
             (let [len (alength ^bytes dst)]
               (require-bytes! len)
               (copy-to-array! dst 0 len))
             (let [len (.remaining ^java.nio.ByteBuffer dst)]
               (require-bytes! len)
               (copy-to-buffer! dst len))))
          ([dst start len]
           (require-bytes! len)
           (copy-to-array! dst start len)))))))
213+
214+
(defn new-mem-input-file
  "Builds an `InputFile` proxy over the in-memory array `in-ba`.

  `getLength` reports the array length, and every `newStream` call hands back
  a fresh stream from `new-seekable-input-stream` over the same array."
  [in-ba]
  (let [open-stream #(new-seekable-input-stream in-ba)
        byte-count #(alength in-ba)]
    (proxy [InputFile] []
      (newStream []
        (open-stream))
      (getLength []
        (byte-count)))))
221+
222+
(defn read-parquet-avro
  "Reads every record from `input-file` with an `AvroParquetReader` and
  returns them as a vector, each record converted with `avro->json`.

  The reader is configured with \"parquet.avro.readInt96AsFixed\" set to
  \"true\" and is closed before this function returns; the result is built
  eagerly while the reader is still open."
  [input-file]
  (with-open [reader (AvroParquetReader/genericRecordReader
                      input-file
                      (PlainParquetConfiguration.
                       {"parquet.avro.readInt96AsFixed" "true"}))]
    ;; Keep calling `.read` until it yields a falsey value (end of file),
    ;; converting each record as it arrives. `into` forces the whole pipeline
    ;; inside `with-open`.
    (into []
          (comp (take-while identity)
                (map avro->json))
          (repeatedly #(.read reader)))))
236+
237+
(defn read-parquet
  "Copies everything readable from `data-raw` (anything `io/input-stream`
  accepts) into memory, then decodes the resulting bytes with
  `read-parquet-avro` and returns its result."
  [data-raw]
  (with-open [source (io/input-stream data-raw)
              sink (java.io.ByteArrayOutputStream.)]
    (io/copy source sink)
    (let [payload (.toByteArray sink)
          input-file (new-mem-input-file payload)]
      (read-parquet-avro input-file))))
246+
247+
(defn handle-read-parquet
  "Handler for the `/read-parquet` route: passes the request's `:body` to
  `read-parquet` and returns a response map whose `:body` is the result
  serialized with `json/write-str`."
  [request]
  (let [records (-> request :body read-parquet)
        payload (json/write-str records)]
    {:body payload}))
251+
162252
(defroutes app-routes
163253
(GET "/scan/:ns/:table" [ns table] (handle-scan-table ns table))
164254
(GET "/partitions/:ns/:table" [ns table] (handle-table-partitions ns table))
165-
(POST "/sql" request (handle-spark-sql request)))
255+
(POST "/sql" request (handle-spark-sql request))
256+
(POST "/read-parquet" request (handle-read-parquet request)))
166257

167258
(defn- block-forever
168259
[]

spark-query/vsn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.5
1+
1.0.6

0 commit comments

Comments
 (0)