Skip to content

Commit b350ef6

Browse files
committed
Rearranging best fit thread-call into impl ns
1 parent 2029bad commit b350ef6

File tree

3 files changed

+47
-32
lines changed

3 files changed

+47
-32
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,11 @@ to catch and handle."
3030
clojure.core.async.impl.go ;; TODO: make conditional
3131
[clojure.core.async.impl.mutex :as mutex]
3232
[clojure.core.async.impl.concurrent :as conc]
33-
[clojure.core.async.impl.exec.threadpool :as threadp])
33+
)
3434
(:import [java.util.concurrent.atomic AtomicLong]
3535
[java.util.concurrent.locks Lock]
3636
[java.util.concurrent Executors Executor ThreadLocalRandom ExecutorService]
37-
[java.util Arrays ArrayList]
38-
[clojure.lang Var]))
37+
[java.util Arrays ArrayList]))
3938

4039
(alias 'core 'clojure.core)
4140

@@ -462,24 +461,7 @@ to catch and handle."
462461
[& body]
463462
(#'clojure.core.async.impl.go/go-impl &env body))
464463

465-
(defn- best-fit-thread-call
466-
[f exec]
467-
(let [c (chan 1)
468-
^ExecutorService e (case exec
469-
:compute threadp/compute-executor
470-
:io threadp/io-executor
471-
threadp/mixed-executor)]
472-
(let [binds (Var/getThreadBindingFrame)]
473-
(.execute e
474-
(fn []
475-
(Var/resetThreadBindingFrame binds)
476-
(try
477-
(let [ret (f)]
478-
(when-not (nil? ret)
479-
(>!! c ret)))
480-
(finally
481-
(close! c))))))
482-
c))
464+
(require '[clojure.core.async.impl.exec.services :as exec-services])
483465

484466
(defn thread-call
485467
"Executes f in another thread, returning immediately to the calling
@@ -488,15 +470,15 @@ to catch and handle."
488470
nature of f's workload, one of :mixed (default) :io or :compute
489471
whereby core.async may be able to choose a best fit thread type."
490472
[f]
491-
(best-fit-thread-call f :mixed))
473+
(exec-services/best-fit-thread-call f :mixed))
492474

493475
(defmacro io-thread
494476
"Executes the body in a thread intended for blocking I/O workloads,
495477
returning immediately to the calling thread. The body must not do
496478
extended computation (if so, use 'thread' instead). Returns a channel
497479
which will receive the result of the body when completed, then close."
498480
[& body]
499-
`(#'best-fit-thread-call (^:once fn* [] ~@body) :io))
481+
`(exec-services/best-fit-thread-call (^:once fn* [] ~@body) :io))
500482

501483
(defmacro thread
502484
"Executes the body in another thread, returning immediately to the
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
2+
;; The use and distribution terms for this software are covered by the
3+
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4+
;; which can be found in the file epl-v10.html at the root of this distribution.
5+
;; By using this software in any fashion, you are agreeing to be bound by
6+
;; the terms of this license.
7+
;; You must not remove this notice, or any other, from this software.
8+
9+
(ns clojure.core.async.impl.exec.services
10+
(:require [clojure.core.async.impl.concurrent :as conc])
11+
(:import [java.util.concurrent Executors ExecutorService]
12+
[clojure.lang Var]))
13+
14+
(set! *warn-on-reflection* true)
15+
16+
(defonce ^ExecutorService mixed-executor
17+
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true)))
18+
19+
(defonce ^ExecutorService io-executor
20+
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true)))
21+
22+
(defonce ^ExecutorService compute-executor
23+
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true)))
24+
25+
(defn best-fit-thread-call
26+
[f exec]
27+
(let [c (clojure.core.async/chan 1)
28+
^ExecutorService e (case exec
29+
:compute compute-executor
30+
:io io-executor
31+
mixed-executor)]
32+
(let [binds (Var/getThreadBindingFrame)]
33+
(.execute e
34+
(fn []
35+
(Var/resetThreadBindingFrame binds)
36+
(try
37+
(let [ret (f)]
38+
(when-not (nil? ret)
39+
(clojure.core.async/>!! c ret)))
40+
(finally
41+
(clojure.core.async/close! c))))))
42+
c))

src/main/clojure/clojure/core/async/impl/exec/threadpool.clj

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,3 @@
3030
(reify impl/Executor
3131
(impl/exec [_ r]
3232
(.execute executor-svc ^Runnable r))))))
33-
34-
(defonce ^ExecutorService mixed-executor
35-
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true)))
36-
37-
(defonce ^ExecutorService io-executor
38-
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true)))
39-
40-
(defonce ^ExecutorService compute-executor
41-
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true)))

0 commit comments

Comments
 (0)