Skip to content

Commit de8e606

Browse files
committed
Added dynamic-require helper to provide dyn-load of ioc in macro body.
1 parent 8c6b263 commit de8e606

File tree

2 files changed

+54
-25
lines changed

2 files changed

+54
-25
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,15 @@ to IOC if not available. If AOT compiling, go blocks are always compiled
5858
as normal Clojure code to be run on vthreads and will throw at runtime
5959
if vthreads are not available (Java <21)
6060
61-
\"avoid\" - means that vthreads will not be used - you can use this to
62-
minimize impacts if you are not yet ready to evaluate vthreads in your app.
63-
If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
64-
:io thread pool use platform threads
61+
\"avoid\" - means that vthreads will not be used by core.async - you can
62+
use this to minimize impacts if you are not yet ready to utilize vthreads
63+
in your app. If AOT compiling, go blocks will use IOC. At runtime, io-thread
64+
and the :io thread pool use platform threads
65+
66+
Note: existing IOC compiled go blocks from older core.async versions continue
67+
to work (we retain and load the IOC state machine runtime - this does not
68+
require the analyzer), and you can interact with the same channels from both
69+
IOC and vthread code.
6570
"
6671
(:refer-clojure :exclude [reduce transduce into merge map take partition
6772
partition-by bounded-count])
@@ -80,8 +85,8 @@ If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
8085

8186
(alias 'core 'clojure.core)
8287

83-
(when (not (or (dispatch/aot-vthreads?) (dispatch/runtime-vthreads?)))
84-
(require 'clojure.core.async.impl.go))
88+
(def go-becomes-ioc? (not (or (dispatch/vthreads-available-and-allowed?)
89+
(dispatch/target-vthreads?))))
8590

8691
(set! *warn-on-reflection* false)
8792

@@ -163,11 +168,13 @@ If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
163168
(timers/timeout msecs))
164169

165170
(defmacro defparkingop
171+
"Emits either parking op or reimplement as blocking op when vthreads
172+
available."
166173
[op doc arglist & body]
167174
(let [as (mapv #(list 'quote %) arglist)
168175
blockingop (-> op name (str "!") symbol)]
169176
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
170-
(if (dispatch/runtime-vthreads?)
177+
(if (dispatch/vthreads-available-and-allowed?)
171178
(fn [~'& ~'args]
172179
~(list* apply blockingop '[args]))
173180
(fn ~arglist
@@ -198,8 +205,9 @@ If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
198205
(deref p))))
199206

200207
(defparkingop <!
201-
"takes a val from port. Must be called inside a (go ...) block. Will
202-
return nil if closed. Will park if nothing is available."
208+
"takes a val from port. Must be called inside a (go ...) block, or on
209+
a virtual thread. Will return nil if closed. Will park if nothing is
210+
available."
203211
[port]
204212
(assert nil "<! used not in (go ...) block"))
205213

@@ -238,7 +246,8 @@ If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
238246

239247
(defparkingop >!
240248
"puts a val into port. nil values are not allowed. Must be called
241-
inside a (go ...) block. Will park if no buffer space is available.
249+
inside a (go ...) block, or on a virtual thread. Will park if no buffer
250+
space is available.
242251
Returns true unless port is already closed."
243252
[port val]
244253
(assert nil ">! used not in (go ...) block"))
@@ -381,9 +390,9 @@ If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
381390

382391
(defparkingop alts!
383392
"Completes at most one of several channel operations. Must be called
384-
inside a (go ...) block. ports is a vector of channel endpoints,
385-
which can be either a channel to take from or a vector of
386-
[channel-to-put-to val-to-put], in any combination. Takes will be
393+
inside a (go ...) block, or on a virtual thread. ports is a vector of
394+
channel endpoints, which can be either a channel to take from or a vector
395+
of [channel-to-put-to val-to-put], in any combination. Takes will be
387396
made as if by <!, and puts will be made as if by >!. Unless
388397
the :priority option is true, if more than one port operation is
389398
ready a non-deterministic choice will be made. If no operation is
@@ -522,10 +531,13 @@ If AOT compiling, go blocks will use IOC. At runtime, io-thread and the
522531
Returns a channel which will receive the result of the body when
523532
completed"
524533
[& body]
525-
(cond (dispatch/aot-vthreads?) `(do (dispatch/ensure-runtime-vthreads!)
526-
(thread-call (^:once fn* [] ~@body) :io))
527-
(dispatch/runtime-vthreads?) `(thread-call (^:once fn* [] ~@body) :io)
528-
:default ((find-var 'clojure.core.async.impl.go/go-impl) &env body)))
534+
(let [rt-check-step (when clojure.core/*compile-files*
535+
'(dispatch/ensure-runtime-vthreads!))]
536+
(if go-becomes-ioc?
537+
(do (dispatch/dynamic-require 'clojure.core.async.impl.go)
538+
((find-var 'clojure.core.async.impl.go/go-impl) &env body))
539+
`(do ~rt-check-step
540+
(thread-call (^:once fn* [] ~@body) :io)))))
529541

530542
(defonce ^:private thread-macro-executor nil)
531543

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,25 +85,42 @@
8585
[]
8686
(System/getProperty "clojure.core.async.vthreads"))
8787

88-
(defn aot-vthreads? []
89-
(and clojure.core/*compile-files*
90-
(= (vthreads-directive) "target")))
88+
(defn target-vthreads? []
89+
(= (vthreads-directive) "target"))
9190

92-
(def runtime-vthreads?
91+
(def vthreads-available-and-allowed?
9392
(memoize
9493
(fn []
95-
(and (not clojure.core/*compile-files*)
96-
(not= (vthreads-directive) "avoid")
94+
(and (not= (vthreads-directive) "avoid")
9795
@virtual-threads-available?))))
9896

9997
(defn ensure-runtime-vthreads! []
100-
(when (not (runtime-vthreads?))
98+
(when (not (vthreads-available-and-allowed?))
10199
(throw (ex-info "Code compiled to target virtual threads, but is running on a JVM without vthread support."
102100
{:runtime-jvm-version (System/getProperty "java.version")}))))
103101

102+
(defn dynamic-require [& args]
103+
(let [p (promise)
104+
n *ns*
105+
ll @#'clojure.core/*loaded-libs*]
106+
(.start
107+
(Thread.
108+
(fn []
109+
(deliver p
110+
(binding [*ns* n
111+
clojure.core/*loaded-libs* ll]
112+
(try
113+
(apply require args)
114+
(catch Exception e
115+
e)))))))
116+
(let [res @p]
117+
(if (instance? Exception res)
118+
(throw res)
119+
res))))
120+
104121
(defn- make-io-executor
105122
[]
106-
(if (runtime-vthreads?)
123+
(if (vthreads-available-and-allowed?)
107124
(-> (.getDeclaredMethod Executors "newVirtualThreadPerTaskExecutor" (make-array Class 0))
108125
(.invoke nil (make-array Class 0)))
109126
(make-ctp-named :io)))

0 commit comments

Comments
 (0)