|
| 1 | +(ns aleph.resource-leak-detector |
| 2 | + "Provides a Netty leak detector which is more reliable than the default implementation. |
| 3 | +
|
| 4 | + Its chance of detecting leaks is around 95%. See `with-leak-collection` for details. |
| 5 | +
|
| 6 | + Since it adds considerable runtime overhead, its main purpose is to be used in tests to assert |
| 7 | + intentional presence of leaks and to check for unintended leaks. The main API entry points for |
| 8 | + this are `with-leak-collection`, `with-expected-leaks` and `instrument-tests!`. |
| 9 | +
|
| 10 | + To enable it, pass `-Dio.netty.customResourceLeakDetector=aleph.resource_leak_detector` to the |
| 11 | + JVM. You will also have to pass `-Dio.netty.leakDetection.level=PARANOID`. |
| 12 | +
|
| 13 | + Most reliable results are achieved by also passing the following options: |
| 14 | + `-Dio.netty.leakDetection.targetRecords=1` |
| 15 | + `-Dio.netty.allocator.type=unpooled` |
| 16 | +
|
| 17 | + All of the above is also provided by the `:leak-detection` Leiningen profile. |
| 18 | +
|
| 19 | + NOTE: Currently only improves reliability for detecting leaked ByteBufs. Other types of leaked |
| 20 | + resources will still be detected but not with the same reliability. Search Netty's codebase for |
| 21 | + invocations of `newResourceLeakDetector` to see which other candidate resources types there are." |
| 22 | + (:gen-class |
| 23 | + :extends io.netty.util.ResourceLeakDetector) |
| 24 | + (:require |
| 25 | + [clojure.test :as test] |
| 26 | + [clojure.tools.logging :as log]) |
| 27 | + (:import |
| 28 | + (io.netty.buffer AbstractByteBufAllocator) |
| 29 | + (io.netty.util |
| 30 | + ResourceLeakDetector |
| 31 | + ResourceLeakDetector$Level |
| 32 | + ResourceLeakDetectorFactory))) |
| 33 | + |
(defn enabled?
  "Returns true when the `io.netty.customResourceLeakDetector` system property is set to
  `aleph.resource_leak_detector`, false otherwise.

  See `aleph.resource-leak-detector` docstring on how to enable it."
  []
  (let [configured (System/getProperty "io.netty.customResourceLeakDetector")]
    (= "aleph.resource_leak_detector" configured)))
| 41 | + |
;; Delay yielding the class of the detector instance which Netty's
;; `ResourceLeakDetectorFactory` hands out (probed with `String` as the resource type).
(def active-resource-leak-detector-class
  (delay
    (-> (ResourceLeakDetectorFactory/instance)
        (.newResourceLeakDetector String)
        class)))
| 45 | + |
;; Delay yielding true when the detector actually instantiated by Netty's factory is
;; `aleph.resource_leak_detector`.
;;
;; The comparison is done by class name rather than via `Class/forName`: when the generated
;; class is not on the classpath (e.g. it was never AOT-compiled), `Class/forName` would throw a
;; `ClassNotFoundException` here, which would pre-empt the explanatory errors raised by
;; `ensure-consistent-config!`.
(def active?
  (delay
    (= "aleph.resource_leak_detector"
       (.getName ^Class @active-resource-leak-detector-class))))
| 50 | + |
(defn ensure-consistent-config!
  "Verifies that this leak detector is the one Netty actually uses and that the leak detection
  level is `PARANOID`. Throws a `RuntimeException` describing the problem otherwise; returns nil
  when the configuration is consistent."
  []
  (let [inactive? (not @active?)]
    (cond
      ;; Requested via system property, yet Netty instantiated a different detector.
      (and inactive? (enabled?))
      (throw (RuntimeException.
              (str "`aleph.resource_leak_detector` is enabled but the active resource leak detector is "
                   "`" (.getName ^Class @active-resource-leak-detector-class) "`. This indicates that "
                   "`io.netty.util.ResourceLeakDetectorFactory` ran into an initialization error. "
                   "Enable Netty debug logging to diagnose the cause.")))

      ;; Not requested at all.
      inactive?
      (throw (RuntimeException.
              (str "Attempted to use `aleph.resource-leak-detector` API but it is not enabled. "
                   "Pass `-Dio.netty.customResourceLeakDetector=aleph.resource_leak_detector` to enable.")))

      (not= ResourceLeakDetector$Level/PARANOID (ResourceLeakDetector/getLevel))
      (throw (RuntimeException.
              (str "`aleph.resource_leak_detector` requires `-Dio.netty.leakDetection.level=PARANOID`. "
                   "Current level is `" (ResourceLeakDetector/getLevel) "`."))))))
| 66 | + |
(def +max-probe-gc-runs+
  "Upper bound on how many times the GC is run while waiting for a leaked probe to be detected."
  10)

;; Prefix of the hint attached to every intentionally leaked probe buffer.
(def +probe-hint-marker+
  "ALEPH LEAK DETECTOR PROBE")
