Skip to content

Commit 577f103

Browse files
committed
Support for arrow's uuid extension.
1 parent 2b6dd7f commit 577f103

File tree

8 files changed

+90
-27
lines changed

8 files changed

+90
-27
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ java_test/test.arrow
3535
java_test/simulation*
3636
.idea
3737
.calva
38-
*.iml
38+
*.iml
39+
.dir-locals.el

build.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
(defn compile [_]
1919
(b/javac {:src-dirs ["java" "java_public_api"]
2020
:class-dir class-dir
21-
:basis (b/create-basis {:project "deps.edn" :aliases [:dev-m1-mac]})
21+
:basis (b/create-basis {:project "deps.edn" :aliases [:dev-mac-m1]})
2222
:javac-opts ["-source" "8" "-target" "8" "-Xlint:unchecked"]}))
2323

2424
(defn jar [_]

deps.edn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
}
151151
:extra-paths ["neanderthal" "test"]}
152152

153-
:dev-m1-mac
153+
:dev-mac-m1
154154
{:extra-deps
155155
{criterium/criterium {:mvn/version "0.4.5"}
156156
com.cognitect/transit-clj {:mvn/version "1.0.333"}

scripts/arrow_uuid.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import pyarrow as pa
2+
import uuid as uuid
3+
4+
schema = pa.schema([pa.field('id', pa.uuid())])
5+
data = [uuid.UUID("8be643d6-0df7-4e5e-837c-f94170c87914").bytes,
6+
uuid.UUID("24bc9cf4-e2e8-444f-bb2d-82394f33ff76").bytes,
7+
uuid.UUID("e8149e1b-aef6-4671-b1b4-3b7a21eed92a").bytes]
8+
9+
with pa.OSFile('test/data/uuid_ext.arrow', 'wb') as sink:
10+
with pa.ipc.new_file(sink, schema=schema) as writer:
11+
batch = pa.record_batch([data], schema=schema)
12+
writer.write(batch)

scripts/run-tests-m1

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#!/bin/bash
22

33
scripts/compile
4-
clojure -X:dev-m1-mac:codegen
5-
clojure -M:dev-m1-mac:test
4+
clojure -X:dev-mac-m1:codegen
5+
clojure -M:dev-mac-m1:test

src/tech/v3/libs/arrow.clj

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@
119119
[org.apache.arrow.vector.types.pojo Field Schema ArrowType$Int
120120
ArrowType$Utf8 ArrowType$Timestamp ArrowType$Time DictionaryEncoding FieldType
121121
ArrowType$FloatingPoint ArrowType$Bool ArrowType$Date ArrowType$Duration
122-
ArrowType$LargeUtf8 ArrowType$Null ArrowType$List ArrowType$Binary]
122+
ArrowType$LargeUtf8 ArrowType$Null ArrowType$List ArrowType$Binary ArrowType$FixedSizeBinary]
123123
[org.apache.arrow.flatbuf CompressionType]
124124
[org.apache.arrow.vector.types MetadataVersion]
125125
[org.apache.arrow.vector.ipc WriteChannel]
@@ -132,7 +132,7 @@
132132
[java.io OutputStream InputStream ByteArrayOutputStream ByteArrayInputStream]
133133
[java.nio ByteBuffer ByteOrder ShortBuffer IntBuffer LongBuffer DoubleBuffer
134134
FloatBuffer]
135-
[java.util List ArrayList Map HashMap Map$Entry Iterator Set]
135+
[java.util List ArrayList Map HashMap Map$Entry Iterator Set UUID]
136136
[java.util.concurrent ForkJoinTask]
137137
[java.time ZoneId]
138138
[java.nio.channels WritableByteChannel]
@@ -435,6 +435,9 @@ Dependent block frames are not supported!!")
435435
(datafy [this] {:id (.getId this)
436436
:ordered? (.isOrdered this)
437437
:index-type (datafy (.getIndexType this))})
438+
ArrowType$FixedSizeBinary
439+
(datafy [this] {:datatype :fixed-size-binary
440+
:byte-width (.getByteWidth this)})
438441
ArrowType$List
439442
(datafy [this]
440443
{:datatype :list}))
@@ -710,10 +713,7 @@ Dependent block frames are not supported!!")
710713
(ft-fn (ArrowType$Utf8.) encoding)
711714
;;If no encoding is provided then just save the string as text
712715
(ft-fn (ArrowType$Utf8.)))
713-
:uuid (do
714-
(when (== 1 (long (swap! uuid-warn-counter inc)))
715-
(log/warn "Columns of type UUID are converted to type text when serializing to Arrow"))
716-
(ft-fn (ArrowType$Utf8.)))
716+
:uuid (ft-fn (ArrowType$FixedSizeBinary. 16))
717717
:text (ft-fn (ArrowType$Utf8.))
718718
:encoded-text (ft-fn (ArrowType$Utf8.))))))
719719

