Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/manifold/debug.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns manifold.debug
{:no-doc true})
{:no-doc true}
(:require [clojure.tools.logging :as log]))

(def ^:dynamic *dropped-error-logging-enabled?* true)

Expand All @@ -8,3 +9,30 @@

(defn disable-dropped-error-logging! []
(.bindRoot #'*dropped-error-logging-enabled?* false))

(def ^:dynamic *leak-aware-deferred-rate* 1024)

(defn set-leak-aware-deferred-rate! [n]
(.bindRoot #'*leak-aware-deferred-rate* n))

(def dropped-errors nil)

(defn log-dropped-error! [error]
(some-> dropped-errors (swap! inc))
(log/warn error "unconsumed deferred in error state, make sure you're using `catch`."))

(defn with-dropped-error-detection
"Calls f, then attempts to trigger dropped errors to be detected and finally calls
handle-dropped-errors with the number of detected dropped errors. Details about these are logged
as warnings."
[f handle-dropped-errors]
(assert (nil? dropped-errors) "with-dropped-error-detection may not be nested")
;; Flush out any pending dropped errors from before
(System/gc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was told before that System/gc is more of a suggestion to the VM than an actual call

Did you observe this working reliably?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, there is no guarantee that System/gc actually triggers a GC, let alone whether it reclaims all garbage. Quoting from the docs

Calling the gc method suggests that the Java Virtual Machine expend effort toward recycling unused objects in order to make the memory they currently occupy available for reuse by the Java Virtual Machine. When control returns from the method call, the Java Virtual Machine has made a best effort to reclaim space from all unused objects. There is no guarantee that this effort will recycle any particular number of unused objects, reclaim any particular amount of space, or complete at any particular time, if at all, before the method returns or ever. There is also no guarantee that this effort will determine the change of reachability in any particular number of objects, or that any particular number of Reference objects will be cleared and enqueued.

However, it does work pretty well in practice, see for example the failing tests for this PR which uses OpenJDK 8. Locally, I've also successfully used it with OpenJDK 21. So yeah, it's not deterministic but still quite useful as evidenced by the issues it uncovered 🙂

(System/runFinalization)
(with-redefs [dropped-errors (atom 0)]
(f)
;; Flush out any errors which were dropped during f
(System/gc)
(System/runFinalization)
(handle-dropped-errors @dropped-errors)))
13 changes: 8 additions & 5 deletions src/manifold/deferred.clj
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,10 @@
[Object
(finalize [_]
(utils/with-lock lock
(when (and (identical? ::error state) (not consumed?))
(log/warn val "unconsumed deferred in error state, make sure you're using `catch`."))))]
(when (and (identical? ::error state)
(not consumed?)
debug/*dropped-error-logging-enabled?*)
(debug/log-dropped-error! val))))]
nil)

clojure.lang.IReference
Expand Down Expand Up @@ -631,7 +633,7 @@
(when (and
(not consumed?)
debug/*dropped-error-logging-enabled?*)
(log/warn error "unconsumed deferred in error state, make sure you're using `catch`.")))
(debug/log-dropped-error! error)))

clojure.lang.IReference
(meta [_] mta)
Expand Down Expand Up @@ -690,8 +692,9 @@
([]
(deferred (ex/executor)))
([executor]
(if (and (p/zero? (rem (.incrementAndGet created) 1024))
debug/*dropped-error-logging-enabled?*)
(if (and debug/*dropped-error-logging-enabled?*
(p/zero? (rem (.incrementAndGet created)
^long debug/*leak-aware-deferred-rate*)))
(LeakAwareDeferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)
(Deferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)))))

Expand Down
2 changes: 2 additions & 0 deletions test/manifold/bus_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
d (b/publish! b (long 1) 42)]
(is (= 42 @(s/take! s)))
(is (= true @d))))

(instrument-tests-with-dropped-error-detection!)
3 changes: 3 additions & 0 deletions test/manifold/deferred_stage_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns manifold.deferred-stage-test
(:require [manifold.deferred :as d]
[manifold.test-utils :refer :all]
[manifold.utils :refer
[fn->Function fn->Consumer fn->BiFunction fn->BiConsumer]]
[clojure.test :refer [deftest is testing]])
Expand Down Expand Up @@ -598,3 +599,5 @@
(d/success-deferred (d/success-deferred x)))))]

(is (d/deferred? @d2)))))

(instrument-tests-with-dropped-error-detection!)
31 changes: 19 additions & 12 deletions test/manifold/deferred_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
(:require
[clojure.test :refer :all]
[manifold.test-utils :refer :all]
[manifold.debug :as debug]
[manifold.deferred :as d]
[manifold.executor :as ex])
(:import
(java.util.concurrent
CompletableFuture
CompletionStage)
CompletionStage
TimeoutException)
(manifold.deferred IDeferred)))

(defmacro future' [& body]
Expand Down Expand Up @@ -314,7 +316,7 @@

;;;

(deftest ^:benchmark benchmark-chain
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-chain
(bench "invoke comp x1"
((comp inc) 0))
(bench "chain x1"
Expand All @@ -334,7 +336,7 @@
(bench "chain' x5"
@(d/chain' 0 inc inc inc inc inc)))

(deftest ^:benchmark benchmark-deferred
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-deferred
(bench "create deferred"
(d/deferred))
(bench "add-listener and success"
Expand Down Expand Up @@ -381,16 +383,19 @@
(deliver d 1)
@d)))

(deftest ^:stress test-error-leak-detection
(deftest ^:ignore-dropped-errors ^:stress test-error-leak-detection
(testing "error-deferred always detects dropped errors"
(expect-dropped-errors 1
(d/error-deferred (Throwable.))))

(d/error-deferred (Throwable.))
(System/gc)
(testing "regular deferreds detect errors on every debug/*leak-aware-deferred-rate*'th instance (1024 by default)"
(expect-dropped-errors 2
;; Explicitly restating the (current) default here for clarity
(binding [debug/*leak-aware-deferred-rate* 1024]
(dotimes [_ 2048]
(d/error! (d/deferred) (Throwable.)))))))

(dotimes [_ 2e3]
(d/error! (d/deferred) (Throwable.)))
(System/gc))

(deftest ^:stress test-deferred-chain
(deftest ^:ignore-dropped-errors ^:stress test-deferred-chain
(dotimes [_ 1e4]
(let [d (d/deferred)
result (d/future
Expand All @@ -401,7 +406,7 @@
(d/connect % d')
d')
d))))]
(Thread/sleep (rand-int 10))
(Thread/sleep ^long (rand-int 10))
(d/success! d 1)
(is (= 1 @@result)))))

Expand Down Expand Up @@ -431,3 +436,5 @@

(finally
(remove-method print-method CompletionStage))))

(instrument-tests-with-dropped-error-detection!)
5 changes: 4 additions & 1 deletion test/manifold/executor_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns manifold.executor-test
(:require
[clojure.test :refer :all]
[manifold.executor :as e])
[manifold.executor :as e]
[manifold.test-utils :refer :all])
(:import
[io.aleph.dirigiste
Executor
Expand Down Expand Up @@ -68,3 +69,5 @@
500)
thread (.newThread tf (constantly nil))]
(is (= "custom-name" (.getName thread)))))

(instrument-tests-with-dropped-error-detection!)
4 changes: 3 additions & 1 deletion test/manifold/go_off_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
(s/close! test-stream)
(is (= @test-d [0 1 2 nil nil]))))

(deftest ^:benchmark benchmark-go-off
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-go-off
(bench "invoke comp x1"
((comp inc) 0))
(bench "go-off x1"
Expand All @@ -174,3 +174,5 @@
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/success-deferred 0))))))))))))
(bench "go-off future 200 x5"
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0)))))))))))))))

(instrument-tests-with-dropped-error-detection!)
12 changes: 7 additions & 5 deletions test/manifold/stream_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@

;;;

(deftest ^:stress stress-buffered-stream
(deftest ^:ignore-dropped-errors ^:stress stress-buffered-stream
(let [s (s/buffered-stream identity 100)]
(future
(dotimes [_ 1e6]
Expand Down Expand Up @@ -518,7 +518,7 @@
(dotimes [_ 1e3]
@(s/take! s)))

(deftest ^:benchmark benchmark-conveyance
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-conveyance
(let [s (s/stream)
s' (reduce
(fn [s _]
Expand All @@ -542,7 +542,7 @@
(async/go (async/>! c 1))
(async/<!! c'))))

(deftest ^:benchmark benchmark-map
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-map
(let [s (s/stream)
s' (reduce
(fn [s _] (s/map inc s))
Expand All @@ -560,7 +560,7 @@
(async/go (async/>! c 1))
(async/<!! c'))))

(deftest ^:benchmark benchmark-alternatives
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-alternatives
(let [q (ArrayBlockingQueue. 1024)]
(bench "blocking queue throughput w/ 1024 buffer"
(blocking-queue-benchmark q)))
Expand Down Expand Up @@ -589,7 +589,7 @@
(bench "core.async blocking channel throughput w/ no buffer"
(core-async-blocking-benchmark ch))))

(deftest ^:benchmark benchmark-streams
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-streams
(let [s (s/stream 1024)]
(bench "stream throughput w/ 1024 buffer"
(stream-benchmark s)))
Expand All @@ -611,3 +611,5 @@
(s/consume (fn [_]) s)
(bench "put! with consume"
(s/put! s 1))))

(instrument-tests-with-dropped-error-detection!)
52 changes: 51 additions & 1 deletion test/manifold/test_utils.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(ns manifold.test-utils
(:require
[criterium.core :as c]))
[clojure.test :as test]
[criterium.core :as c]
[manifold.debug :as debug]))

(defmacro long-bench [name & body]
`(do
Expand All @@ -16,3 +18,51 @@
(do ~@body)
:reduce-with #(and %1 %2))))

(defn report-dropped-errors! [dropped-errors]
(when (pos? dropped-errors)
;; We include the assertion here within the `when` form so that we don't add a mystery assertion
;; to every passing test (which is the common case).
(test/is (zero? dropped-errors)
"Dropped errors detected! See log output for details.")))

(defn instrument-test-fn-with-dropped-error-detection [tf]
(if (or (::detect-dropped-errors? tf)
(:ignore-dropped-errors tf))
tf
(with-meta
(fn []
(binding [debug/*leak-aware-deferred-rate* 1]
(debug/with-dropped-error-detection tf report-dropped-errors!)))
{::detect-dropped-errors? true})))

(defn instrument-tests-with-dropped-error-detection!
"Instrument all tests in the current namespace dropped error detection by wrapping them in
`manifold.debug/with-dropped-error-detection`. If dropped errors are detected, a corresponding (failing)
assertion is injected into the test and the leak reports are logged at level `error`.
Usually placed at the end of a test namespace.
Add `:ignore-dropped-errors` to a test var's metadata to skip it from being instrumented.
Note that this is intentionally not implemented as a fixture since there is no clean way to make a
test fail from within a fixture: Neither a failing assertion nor throwing an exception will
preserve which particular test caused it. See
e.g. https://github.com/technomancy/leiningen/issues/2694 for an example of this."
[]
(->> (ns-interns *ns*)
vals
(filter (comp :test meta))
(run! (fn [tv]
(when-not (:ignore-dropped-errors (meta tv))
(alter-meta! tv update :test instrument-test-fn-with-dropped-error-detection))))))

(defmacro expect-dropped-errors
"Expect n number of dropped errors after executing body in the form of a test assertion.
Add `:ignore-dropped-errors` to the a test's metadata to be able to use this macro in an
instrumented namespace (see `instrument-tests-with-dropped-error-detection!`)."
[n & body]
`(debug/with-dropped-error-detection
(fn [] ~@body)
(fn [n#]
(test/is (= ~n n#) "Expected number of dropped errors doesn't match detected number of dropped errors."))))
2 changes: 2 additions & 0 deletions test/manifold/time_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,5 @@
(is (= 1 @counter))
(t/advance c 1)
(is (= 1 @counter))))

(instrument-tests-with-dropped-error-detection!)