Skip to content

Commit db1e927

Browse files
committed
Use primitive-math
1 parent 947153f commit db1e927

File tree

10 files changed

+91
-83
lines changed

10 files changed

+91
-83
lines changed

src/manifold/bus.clj

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
[stream :as s]
77
[deferred :as d]
88
[utils :refer [definterface+]]]
9-
[potemkin.types :refer [deftype+]])
9+
[potemkin.types :refer [deftype+]]
10+
[clj-commons.primitive-math :as p])
1011
(:import
1112
[java.util.concurrent
1213
ConcurrentHashMap]
@@ -53,25 +54,25 @@
5354
(if (nil? ary)
5455
(object-array [x])
5556
(let [len (Array/getLength ary)
56-
ary' (object-array (inc len))]
57+
ary' (object-array (p/inc len))]
5758
(System/arraycopy ary 0 ary' 0 len)
5859
(aset ^objects ary' len x)
5960
ary')))
6061

6162
(defn- disj' [^objects ary x]
6263
(let [len (Array/getLength ary)]
6364
(if-let [idx (loop [i 0]
64-
(if (<= len i)
65+
(if (p/<= len i)
6566
nil
6667
(if (identical? x (aget ary i))
6768
i
68-
(recur (inc i)))))]
69-
(let [idx (long idx)]
70-
(if (== 1 len)
69+
(recur (p/inc i)))))]
70+
(let [idx (p/long idx)]
71+
(if (p/== 1 len)
7172
nil
72-
(let [ary' (object-array (dec len))]
73+
(let [ary' (object-array (p/dec len))]
7374
(System/arraycopy ary 0 ary' 0 idx)
74-
(System/arraycopy ary (inc idx) ary' idx (- len idx 1))
75+
(System/arraycopy ary (p/inc idx) ary' idx (p/- len idx 1))
7576
ary')))
7677
ary)))
7778

src/manifold/deferred.clj

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
[manifold.time :as time]
1111
[manifold.utils :as utils :refer [assert-some definterface+]]
1212
[potemkin.types :refer [def-abstract-type reify+ defprotocol+ deftype+]]
13+
[clj-commons.primitive-math :as p]
1314
[riddley.compiler :as compiler]
1415
[riddley.walk :as walk])
1516
(:import
@@ -689,7 +690,7 @@
689690
([]
690691
(deferred (ex/executor)))
691692
([executor]
692-
(if (and (zero? (rem (.incrementAndGet created) 1024))
693+
(if (and (p/zero? (rem (.incrementAndGet created) 1024))
693694
debug/*dropped-error-logging-enabled?*)
694695
(LeakAwareDeferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)
695696
(Deferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)))))
@@ -703,8 +704,8 @@
703704
#_{:inline
704705
(fn [val]
705706
(cond
706-
(true? val) 'manifold.deferred/true-deferred-
707-
(false? val) 'manifold.deferred/false-deferred-
707+
(p/true? val) 'manifold.deferred/true-deferred-
708+
(p/false? val) 'manifold.deferred/false-deferred-
708709
(nil? val) 'manifold.deferred/nil-deferred-
709710
:else `(SuccessDeferred. ~val nil nil)))
710711
:inline-arities #{1}}
@@ -1026,7 +1027,7 @@
10261027
"Like `chain`, but does not coerce deferrable values. This is useful both when coercion
10271028
is undesired, or for 2-4x better performance than `chain`."
10281029
{:inline (fn [& args]
1029-
(if false #_(< 3 (count args))
1030+
(if false #_(p/< 3 (count args))
10301031
(apply unroll-chain 'manifold.deferred/unwrap' args)
10311032
`(chain'- nil ~@args)))}
10321033
([x]
@@ -1046,12 +1047,12 @@
10461047
The returned deferred will only be realized once all functions have been applied and their
10471048
return values realized.
10481049
1049-
@(chain 1 inc #(future (inc %))) => 3
1050+
@(chain 1 inc #(future (p/inc %))) => 3
10501051
10511052
@(chain (future 1) inc inc) => 3
10521053
"
10531054
{:inline (fn [& args]
1054-
(if false #_(< 3 (count args))
1055+
(if false #_(p/< 3 (count args))
10551056
(apply unroll-chain 'manifold.deferred/unwrap args)
10561057
`(chain- nil ~@args)))}
10571058
([x]
@@ -1153,7 +1154,7 @@
11531154
(defn finally
11541155
"An equivalent of the finally clause, which takes a no-arg side-effecting function that executes
11551156
no matter what the result.
1156-
1157+
11571158
Returns x unless a Throwable is thrown in f, in which case it returns an error-deferred."
11581159
[x f]
11591160
(if-let [d (->deferred x nil)]
@@ -1174,7 +1175,7 @@
11741175

11751176
;; no further results, decrement the counter one last time
11761177
;; and return the result if everything else has been realized
1177-
(if (zero? (.get counter))
1178+
(if (p/zero? (.get counter))
11781179
(success-deferred (or (seq ary) (list)))
11791180
d)
11801181

@@ -1196,7 +1197,7 @@
11961197
(on-realized (chain' x)
11971198
(fn [val]
11981199
(aset ary idx val)
1199-
(when (zero? (.decrementAndGet counter))
1200+
(when (p/zero? (.decrementAndGet counter))
12001201
(success! d (seq ary))))
12011202
(fn [err]
12021203
(error! d err)))
@@ -1228,10 +1229,10 @@
12281229
(clojure.core/loop [i 1]
12291230
(if (= i n)
12301231
a
1231-
(let [j (rand-int (inc i))]
1232+
(let [j (rand-int (p/inc i))]
12321233
(aset a i (aget a j))
12331234
(aset a j i)
1234-
(recur (inc i)))))))
1235+
(recur (p/inc i)))))))
12351236

12361237
(defn alt'
12371238
"Like `alt`, but only unwraps Manifold deferreds."
@@ -1240,7 +1241,7 @@
12401241
cnt (count vals)
12411242
^ints idxs (random-array cnt)]
12421243
(clojure.core/loop [i 0]
1243-
(when (< i cnt)
1244+
(when (p/< i cnt)
12441245
(let [i' (aget idxs i)
12451246
x (nth vals i')]
12461247
(if (deferred? x)
@@ -1250,7 +1251,7 @@
12501251
(do (on-realized (chain' x)
12511252
#(success! d %)
12521253
#(error! d %))
1253-
(recur (inc i))))
1254+
(recur (p/inc i))))
12541255
(success! d x)))))
12551256
d))
12561257

@@ -1324,9 +1325,9 @@
13241325
13251326
(loop [i 1e6]
13261327
(chain (future i)
1327-
#(if (zero? %)
1328+
#(if (p/zero? %)
13281329
%
1329-
(recur (dec %)))))"
1330+
(recur (p/dec %)))))"
13301331
[bindings & body]
13311332
(let [vars (->> bindings (partition 2) (map first))
13321333
vals (->> bindings (partition 2) (map second))
@@ -1479,15 +1480,15 @@
14791480
Returns a deferred value, representing the value returned by the body.
14801481
14811482
(let-flow [x (future 1)]
1482-
(+ x 1))
1483+
(p/+ x 1))
14831484
14841485
(let-flow [x (future 1)
1485-
y (future (+ x 1))]
1486-
(+ y 1))
1486+
y (future (p/+ x 1))]
1487+
(p/+ y 1))
14871488
14881489
(let [x (future 1)]
1489-
(let-flow [y (future (+ x 1))]
1490-
(+ y 1)))"
1490+
(let-flow [y (future (p/+ x 1))]
1491+
(p/+ y 1)))"
14911492
[bindings & body]
14921493
(expand-let-flow
14931494
'manifold.deferred/chain

src/manifold/executor.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@
8888
(when (contains? stats Stats$Metric/QUEUE_LENGTH)
8989
{:queue-length (q #(.getQueueLength s %))})
9090
(when (contains? stats Stats$Metric/QUEUE_LATENCY)
91-
{:queue-latency (q #(double (/ (.getQueueLatency s %) 1e6)))})
91+
{:queue-latency (q #(p/double (p// (.getQueueLatency s %) 1e6)))})
9292
(when (contains? stats Stats$Metric/TASK_LATENCY)
93-
{:task-latency (q #(double (/ (.getTaskLatency s %) 1e6)))})
93+
{:task-latency (q #(p/double (p// (.getTaskLatency s %) 1e6)))})
9494
(when (contains? stats Stats$Metric/TASK_ARRIVAL_RATE)
9595
{:task-arrival-rate (q #(.getTaskArrivalRate s %))})
9696
(when (contains? stats Stats$Metric/TASK_COMPLETION_RATE)

src/manifold/stream.clj

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
[clojure.core :as clj]
88
[manifold.deferred :as d]
99
[potemkin.types :refer [deftype+]]
10+
[clj-commons.primitive-math :as p]
1011
[manifold.utils :as utils]
1112
[manifold.time :as time]
1213
[manifold.stream
@@ -494,7 +495,7 @@
494495
callback #(d/chain %
495496
callback
496497
(fn [result]
497-
(when (false? result)
498+
(when (p/false? result)
498499
(d/success! complete true))
499500
result))]
500501
(connect source (Callback. callback #(d/success! complete true) nil nil) nil)
@@ -567,7 +568,7 @@
567568
(cons x (stream->seq s timeout-interval)))))))
568569

569570
(defn- periodically-
570-
[stream period initial-delay f]
571+
[stream ^long period initial-delay f]
571572
(let [cancel (promise)]
572573
(deliver cancel
573574
(time/every period
@@ -588,7 +589,7 @@
588589
(fn [x]
589590
(if-not x
590591
(close! stream)
591-
(periodically- stream period (- period (rem (System/currentTimeMillis) period)) f)))))))
592+
(periodically- stream period (p/- period (rem (System/currentTimeMillis) period)) f)))))))
592593
(catch Throwable e
593594
(@cancel)
594595
(close! stream)
@@ -600,8 +601,8 @@
600601
(let [s (stream 1)]
601602
(periodically- s period initial-delay f)
602603
(source-only s)))
603-
([period f]
604-
(periodically period (- period (rem (System/currentTimeMillis) period)) f)))
604+
([^long period f]
605+
(periodically period (p/- period (rem (System/currentTimeMillis) period)) f)))
605606

606607
(declare zip)
607608

@@ -745,7 +746,7 @@
745746

746747
(defn mapcat
747748
"Equivalent to Clojure's `mapcat`, but for streams instead of sequences.
748-
749+
749750
Note that just like `clojure.core/mapcat`, the provided function `f`
750751
must return a collection and not a stream."
751752
([f s]
@@ -904,7 +905,7 @@
904905
(identical? result ::timeout)
905906
timeout-val
906907

907-
(false? result)
908+
(p/false? result)
908909
false
909910

910911
:else
@@ -926,7 +927,7 @@
926927
(if (identical? default-val x)
927928
x
928929
(let [[size msg] x]
929-
(buf+ (- size))
930+
(buf+ (p/- ^long size))
930931
msg))))]
931932
(if blocking?
932933
@val
@@ -944,7 +945,7 @@
944945

945946
:else
946947
(let [[size msg] x]
947-
(buf+ (- size))
948+
(buf+ (p/- ^long size))
948949
msg))))]
949950
(if blocking?
950951
@val
@@ -965,7 +966,7 @@
965966
(buffered-stream (constantly 1) buffer-size))
966967
([metric limit]
967968
(buffered-stream metric limit identity))
968-
([metric limit description]
969+
([metric ^long limit description]
969970
(let [buf (stream Integer/MAX_VALUE)
970971
buffer-size (AtomicLong. 0)
971972
last-put (AtomicReference. (d/success-deferred true))
@@ -974,10 +975,10 @@
974975
(let [buf' (.addAndGet buffer-size n)
975976
buf (unchecked-subtract buf' n)]
976977
(cond
977-
(and (<= buf' limit) (< limit buf))
978+
(and (p/<= buf' limit) (p/< limit buf))
978979
(-> last-put .get (d/success! true))
979980

980-
(and (<= buf limit) (< limit buf'))
981+
(and (p/<= buf limit) (p/< limit buf'))
981982
(-> last-put (.getAndSet (d/deferred)) (d/success! true))))))]
982983

983984
(BufferedStream.
@@ -1013,7 +1014,7 @@
10131014
(batch (constantly 1) batch-size nil s))
10141015
([max-size max-latency s]
10151016
(batch (constantly 1) max-size max-latency s))
1016-
([metric max-size max-latency s]
1017+
([metric ^long max-size ^long max-latency s]
10171018
(assert (pos? max-size))
10181019

10191020
(let [buf (stream)
@@ -1022,16 +1023,16 @@
10221023
(connect-via-proxy s buf s' {:description {:op "batch"}})
10231024
(on-closed s' #(close! buf))
10241025

1025-
(d/loop [msgs [], size 0, earliest-message -1, last-message -1]
1026+
(d/loop [msgs [], ^long size 0, ^long earliest-message -1, ^long last-message -1]
10261027
(cond
10271028
(or
1028-
(== size max-size)
1029-
(and (< max-size size) (== (count msgs) 1)))
1029+
(p/== size max-size)
1030+
(and (p/< max-size size) (p/== (count msgs) 1)))
10301031
(d/chain' (put! s' msgs)
10311032
(fn [_]
10321033
(d/recur [] 0 -1 -1)))
10331034

1034-
(> size max-size)
1035+
(p/> size max-size)
10351036
(let [msg (peek msgs)]
10361037
(d/chain' (put! s' (pop msgs))
10371038
(fn [_]
@@ -1045,7 +1046,7 @@
10451046
(take! buf ::empty)
10461047
(try-take! buf
10471048
::empty
1048-
(- max-latency (- (System/currentTimeMillis) earliest-message))
1049+
(p/- max-latency (p/- (System/currentTimeMillis) earliest-message))
10491050
::timeout))
10501051
(fn [msg]
10511052
(condp identical? msg
@@ -1064,8 +1065,8 @@
10641065
(let [time (System/currentTimeMillis)]
10651066
(d/recur
10661067
(conj msgs msg)
1067-
(+ size (metric msg))
1068-
(if (neg? earliest-message)
1068+
(p/+ size ^long (metric msg))
1069+
(if (p/< earliest-message 0)
10691070
time
10701071
earliest-message)
10711072
time)))))))
@@ -1080,15 +1081,15 @@
10801081
this is set to one second's worth."
10811082
([max-rate s]
10821083
(throttle max-rate max-rate s))
1083-
([max-rate max-backlog s]
1084+
([^long max-rate ^double max-backlog s]
10841085
(let [buf (stream)
10851086
s' (stream)
1086-
period (double (/ 1000 max-rate))]
1087+
period (p/double (p// 1000 max-rate))]
10871088

10881089
(connect-via-proxy s buf s' {:description {:op "throttle"}})
10891090
(on-closed s' #(close! buf))
10901091

1091-
(d/loop [backlog 0.0, read-start (System/currentTimeMillis)]
1092+
(d/loop [^double backlog 0.0, ^long read-start (System/currentTimeMillis)]
10921093
(d/chain (take! buf ::none)
10931094

10941095
(fn [msg]
@@ -1100,11 +1101,11 @@
11001101

11011102
(fn [result]
11021103
(when result
1103-
(let [elapsed (double (- (System/currentTimeMillis) read-start))
1104-
backlog' (min (+ backlog (- (/ elapsed period) 1)) max-backlog)]
1105-
(if (<= 1 backlog')
1106-
(- backlog' 1.0)
1107-
(d/timeout! (d/deferred) (- period elapsed) 0.0)))))
1104+
(let [elapsed (p/double (p/- (System/currentTimeMillis) read-start))
1105+
backlog' (p/min (p/+ backlog (p/- (p// elapsed period) 1.0)) max-backlog)]
1106+
(if (p/<= 1.0 backlog')
1107+
(p/- backlog' 1.0)
1108+
(d/timeout! (d/deferred) (p/- period elapsed) 0.0)))))
11081109

11091110
(fn [backlog]
11101111
(if backlog

0 commit comments

Comments
 (0)