@@ -722,6 +722,9 @@ Dependent block frames are not supported!!")
722722
^Field [dictionaries {strings-as-text? :strings-as-text?}
723723
col]
724724
(let [colmeta (meta col)
725+
colmeta (if (identical? :uuid (get colmeta :datatype))
726+
(assoc colmeta ARROW_EXTENSION_NAME ARROW_UUID_NAME)
727+
colmeta)
725728
nullable? (boolean
726729
(or (:nullable? colmeta)
727730
(not (empty? (ds-proto/missing col)))))
@@ -1225,6 +1228,18 @@ Dependent block frames are not supported!!")
12251228
(throw (Exception. "Numeric buffer missing concrete representation"))))])
12261229
(case col-dt
12271230
:boolean [(boolean-bytes cbuf)]
1231+
:uuid (let [data (byte-array (* 16 (dtype/ecount cbuf)))
1232+
wbuf (-> (java.nio.ByteBuffer/wrap data)
1233+
(.order java.nio.ByteOrder/BIG_ENDIAN))]
1234+
(reduce (fn [_ ^UUID v]
1235+
(if v
1236+
(do
1237+
(.putLong wbuf (.getMostSignificantBits v))
1238+
(.putLong wbuf (.getLeastSignificantBits v)))
1239+
(do
1240+
(.putLong wbuf 0) (.putLong wbuf 0))))
1241+
nil cbuf)
1242+
[(java.nio.ByteBuffer/wrap data)])
12281243
:string (let [str-t (ds-base/ensure-column-string-table col)
12291244
indices (dtype-proto/->array-buffer (str-table/indices str-t))]
12301245
[(nio-buffer/as-nio-buffer indices)])
@@ -1633,8 +1648,45 @@ Dependent block frames are not supported!!")
16331648
(field-metadata field)
16341649
(node-buf->missing node validity-buf))))))
16351650