| 73 | + |
(defn hint-record-pattern
  "Builds a multi-line regex which matches a whole `Hint: ...` line (optionally preceded by
  whitespace) of a leak's records. `hint-pattern` is spliced into the regex verbatim, i.e. it is
  itself treated as regex source and is not quoted."
  [hint-pattern]
  (let [source (str "(?m)^\\s*Hint: " hint-pattern "$")]
    (re-pattern source)))
| 76 | + |
;; Matches the hint line of any probe: the probe marker followed by a number.
(def +probe-hint-pattern+
  (-> +probe-hint-marker+
      (str " \\d+")
      hint-record-pattern))
| 79 | + |
(defn probe?
  "Returns a truthy value (the matched hint line) when `leak`'s `:records` carry a probe hint,
  nil otherwise."
  [leak]
  (let [records (get leak :records)]
    (re-find +probe-hint-pattern+ records)))
| 82 | + |
(defn contains-hint?
  "Returns a truthy value when `leak`'s `:records` contain a hint line matching `hint`, nil
  otherwise. Note that `hint` is passed to `hint-record-pattern` as is, so it is interpreted as
  regex source."
  [hint leak]
  (let [pattern (hint-record-pattern hint)
        records (get leak :records)]
    (re-find pattern records)))
| 85 | + |
(defn remove-probes
  "Returns a lazy sequence of those `leaks` which are not probes (see `probe?`)."
  [leaks]
  (filter (complement probe?) leaks))
| 88 | + |
;; The counter is closed over so that it is only reachable through `gen-probe-hint`.
(let [probe-counter (atom 0)]
  (defn gen-probe-hint
    "Returns a fresh probe hint: the probe marker followed by a space and the next value of an
    internal, monotonically increasing counter."
    []
    (let [n (swap! probe-counter inc)]
      (str +probe-hint-marker+ " " n))))
| 92 | + |
(defn leak-probe!
  "Allocates a 1-byte buffer from the default allocator, touches it with `hint` and returns it
  without ever releasing it, i.e. the buffer is intentionally leaked."
  [hint]
  (let [buf (.buffer AbstractByteBufAllocator/DEFAULT 1)]
    (.touch buf hint)))
| 97 | + |
;; Atom holding the vector of leaks reported so far. `with-leak-collection` temporarily
;; replaces it with a fresh atom for the duration of its scope.
;;
;; It has a root value (rather than being left unbound) because the `-reportTracedLeak` and
;; `-reportUntracedLeak` callbacks `swap!` it unconditionally: with an unbound var, any leak
;; reported outside of a `with-leak-collection` scope would make the callback throw from
;; within Netty's leak reporting.
(def current-leaks
  (atom []))
| 99 | + |
(defn force-leak-detection!
  "Requests a GC run and finalization, then allocates and immediately releases a 1-byte buffer
  from the default allocator. Returns the result of the `.release` call."
  []
  (System/gc)
  (System/runFinalization)
  ;; Allocating a buffer transitively triggers a track() invocation, which in turn works off
  ;; the queue of leaked references.
  (let [scratch (.buffer AbstractByteBufAllocator/DEFAULT 1)]
    (.release scratch)))
| 106 | + |
(defn await-probe!
  "Repeatedly forces leak detection until a leak whose records contain `probe-hint` shows up in
  `current-leaks`. Returns nil once the probe was found.

  Runs the GC at most `+max-probe-gc-runs+` times and checks for the probe after every single
  run. Throws a `RuntimeException` when the probe still wasn't reported after the last run."
  [probe-hint]
  (loop [runs-left +max-probe-gc-runs+]
    ;; Give up *before* triggering another GC run: previously one extra run was forced whose
    ;; outcome was never inspected, exceeding the documented maximum by one.
    (when-not (pos? runs-left)
      (throw (RuntimeException. "Gave up awaiting leak probe. Try increasing +max-probe-gc-runs+.")))
    (force-leak-detection!)
    (when-not (some (partial contains-hint? probe-hint) @current-leaks)
      (recur (dec runs-left)))))
