|
| 1 | +(ns tech.v3.libs.parquet-hardwood |
| 2 | + (:require [ham-fisted.lazy-caching :as lznc] |
| 3 | + [ham-fisted.api :as hamf] |
| 4 | + [ham-fisted.iterator :as hamf-iter] |
| 5 | + [tech.v3.datatype :as dt] |
| 6 | + [tech.v3.datatype.array-buffer :as array-buffer] |
| 7 | + [tech.v3.datatype.errors :as errors] |
| 8 | + [tech.v3.dataset :as ds] |
| 9 | + [tech.v3.dataset.io.column-parsers :as col-parsers] |
| 10 | + [tech.v3.dataset.io.context :as parse-context] |
| 11 | + [tech.v3.dataset.base :as ds-base] |
| 12 | + [clojure.java.io :as io] |
| 13 | + [clojure.tools.logging :as log]) |
| 14 | + (:import [dev.morling.hardwood.reader ParquetFileReader ColumnReader] |
| 15 | + [dev.morling.hardwood.schema FileSchema ColumnSchema] |
| 16 | + [dev.morling.hardwood.metadata RowGroup ColumnChunk LogicalType |
| 17 | + LogicalType$DateType LogicalType$StringType LogicalType$UuidType |
| 18 | + LogicalType$EnumType LogicalType$JsonType LogicalType$TimestampType |
| 19 | + LogicalType$BsonType LogicalType$IntervalType LogicalType$IntType |
| 20 | + LogicalType$DecimalType LogicalType$TimeType LogicalType$MapType |
| 21 | + LogicalType$ListType] |
| 22 | + [clojure.lang MapEntry])) |
| 23 | + |
;; This namespace is Java-interop heavy (hardwood reader classes); surface any
;; reflective call sites at compile time rather than paying for them silently.
(set! *warn-on-reflection* true)
| 25 | + |
(def ^:private column-finalizers
  "Map of logical-type class -> (fn [logical-type raw-column-data] finalized-data).
  Applied after a column chunk is parsed to convert raw parquet values into
  dataset-friendly datatypes.  Logical types with no entry pass data through
  unchanged (see the identity fallback in `read-column`)."
  {;; Parquet string data arrives as byte arrays.  The parquet format defines
   ;; the string logical type as UTF-8, so decode explicitly instead of with
   ;; `(String. bytes)` which uses the platform default charset.
   LogicalType$StringType
   (fn [_ str-data]
     (dt/emap (fn [^bytes ss]
                (String. ss java.nio.charset.StandardCharsets/UTF_8))
              :string str-data))
   ;; Parquet dates are int32 days-since-epoch, which matches
   ;; :packed-local-date, so the buffer can simply be retagged in place.
   LogicalType$DateType
   (fn [_ int-data]
     (-> (dt/as-array-buffer int-data)
         (array-buffer/set-datatype :packed-local-date)))})