1636-
(defmethod ^:private preparse-field :default
1651+
(def ^{:private true
1652+
:tag String} ARROW_EXTENSION_NAME "ARROW:extension:name")
1653+
(def ^{:private true
1654+
:tag String} ARROW_UUID_NAME "arrow.uuid")
1655+
1656+
(defmethod ^:private preparse-field :fixed-size-binary
16371657
[field ^Iterator node-iter ^Iterator buf-iter dict-map options]
1658+
(let [node (.next node-iter)
1659+
buffers [(.next buf-iter) (.next buf-iter)]
1660+
n-elems (long (:n-elems node))
1661+
field-width (long (get-in field [:field-type :byte-width]))]
1662+
(fn parse-fixed-binary-field
1663+
[decompressor]
1664+
(let [[validity-buf data-buf] (decompressor buffers)
1665+
^bytes data-ary (if (instance? NativeBuffer data-buf)
1666+
(native-buffer/->jvm-array data-buf 0 (dtype/ecount data-buf))
1667+
(dtype/->array data-buf))
1668+
fm (field-metadata field)]
1669+
(col-impl/new-column
1670+
(:name field)
1671+
(if (= ARROW_UUID_NAME (get fm ARROW_EXTENSION_NAME))
1672+
(let [longsdata (-> (java.nio.ByteBuffer/wrap data-ary)
1673+
(.order (java.nio.ByteOrder/BIG_ENDIAN)))]
1674+
(println "is uuid")
1675+
(dtype/make-reader :uuid n-elems
1676+
(let [lidx (* idx 16)]
1677+
(java.util.UUID. (.getLong longsdata lidx)
1678+
(.getLong longsdata (+ lidx 8))))))
1679+
(let [ll (ArrayLists/toList data-ary)]
1680+
(println "is obj")
1681+
(dtype/make-reader :object n-elems
1682+
(let [lidx (* idx field-width)]
1683+
(.subList ll lidx (+ lidx field-width))))))
1684+
fm
1685+
(node-buf->missing node validity-buf))))))
1686+
1687+
1688+
(defmethod ^:private preparse-field :default
1689+
[field ^Iterator node-iter ^Iterator buf-iter dict-map options]
16381690
(assert (= 0 (count (:children field)))
16391691
(format "Field %s cannot be parsed with default parser" field))
16401692
(let [field-dtype (get-in field [:field-type :datatype])
@@ -2094,16 +2146,7 @@ Please use stream->dataset-seq.")))
20942146
;;datatypes
20952147
(reduce
20962148
(fn [ds col]
2097-
(cond
2098-
(= :uuid (dtype/elemwise-datatype col))
2099-
(let [missing (ds-proto/missing col)
2100-
metadata (meta col)]
2101-
(assoc ds (metadata :name)
2102-
#:tech.v3.dataset{:data (mapv (comp #(Text. %) str) col)
2103-
:missing missing
2104-
:metadata metadata
2105-
:name (metadata :name)}))
2106-
(and (= :string (dtype/elemwise-datatype col))
2149+
(if (and (= :string (dtype/elemwise-datatype col))
21072150
(not (:strings-as-text? options)))
21082151
(if (and (nil? prev-ds)
21092152
(instance? StringTable (.data ^Column col)))
@@ -2131,7 +2174,6 @@ Please use stream->dataset-seq.")))
21312174
:metadata (assoc metadata
21322175
::previous-string-table prev-str-t)
21332176
:name (metadata :name)})))))
2134-
:else
21352177
ds))
21362178
ds
21372179
(ds-base/columns ds))))

test/data/uuid_ext.arrow

762 Bytes
Binary file not shown.

test/tech/v3/libs/arrow_test.clj

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,18 +242,26 @@
242242

243243

244244
(deftest uuid-test
245+
(let [py-uuid (ds/->dataset "test/data/uuid_ext.arrow" {:key-fn keyword})]
246+
247+
(is (= :uuid (dtype/elemwise-datatype (py-uuid :id))))
248+
(is (= (mapv #(java.util.UUID/fromString %)
249+
["8be643d6-0df7-4e5e-837c-f94170c87914"
250+
"24bc9cf4-e2e8-444f-bb2d-82394f33ff76"
251+
"e8149e1b-aef6-4671-b1b4-3b7a21eed92a"])
252+
(py-uuid :id))))
245253
(try
246254
(let [uuid-ds (ds/->dataset "test/data/uuid.parquet"
247255
{:parser-fn {"uuids" :uuid}})
248256
_ (arrow/write-dataset-to-stream! uuid-ds "test-uuid.arrow")
249257
copying-ds (arrow/read-stream-dataset-copying "test-uuid.arrow")
250258
inplace-ds (arrow/read-stream-dataset-inplace "test-uuid.arrow")]
251-
(is (= :text ((comp :datatype meta) (copying-ds "uuids"))))
252-
(is (= :text ((comp :datatype meta) (inplace-ds "uuids"))))
259+
(is (= :uuid ((comp :datatype meta) (copying-ds "uuids"))))
260+
(is (= :uuid ((comp :datatype meta) (inplace-ds "uuids"))))
253261
(is (= (vec (copying-ds "uuids"))
254262
(vec (inplace-ds "uuids"))))
255-
(is (= (mapv str (uuid-ds "uuids"))
256-
(mapv str (copying-ds "uuids")))))
263+
(is (= (vec (uuid-ds "uuids"))
264+
(vec (copying-ds "uuids")))))
257265
(finally
258266
(.delete (java.io.File. "test-uuid.arrow")))))
259267

0 commit comments

Comments
 (0)