|
13 | 13 |
|
14 | 14 | (set! *warn-on-reflection* true) |
15 | 15 |
|
| 16 | +(defonce ^:private in-dispatch (ThreadLocal.)) |
| 17 | + |
| 18 | +(defonce executor nil) |
| 19 | + |
16 | 20 | (defn counted-thread-factory |
17 | 21 | "Create a ThreadFactory that maintains a counter for naming Threads. |
18 | 22 | name-format specifies thread names - use %d to include counter |
|
44 | 48 | any use of the async thread pool." |
45 | 49 | (delay (or (Long/getLong "clojure.core.async.pool-size") 8))) |
46 | 50 |
|
47 | | -(defonce ^:private in-dispatch (ThreadLocal.)) |
48 | | - |
49 | 51 | (defn in-dispatch-thread? |
50 | 52 | "Returns true if the current thread is a go block dispatch pool thread" |
51 | 53 | [] |
|
65 | 67 | (.uncaughtException (Thread/currentThread) ex)) |
66 | 68 | nil) |
67 | 69 |
|
68 | | -(defn- executor-ctor |
| 70 | +(defn- make-ctp-named |
69 | 71 | [workflow] |
70 | 72 | (Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workflow) "-%d") true))) |
71 | 73 |
|
72 | | -(def ^:private default-construct-executor |
| 74 | +(def ^:private default-executor-factory |
73 | 75 | (memoize |
74 | 76 | (fn [workload] |
75 | 77 | (case workload |
76 | | - :compute (executor-ctor :compute) |
77 | | - :io (executor-ctor :io) |
78 | | - :mixed (executor-ctor :mixed) |
79 | | - :core-async-dispatch (default-construct-executor :io))))) |
| 78 | + :compute (make-ctp-named :compute) |
| 79 | + :io (make-ctp-named :io) |
| 80 | + :mixed (make-ctp-named :mixed) |
| 81 | + :core-async-dispatch (make-ctp-named :io))))) |
80 | 82 |
|
81 | 83 | (defn construct-executor |
82 | 84 | ^ExecutorService [workload] |
83 | 85 | (if-let [sysprop-ctor (when-let [esf (System/getProperty "clojure.core.async.executor-factory")] |
84 | 86 | (requiring-resolve (symbol esf)))] |
85 | | - (or (sysprop-ctor workload) (default-construct-executor workload)) |
86 | | - (default-construct-executor workload))) |
| 87 | + (or (sysprop-ctor workload) (default-executor-factory workload)) |
| 88 | + (default-executor-factory workload))) |
87 | 89 |
|
88 | 90 | (defn executor-for [workload] |
89 | 91 | (case workload |
|
96 | 98 | (let [^ExecutorService e (executor-for workload)] |
97 | 99 | (.execute e r))) |
98 | 100 |
|
99 | | -(defonce executor |
100 | | - (delay (let [^ExecutorService executor-svc (construct-executor :core-async-dispatch)] |
101 | | - (reify impl/Executor |
102 | | - (impl/exec [_ r] |
103 | | - (.execute executor-svc ^Runnable r)))))) |
104 | | - |
105 | 101 | (defn run |
106 | 102 | "Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread." |
107 | 103 | [^Runnable r] |
108 | 104 | (if (-> r meta :on-caller?) |
109 | 105 | (try (.run r) (catch Throwable t (ex-handler t))) |
110 | | - (impl/exec @executor r))) |
111 | | - |
| 106 | + (exec r :io))) |
0 commit comments