Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/manifold/debug.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns manifold.debug
{:no-doc true})
{:no-doc true}
(:require [clojure.tools.logging :as log]))

(def ^:dynamic *dropped-error-logging-enabled?* true)

Expand All @@ -8,3 +9,30 @@

(defn disable-dropped-error-logging! []
(.bindRoot #'*dropped-error-logging-enabled?* false))

(def ^:dynamic *leak-aware-deferred-rate* 1024)

(defn set-leak-aware-deferred-rate! [n]
(.bindRoot #'*leak-aware-deferred-rate* n))

(def dropped-errors nil)

(defn log-dropped-error! [error]
(some-> dropped-errors (swap! inc))
(log/warn error "unconsumed deferred in error state, make sure you're using `catch`."))

(defn with-dropped-error-detection
"Calls f, then attempts to trigger dropped errors to be detected and finally calls
handle-dropped-errors with the number of detected dropped errors. Details about these are logged
as warnings."
[f handle-dropped-errors]
(assert (nil? dropped-errors) "with-dropped-error-detection may not be nested")
;; Flush out any pending dropped errors from before
(System/gc)
(System/runFinalization)
(with-redefs [dropped-errors (atom 0)]
(f)
;; Flush out any errors which were dropped during f
(System/gc)
(System/runFinalization)
(handle-dropped-errors @dropped-errors)))
13 changes: 8 additions & 5 deletions src/manifold/deferred.clj
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,10 @@
[Object
(finalize [_]
(utils/with-lock lock
(when (and (identical? ::error state) (not consumed?))
(log/warn val "unconsumed deferred in error state, make sure you're using `catch`."))))]
(when (and (identical? ::error state)
(not consumed?)
debug/*dropped-error-logging-enabled?*)
(debug/log-dropped-error! val))))]
nil)

clojure.lang.IReference
Expand Down Expand Up @@ -631,7 +633,7 @@
(when (and
(not consumed?)
debug/*dropped-error-logging-enabled?*)
(log/warn error "unconsumed deferred in error state, make sure you're using `catch`.")))
(debug/log-dropped-error! error)))

clojure.lang.IReference
(meta [_] mta)
Expand Down Expand Up @@ -690,8 +692,9 @@
([]
(deferred (ex/executor)))
([executor]
(if (and (p/zero? (rem (.incrementAndGet created) 1024))
debug/*dropped-error-logging-enabled?*)
(if (and debug/*dropped-error-logging-enabled?*
(p/zero? (rem (.incrementAndGet created)
^long debug/*leak-aware-deferred-rate*)))
(LeakAwareDeferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)
(Deferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)))))

Expand Down
2 changes: 2 additions & 0 deletions test/manifold/bus_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
d (b/publish! b (long 1) 42)]
(is (= 42 @(s/take! s)))
(is (= true @d))))

(instrument-tests-with-dropped-error-detection!)
3 changes: 3 additions & 0 deletions test/manifold/deferred_stage_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns manifold.deferred-stage-test
(:require [manifold.deferred :as d]
[manifold.test-utils :refer :all]
[manifold.utils :refer
[fn->Function fn->Consumer fn->BiFunction fn->BiConsumer]]
[clojure.test :refer [deftest is testing]])
Expand Down Expand Up @@ -598,3 +599,5 @@
(d/success-deferred (d/success-deferred x)))))]

(is (d/deferred? @d2)))))

(instrument-tests-with-dropped-error-detection!)
31 changes: 19 additions & 12 deletions test/manifold/deferred_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
(:require
[clojure.test :refer :all]
[manifold.test-utils :refer :all]
[manifold.debug :as debug]
[manifold.deferred :as d]
[manifold.executor :as ex])
(:import
(java.util.concurrent
CompletableFuture
CompletionStage)
CompletionStage
TimeoutException)
(manifold.deferred IDeferred)))

(defmacro future' [& body]
Expand Down Expand Up @@ -314,7 +316,7 @@

;;;

(deftest ^:benchmark benchmark-chain
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-chain
(bench "invoke comp x1"
((comp inc) 0))
(bench "chain x1"
Expand All @@ -334,7 +336,7 @@
(bench "chain' x5"
@(d/chain' 0 inc inc inc inc inc)))

(deftest ^:benchmark benchmark-deferred
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-deferred
(bench "create deferred"
(d/deferred))
(bench "add-listener and success"
Expand Down Expand Up @@ -381,16 +383,19 @@
(deliver d 1)
@d)))

(deftest ^:stress test-error-leak-detection
(deftest ^:ignore-dropped-errors ^:stress test-error-leak-detection
(testing "error-deferred always detects dropped errors"
(expect-dropped-errors 1
(d/error-deferred (Throwable.))))

(d/error-deferred (Throwable.))
(System/gc)
(testing "regular deferreds detect errors on every debug/*leak-aware-deferred-rate*'th instance (1024 by default)"
(expect-dropped-errors 2
;; Explicitly restating the (current) default here for clarity
(binding [debug/*leak-aware-deferred-rate* 1024]
(dotimes [_ 2048]
(d/error! (d/deferred) (Throwable.)))))))

(dotimes [_ 2e3]
(d/error! (d/deferred) (Throwable.)))
(System/gc))

(deftest ^:stress test-deferred-chain
(deftest ^:ignore-dropped-errors ^:stress test-deferred-chain
(dotimes [_ 1e4]
(let [d (d/deferred)
result (d/future
Expand All @@ -401,7 +406,7 @@
(d/connect % d')
d')
d))))]
(Thread/sleep (rand-int 10))
(Thread/sleep ^long (rand-int 10))
(d/success! d 1)
(is (= 1 @@result)))))

Expand Down Expand Up @@ -431,3 +436,5 @@

(finally
(remove-method print-method CompletionStage))))

(instrument-tests-with-dropped-error-detection!)
5 changes: 4 additions & 1 deletion test/manifold/executor_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns manifold.executor-test
(:require
[clojure.test :refer :all]
[manifold.executor :as e])
[manifold.executor :as e]
[manifold.test-utils :refer :all])
(:import
[io.aleph.dirigiste
Executor
Expand Down Expand Up @@ -68,3 +69,5 @@
500)
thread (.newThread tf (constantly nil))]
(is (= "custom-name" (.getName thread)))))

(instrument-tests-with-dropped-error-detection!)
4 changes: 3 additions & 1 deletion test/manifold/go_off_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
(s/close! test-stream)
(is (= @test-d [0 1 2 nil nil]))))

(deftest ^:benchmark benchmark-go-off
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-go-off
(bench "invoke comp x1"
((comp inc) 0))
(bench "go-off x1"
Expand All @@ -174,3 +174,5 @@
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/success-deferred 0))))))))))))
(bench "go-off future 200 x5"
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0)))))))))))))))

(instrument-tests-with-dropped-error-detection!)
12 changes: 7 additions & 5 deletions test/manifold/stream_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@

;;;

(deftest ^:stress stress-buffered-stream
(deftest ^:ignore-dropped-errors ^:stress stress-buffered-stream
(let [s (s/buffered-stream identity 100)]
(future
(dotimes [_ 1e6]
Expand Down Expand Up @@ -518,7 +518,7 @@
(dotimes [_ 1e3]
@(s/take! s)))

(deftest ^:benchmark benchmark-conveyance
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-conveyance
(let [s (s/stream)
s' (reduce
(fn [s _]
Expand All @@ -542,7 +542,7 @@
(async/go (async/>! c 1))
(async/<!! c'))))

(deftest ^:benchmark benchmark-map
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-map
(let [s (s/stream)
s' (reduce
(fn [s _] (s/map inc s))
Expand All @@ -560,7 +560,7 @@
(async/go (async/>! c 1))
(async/<!! c'))))

(deftest ^:benchmark benchmark-alternatives
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-alternatives
(let [q (ArrayBlockingQueue. 1024)]
(bench "blocking queue throughput w/ 1024 buffer"
(blocking-queue-benchmark q)))
Expand Down Expand Up @@ -589,7 +589,7 @@
(bench "core.async blocking channel throughput w/ no buffer"
(core-async-blocking-benchmark ch))))

(deftest ^:benchmark benchmark-streams
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-streams
(let [s (s/stream 1024)]
(bench "stream throughput w/ 1024 buffer"
(stream-benchmark s)))
Expand All @@ -611,3 +611,5 @@
(s/consume (fn [_]) s)
(bench "put! with consume"
(s/put! s 1))))

(instrument-tests-with-dropped-error-detection!)
52 changes: 51 additions & 1 deletion test/manifold/test_utils.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(ns manifold.test-utils
(:require
[criterium.core :as c]))
[clojure.test :as test]
[criterium.core :as c]
[manifold.debug :as debug]))

(defmacro long-bench [name & body]
`(do
Expand All @@ -16,3 +18,51 @@
(do ~@body)
:reduce-with #(and %1 %2))))

(defn report-dropped-errors! [dropped-errors]
(when (pos? dropped-errors)
;; We include the assertion here within the `when` form so that we don't add a mystery assertion
;; to every passing test (which is the common case).
(test/is (zero? dropped-errors)
"Dropped errors detected! See log output for details.")))

(defn instrument-test-fn-with-dropped-error-detection [tf]
(if (or (::detect-dropped-errors? tf)
(:ignore-dropped-errors tf))
tf
(with-meta
(fn []
(binding [debug/*leak-aware-deferred-rate* 1]
(debug/with-dropped-error-detection tf report-dropped-errors!)))
{::detect-dropped-errors? true})))

(defn instrument-tests-with-dropped-error-detection!
"Instrument all tests in the current namespace dropped error detection by wrapping them in
`manifold.debug/with-dropped-error-detection`. If dropped errors are detected, a corresponding (failing)
assertion is injected into the test and the leak reports are logged at level `error`.

Usually placed at the end of a test namespace.

Add `:ignore-dropped-errors` to a test var's metadata to skip it from being instrumented.

Note that this is intentionally not implemented as a fixture since there is no clean way to make a
test fail from within a fixture: Neither a failing assertion nor throwing an exception will
preserve which particular test caused it. See
e.g. https://github.com/technomancy/leiningen/issues/2694 for an example of this."
[]
(->> (ns-interns *ns*)
vals
(filter (comp :test meta))
(run! (fn [tv]
(when-not (:ignore-dropped-errors (meta tv))
(alter-meta! tv update :test instrument-test-fn-with-dropped-error-detection))))))

(defmacro expect-dropped-errors
"Expect n number of dropped errors after executing body in the form of a test assertion.

Add `:ignore-dropped-errors` to the a test's metadata to be able to use this macro in an
instrumented namespace (see `instrument-tests-with-dropped-error-detection!`)."
[n & body]
`(debug/with-dropped-error-detection
(fn [] ~@body)
(fn [n#]
(test/is (= ~n n#) "Expected number of dropped errors doesn't match detected number of dropped errors."))))
2 changes: 2 additions & 0 deletions test/manifold/time_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,5 @@
(is (= 1 @counter))
(t/advance c 1)
(is (= 1 @counter))))

(instrument-tests-with-dropped-error-detection!)