106106 [ham-fisted.api :as hamf]
107107 [ham-fisted.reduce :as hamf-rf]
108108 [ham-fisted.lazy-noncaching :as lznc]
109+ [ham-fisted.function :as hamf-fn]
109110 [ham-fisted.set :as set]
110111 [ham-fisted.protocols :as hamf-proto])
111112 (:import [ham_fisted ArrayLists]
@@ -1459,7 +1460,7 @@ Dependent block frames are not supported!!")
14591460 (fn parse-boolean-field
14601461 [decompressor]
14611462 ; ;nil subtype means null column
1462- (if
1463+ (if nil-subtype?
14631464 (col-impl/new-column
14641465 (:name field)
14651466 (dtype/const-reader false n-elems)
@@ -1830,33 +1831,32 @@ Dependent block frames are not supported!!")
18301831(deftype NextDatasetIter [schema ^Iterator messages fname
18311832 ^{:unsynchronized-mutable true
18321833 :tag long} idx
1833- ^:unsynchronized-mutable dict-map
1834+ ^Map dict-map
18341835 options]
18351836 Iterator
18361837 (hasNext [this] (.hasNext messages))
18371838 (next [this]
1838- (loop [msg (.next messages)
1839- dicts dict-map]
1839+ (loop [msg (.next messages)]
18401840 (if (identical? :dictionary-batch (get msg :message-type ))
1841- (recur (maybe-next messages)
1842- (update dicts (get msg :id )
1843- (fn [old-val]
1844- (delay
1845- (let [new-val (dictionary->strings msg)]
1846- (if (and old-val (new-val :delta? ))
1847- (update new-val :strings
1848- (fn [new-strs]
1849- (let [old-strs (get @old-val :strings )
1850- new-ec (+ (count new-strs) (count old-strs))
1851- rv (hamf/wrap-array-growable
1852- (make-array String new-ec)
1853- 0 )]
1854- (.addAllReducible rv old-strs)
1855- (.addAllReducible rv new-strs)
1856- rv))))
1857- new-val)))))
1841+ (do
1842+ (.compute dict-map (get msg :id )
1843+ (hamf-fn/bi-function
1844+ k old-val
1845+ (delay
1846+ (let [new-val (dictionary->strings msg)]
1847+ (if (and old-val (new-val :delta? ))
1848+ (let [old-strs (get @old-val :strings )
1849+ new-strs (get new-val :strings )
1850+ new-ec (+ (count new-strs) (count old-strs))
1851+ rv (hamf/wrap-array-growable
1852+ (make-array String new-ec)
1853+ 0 )]
1854+ (.addAllReducible rv old-strs)
1855+ (.addAllReducible rv new-strs)
1856+ (assoc new-val :strings rv))
1857+ new-val)))))
1858+ (recur (maybe-next messages)))
18581859 (let [cur-idx idx]
1859- (set! dict-map dicts)
18601860 (set! idx (inc cur-idx))
18611861 (-> (records->ds schema dict-map msg options)
18621862 (ds-base/set-dataset-name (format " %s-%03d" fname cur-idx))))))))
@@ -1881,7 +1881,7 @@ Dependent block frames are not supported!!")
18811881 (get options :close-input-stream? true ))
18821882 (.close ^InputStream input))
18831883 (throw (Exception. " Initial message is not a schema message." )))
1884- (NextDatasetIter. schema messages fname 0 {} options)))
1884+ (NextDatasetIter. schema messages fname 0 ( hamf/java-hashmap ) options)))
18851885
18861886(defn stream->dataset-iterable
18871887 " Loads data up to and including the first data record. Returns the a lazy
@@ -1924,9 +1924,9 @@ Dependent block frames are not supported!!")
19241924 :arrow-file
19251925 (next-dataset-iter (discard input 8 ) fname options)
19261926 :arrow-ipc
1927- (next-dataset-iter input options)
1927+ (next-dataset-iter input fname options)
19281928 :feather-v1
1929- [( feather->ds input options)] )))))
1929+ ( iter ( hamf/vector ( feather->ds input options))) )))))
19301930
19311931(defn ^:no-doc stream->dataset-seq
19321932 " see docs for [[stream->dataset-iterable]]"
0 commit comments