| 114 | + |
(defn with-leak-collection
  "Calls the thunk `f`, then makes a best effort to collect every resource leak it caused and
  passes them to `handle-leaks`, whose result is returned.

  Procedure: once `f` has returned, a (small) buffer marked as a probe is leaked on purpose. The
  garbage collector is then run and the leak detector polled in a loop until a leak matching the
  probe is reported. Finally, `handle-leaks` is invoked with a sequence of all other leaks which
  were collected in the meantime (empty when there were none).

  Each leak is a map with these keys:
   - `:resource-type` is a string with the name of the leaked resource type (e.g. \"ByteBuf\")
   - `:records` is a multi-line string which holds the trace of the leak.

  Requires the leak detector to be `enabled?`.

  Nested invocations each establish a fresh leak collection scope. This only works within the
  same thread, though: asynchronous processes started by `f` which outlive it will leak into the
  parent scope(s)."
  [f handle-leaks]
  (ensure-consistent-config!)
  (with-redefs [current-leaks (atom [])]
    (f)
    ;; Leak a uniquely marked probe and wait until the detector has reported it.
    (doto (gen-probe-hint)
      leak-probe!
      await-probe!)
    (-> (deref current-leaks)
        remove-probes
        handle-leaks)))
| 140 | + |
(defn -needReport
  "gen-class implementation of `needReport`: unconditionally answers true."
  [_this]
  true)
| 143 | + |
(defn -reportTracedLeak
  "gen-class implementation of `reportTracedLeak`: appends a leak map made of `resource-type`
  and `records` to the atom held by `current-leaks`."
  [_this resource-type records]
  (let [leak {:resource-type resource-type
              :records       records}]
    (swap! current-leaks conj leak)))
| 147 | + |
;; NOTE: Level PARANOID is required, so in practice this should never get called.
(defn -reportUntracedLeak
  "gen-class implementation of `reportUntracedLeak`: appends a leak map for `resource-type`
  to the atom held by `current-leaks`, using a placeholder in lieu of the missing trace."
  [_this resource-type]
  (let [leak {:resource-type resource-type
              :records       "[untraced]"}]
    (swap! current-leaks conj leak)))
| 152 | + |
(defn log-leaks!
  "Logs each leak in `leaks` at level `error`, one log statement per leak. Returns nil."
  [leaks]
  (run! (fn [{kind :resource-type, trace :records}]
          ;; Message wording taken from the (protected) reportTracedLeak method of
          ;; io.netty.util.ResourceLeakDetector.
          (log/error (str "LEAK: " kind ".release() was not called before it's garbage-collected.")
                     (str "See https://netty.io/wiki/reference-counted-objects.html for more information." trace)))
        leaks))
| 158 | + |
(defmacro with-expected-leaks
  "Evaluates `body` within `with-leak-collection` and asserts (via `clojure.test/is`) that it
  caused exactly `expected-leak-count` leaks; on mismatch the collected leaks are logged. Meant
  for tests which leak resources on purpose.

  Requires the leak detector to be `enabled?`."
  [expected-leak-count & body]
  (let [thunk   `(fn [] ~@body)
        ;; NOTE: `leaks` is deliberately a raw symbol rather than a gensym so that test failure
        ;; reports read nicer.
        handler `(fn [~'leaks]
                   (when-not (test/is (= ~expected-leak-count (count ~'leaks)) "Unexpected leak count! See log output for details.")
                     (log-leaks! ~'leaks)))]
    `(with-leak-collection ~thunk ~handler)))
| 171 | + |
(defn- report-test-leaks!
  "When `leaks` is non-empty, logs them and records a failing `clojure.test` assertion. Does
  nothing (and returns nil) when there are no leaks."
  [leaks]
  (when-not (empty? leaks)
    (log-leaks! leaks)
    ;; The assertion deliberately lives inside the conditional: this way passing tests (the
    ;; common case) don't get an extra mystery assertion added to them.
    (test/is (zero? (count leaks))
             "Leak detected! See log output for details.")))
| 179 | + |
(defn- instrument-test-fn
  "Wraps the test function `tf` so that it runs within `with-leak-collection`, reporting leaks
  via `report-test-leaks!`. The wrapper is tagged with `::instrumented?` metadata; a function
  which already carries that tag is returned unchanged so that tests never get wrapped twice."
  [tf]
  ;; The marker is attached as metadata (see `with-meta` below), so it has to be looked up in
  ;; the function's metadata. Invoking the keyword on the function itself always yields nil,
  ;; which made this guard ineffective and caused double wrapping on repeated instrumentation.
  (if (::instrumented? (meta tf))
    tf
    (with-meta
      (fn []
        (with-leak-collection tf report-test-leaks!))
      {::instrumented? true})))
| 187 | + |
(defn instrument-tests!
  "When `enabled?`, adds leak detection to every test in the current namespace by wrapping its
  test function in `with-leak-collection`. Detected leaks result in a (failing) assertion being
  injected into the respective test, and the leak reports are logged at level `error`. Does
  nothing when the leak detector is not enabled.

  Usually placed at the end of a test namespace.

  This is intentionally not implemented as a fixture: there is no clean way to fail a test from
  within a fixture, since neither a failing assertion nor a thrown exception preserves which
  particular test caused it. See e.g. https://github.com/technomancy/leiningen/issues/2694 for
  an example of this."
  []
  (when (enabled?)
    ;; Test vars are the interned vars which carry a `:test` entry in their metadata.
    (doseq [test-var (vals (ns-interns *ns*))
            :when (:test (meta test-var))]
      (alter-meta! test-var update :test instrument-test-fn))))
| 206 | + |
;; Announce at namespace load time whether the custom leak detector is enabled.
(if-not (enabled?)
  (log/info "disabled. This means resource leaks will be reported less accurately."
            "Pass `-Dio.netty.customResourceLeakDetector=aleph.resource_leak_detector` to enable.")
  (log/info "enabled."))
0 commit comments