| 30 | + |
(defn- read-column
  "Eagerly read one parquet column chunk into a promotional column parser.

  * `idx` - column index within the row group (currently unused in the body;
    retained for call-site symmetry with `parquet->ds-seq`).
  * `col-schema` - hardwood `ColumnSchema` for the column.
  * `col-def` - the `ColumnChunk` metadata for this row group.
  * `rdr` - open `ParquetFileReader`.
  * `options` - may contain `:key-fn`, applied to the column name.

  Returns a map of `:column-name`, `:column-parser` (a PParser whose
  `finalize` applies the logical-type finalizer) and `:n-rows`."
  [idx ^ColumnSchema col-schema ^ColumnChunk col-def ^ParquetFileReader rdr options]
  (let [crdr (.getColumnReader rdr col-schema col-def)
        parser (col-parsers/promotional-object-parser (.name col-schema) nil)
        lt (.logicalType col-schema)
        ;; fall back to identity when no finalizer is registered for this
        ;; logical type
        finalizer (get column-finalizers (type lt) (fn [_ v] v))
        key-fn (:key-fn options identity)]
    ;; `row-idx` deliberately does not shadow the `idx` parameter
    (loop [row-idx 0]
      (if (.hasNext crdr)
        (do
          ;; nil indicates a missing value; skipping it leaves the row marked
          ;; missing in the parser
          (when-let [nv (.readNext crdr)]
            (col-parsers/add-value! parser row-idx nv))
          (recur (inc row-idx)))
        {:column-name (key-fn (.name col-schema))
         ;; wrap the parser so finalization can post-process the raw data
         ;; (utf-8 decode, date retagging, ...) via `finalizer`
         :column-parser (reify tech.v3.dataset.io.column_parsers.PParser
                          (finalize [this n-rows]
                            (-> (col-parsers/finalize! parser n-rows)
                                (update :tech.v3.dataset/data #(finalizer lt %)))))
         :n-rows row-idx}))))
| 49 | + |
(defn- ->path
  "Coerce `data` to a `java.nio.file.Path`.  Paths pass through untouched;
  anything else is stringified and parsed with `Paths/get`."
  [data]
  (if-not (instance? java.nio.file.Path data)
    (java.nio.file.Paths/get (str data) (make-array String 0))
    data))
| 55 | + |
(defn parquet->ds-seq
  "Lazily read a parquet file as a sequence of datasets, one per row group.

  Returns an object that is Seqable, Iterable, IReduceInit and AutoCloseable.
  The underlying file reader is closed when iteration or reduction completes,
  or when `.close` is called explicitly.

  Options:

  * `:column-allowlist` - sequence of column names or indexes to include.
  * `:column-blocklist` - sequence of column names or indexes to exclude;
    ignored when the allowlist is non-empty.
  * `:key-fn` - function applied to each column name."
  [path options]
  (let [rdr (ParquetFileReader/open (->path path))
        schema (.getFileSchema rdr)
        schema-columns (->> (range (.getColumnCount schema))
                            (into [] (map #(.getColumn schema (long %)))))
        ;; name->index map used to resolve allow/block entries given as names
        col-name->idx (into {} (map-indexed (fn [idx ^ColumnSchema col]
                                              [(.name col) idx]))
                            schema-columns)
        col-idx->name (into {} (map-indexed (fn [idx ^ColumnSchema col]
                                              [idx (.name col)]))
                            schema-columns)
        ;; allow/block entries may be numeric indexes or column names
        cref->idx (fn [centry] (if (number? centry) (long centry) (col-name->idx centry)))
        allowset (into #{} (map cref->idx) (get options :column-allowlist))
        blockset (into #{} (map cref->idx) (get options :column-blocklist))
        ;; a non-empty allowlist takes precedence over the blocklist
        is-allowed? (fn [^long idx]
                      (if-not (empty? allowset)
                        (boolean (allowset idx))
                        (if-not (empty? blockset)
                          (boolean (not (blockset idx)))
                          true)))
        src-indexes (vec (->> (hamf/range (count schema-columns))
                              (lznc/filter is-allowed?)))
        ;; lazy map over row groups; within each group the selected columns
        ;; are read with pmap-io (parallelism 12) then assembled into a dataset
        rv (->> (.getFileMetaData rdr)
                (.rowGroups)
                (lznc/map (fn [^RowGroup grp]
                            (let [parsers (->> src-indexes
                                               (hamf/pmap-io 12 (fn [^long idx]
                                                                  (read-column idx (nth schema-columns idx)
                                                                               (nth (.columns grp) idx)
                                                                               rdr
                                                                               options)))
                                               (vec))
                                  row-count (reduce max 0 (lznc/map :n-rows parsers))]
                              (parse-context/parsers->dataset options parsers row-count)))))
        ;; delays give close-once and realize-once semantics shared across the
        ;; several access paths of the reified object below
        closer* (delay (.close rdr))
        seq* (delay (seq rv))]
    (reify
      java.lang.AutoCloseable
      (close [this] @closer*)
      clojure.lang.Seqable
      (seq [this] @seq*)
      Iterable
      (iterator [this]
        ;; once seq* is realized the data is cached and safe to iterate;
        ;; otherwise wrap the source iterator so the reader closes on exhaustion
        (if (realized? seq*)
          (.iterator ^Iterable @seq*)
          (let [src-iter (.iterator ^Iterable rv)]
            (reify
              java.util.Iterator
              (hasNext [this] (let [rv (.hasNext src-iter)]
                                ;; exhausted - release the file handle
                                (when-not rv @closer*)
                                rv))
              (next [this] (.next src-iter))))))
      clojure.lang.IReduceInit
      (reduce [this rfn acc]
        (let [acc
              (if (realized? seq*)
                (reduce rfn acc @seq*)
                (reduce rfn acc rv))]
          ;; reduction consumed every row group - close the reader
          @closer*
          acc)))))
| 117 | + |
(defn parquet->ds
  "Load a parquet file into a single dataset. Input must be a file on disk.

  Options are a subset of the options used for loading datasets -
  specifically `:column-allowlist` and `:column-blocklist` can be
  useful here.  The parquet metadata ends up as metadata on the
  datasets.

  A file with multiple row groups yields multiple datasets which are
  concatenated into one; this logs a warning unless
  `:disable-parquet-warn-on-multiple-datasets` is set.  Throws when the file
  does not exist or contains no row groups."
  ([input options]
   (let [data-file (io/file input)
         _ (errors/when-not-errorf
            (.exists data-file)
            "Only on-disk files work with parquet. %s does not resolve to a file"
            input)
         dataset-seq (vec (parquet->ds-seq (.getCanonicalPath data-file) options))
         n-datasets (count dataset-seq)]
     ;; an empty file would otherwise reach concat-copying with zero
     ;; arguments and fail with an opaque error - fail loudly instead
     (errors/when-not-errorf
      (pos? n-datasets)
      "Parquet file %s contained no row groups" input)
     (when-not (or (:disable-parquet-warn-on-multiple-datasets options)
                   (== 1 n-datasets))
       (log/warnf "Concatenating multiple datasets (%d) into one.
To disable this warning use `:disable-parquet-warn-on-multiple-datasets`"
                  n-datasets))
     (if (== 1 n-datasets)
       (first dataset-seq)
       (apply ds-base/concat-copying dataset-seq))))
  ([input]
   (parquet->ds input nil)))
| 142 | + |
| 143 | +;; No writing interface defined yet |
| 144 | +;; (defn ds-seq->parquet |
| 145 | +;; ([path options ds-seq]) |
| 146 | +;; ([path ds-seq])) |
| 147 | + |
| 148 | +;; (defn ds->parquet |
| 149 | +;; "Write a dataset to a parquet file. Many parquet options are possible; |
| 150 | +;; these can also be passed in via ds/->write! |
| 151 | + |
| 152 | +;; Options are the same as ds-seq->parquet." |
| 153 | +;; ([ds path options] |
| 154 | +;; (ds-seq->parquet path options [ds])) |
| 155 | +;; ([ds path] |
| 156 | +;; (ds->parquet ds path nil))) |
0 commit comments