Skip to content

Commit bc1ed65

Browse files
authored
Futures (#441)
* Futures * Type checking fixes * Lintman * Actual lintman * Clean up a bit * Tests * Slightly smarter definitions of pcalls and pvalues * Appropriately chunk futures in pmap * Improve the docstring a bit * Juicy * Correct ordering issues * pmap, pcalls, pvalues tests * Use newer Docker image * New dependencies I guess * Fix the importer bug * Fix seq iterator stack overflow * Py 3.6 dependencies * Fix MyPy * Address attr cmp and eq changes * Changelog * Be literally a lot smarter * Update changelog * Type check it
1 parent 0322fe8 commit bc1ed65

File tree

6 files changed

+254
-7
lines changed

6 files changed

+254
-7
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
### Added
9+
* Added support for `future`s (#441)
810

911
## [v0.1.dev13] - 2020-03-16
1012
### Added

src/basilisp/core.lpy

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
^{:doc "Creates a new sequence where o is the first element and seq is the rest.
4343
If seq is nil, return a list containing o. If seq is not a Seq, attempt
4444
to coerce it to a Seq and then cons o onto the resulting sequence."
45-
:arglists '([v seq])}
45+
:arglists '([o seq])}
4646
cons
47-
(fn* cons [v seq] (basilisp.lang.runtime/cons v seq)))
47+
(fn* cons [o seq] (basilisp.lang.runtime/cons o seq)))
4848

4949
(def ^{:doc "Return the metadata from o, or nil if there is no metadata."
5050
:arglists '([o])}
@@ -807,7 +807,8 @@
807807
(basilisp.lang.atom/Atom v))
808808

809809
(defn realized?
810-
"Return true if the delay or lazy sequence has been realized."
810+
"Return true if the delay, future, lazy sequence, or promise has been
811+
realized."
811812
[o]
812813
(.-is-realized o))
813814

@@ -847,6 +848,56 @@
847848
[p v]
848849
(.deliver p v))
849850

851+
;;;;;;;;;;;;;
852+
;; Futures ;;
853+
;;;;;;;;;;;;;
854+
855+
(import* atexit)
856+
857+
(def ^:dynamic *executor-pool*
858+
(basilisp.lang.futures/ThreadPoolExecutor))
859+
860+
(atexit/register (.-shutdown *executor-pool*))
861+
862+
(defn future-call
863+
"Call the no args function f in another thread. Returns a Future object.
864+
The value returned by f can be fetched using `deref` or `@`, though
865+
doing so may block unless the `deref` with a timeout argument is used."
866+
([f]
867+
(future-call f *executor-pool*))
868+
([f pool]
869+
(.submit pool f)))
870+
871+
(defmacro future
872+
"Execute the expressions of body in another thread. Returns a Future object.
873+
The value returned by the body can be fetched using `deref` or `@`, though
874+
doing so may block unless the `deref` with a timeout argument is used."
875+
[& body]
876+
`(future-call
877+
(fn* []
878+
~@body)))
879+
880+
(defn future-cancel
881+
"Attempt to cancel the Future fut. If the future can be cancelled, return
882+
true. Otherwise, return false."
883+
[fut]
884+
(.cancel fut))
885+
886+
(defn future?
887+
"Return true if x is a future, false otherwise."
888+
[x]
889+
(instance? basilisp.lang.futures/Future x))
890+
891+
(defn future-cancelled?
892+
"Return true if the Future fut has been cancelled, false otherwise."
893+
[fut]
894+
(.cancelled fut))
895+
896+
(defn future-done?
897+
"Return true if the Future fut is done, false otherwise."
898+
[fut]
899+
(.done fut))
900+
850901
;;;;;;;;;;;;;;;;;;;;;;;;;;
851902
;; Arithmetic Functions ;;
852903
;;;;;;;;;;;;;;;;;;;;;;;;;;
@@ -2571,6 +2622,58 @@
25712622
(recur (ret))
25722623
ret)))
25732624

2625+
(import* multiprocessing)
2626+
2627+
(def ^:dynamic *pmap-cpu-count*
2628+
(* 2 (multiprocessing/cpu-count)))
2629+
2630+
(defn pmap
2631+
"Apply f as by map, but in parallel using futures.
2632+
2633+
This may only be useful for functions which are mainly blocked on IO, since
2634+
Python threads do not allow parallel computation using threads.
2635+
2636+
This function is not fully lazy. Chunks of elements from the input collection
2637+
are grabbed eagerly to spawn Futures of `f`. Elements after the chunk size in
2638+
the input collection are grabbed lazily. The chunk size is set by
2639+
`*pmap-cpu-count*` and defaults to 2 times the number of CPU cores on the
2640+
machine; the value of `*pmap-cpu-count*` is captured when `pmap` is first
2641+
called, so it should be safe to bind in the initial calling thread. The
2642+
sequence of futures is fed into a final (fully lazy) sequence to deref the
2643+
Future and return its final value."
2644+
([f coll]
2645+
(lazy-seq
2646+
(when (seq coll)
2647+
(let [chunk-size *pmap-cpu-count*
2648+
futs (mapv #(future (f %)) (take chunk-size coll))]
2649+
(concat (map deref futs)
2650+
(pmap f (drop chunk-size coll)))))))
2651+
([f coll & colls]
2652+
(lazy-seq
2653+
(when (seq coll)
2654+
(let [chunk-size *pmap-cpu-count*
2655+
futs (apply mapv (fn [& args]
2656+
(future (apply f args)))
2657+
(take chunk-size coll)
2658+
(map #(take chunk-size %) colls))]
2659+
(concat (map deref futs)
2660+
(apply pmap f (drop chunk-size coll) (map #(drop chunk-size %) colls))))))))
2661+
2662+
(defn pcalls
2663+
"Return a lazy seq of the result of executing the no arg functions fns,
2664+
which will be evaluated in parallel."
2665+
[& fns]
2666+
(pmap (fn [f] (f)) fns))
2667+
2668+
(defmacro pvalues
2669+
"Returns a lazy seq of the result of exprs, which will be evaluated in
2670+
parallel."
2671+
[& exprs]
2672+
`(pcalls
2673+
~@(map (fn [expr]
2674+
(concat '(fn* []) [expr]))
2675+
exprs)))
2676+
25742677
;;;;;;;;;;;;;;;;;;;;;;
25752678
;; Random Functions ;;
25762679
;;;;;;;;;;;;;;;;;;;;;;

src/basilisp/lang/compiler/generator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ def _var_ns_as_python_sym(name: str) -> str:
467467
_CORE_ALIAS = genname("core")
468468
_DELAY_ALIAS = genname("delay")
469469
_EXC_ALIAS = genname("exc")
470+
_FUTURES_ALIAS = genname("futures")
470471
_INTERFACES_ALIAS = genname("interfaces")
471472
_KW_ALIAS = genname("kw")
472473
_LIST_ALIAS = genname("llist")
@@ -488,6 +489,7 @@ def _var_ns_as_python_sym(name: str) -> str:
488489
"basilisp.core": _CORE_ALIAS,
489490
"basilisp.lang.delay": _DELAY_ALIAS,
490491
"basilisp.lang.exception": _EXC_ALIAS,
492+
"basilisp.lang.futures": _FUTURES_ALIAS,
491493
"basilisp.lang.interfaces": _INTERFACES_ALIAS,
492494
"basilisp.lang.keyword": _KW_ALIAS,
493495
"basilisp.lang.list": _LIST_ALIAS,

src/basilisp/lang/futures.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from concurrent.futures import ( # noqa # pylint: disable=unused-import
2+
Future as _Future,
3+
ProcessPoolExecutor as _ProcessPoolExecutor,
4+
ThreadPoolExecutor as _ThreadPoolExecutor,
5+
TimeoutError as _TimeoutError,
6+
)
7+
from typing import Callable, Optional, TypeVar
8+
9+
import attr
10+
11+
from basilisp.lang.interfaces import IBlockingDeref
12+
13+
_CMP_OFF = getattr(attr, "__version_info__", (0,)) >= (19, 2)
14+
15+
T = TypeVar("T")
16+
17+
18+
@attr.s( # type: ignore
19+
auto_attribs=True,
20+
**({"eq": True} if _CMP_OFF else {"cmp": False}),
21+
frozen=True,
22+
repr=False,
23+
slots=True,
24+
)
25+
class Future(IBlockingDeref[T]):
26+
_future: "_Future[T]"
27+
28+
def __repr__(self): # pragma: no cover
29+
return self._future.__repr__()
30+
31+
def cancel(self) -> bool:
32+
return self._future.cancel()
33+
34+
def cancelled(self) -> bool:
35+
return self._future.cancelled()
36+
37+
def deref(
38+
self, timeout: Optional[float] = None, timeout_val: Optional[T] = None
39+
) -> Optional[T]:
40+
try:
41+
return self._future.result(timeout=timeout)
42+
except _TimeoutError:
43+
return timeout_val
44+
45+
def done(self) -> bool:
46+
return self._future.done()
47+
48+
@property
49+
def is_realized(self) -> bool:
50+
return self.done()
51+
52+
# Pass `Future.result(timeout=...)` through so `Executor.map(...)` can
53+
# still work with this Future wrapper.
54+
def result(self, timeout: Optional[float] = None) -> T:
55+
return self._future.result(timeout=timeout)
56+
57+
58+
# Basilisp's standard Future executor is the `ThreadPoolExecutor`, but since
59+
# it is set via a dynamic variable, it can be rebound using the binding macro.
60+
# Callers may wish to use a process pool if they have CPU bound work.
61+
62+
63+
class ProcessPoolExecutor(_ProcessPoolExecutor): # pragma: no cover
64+
def __init__(self, max_workers: Optional[int] = None):
65+
super().__init__(max_workers=max_workers)
66+
67+
def submit( # type: ignore
68+
self, fn: Callable[..., T], *args, **kwargs
69+
) -> "Future[T]":
70+
return Future(super().submit(fn, *args, **kwargs))
71+
72+
73+
class ThreadPoolExecutor(_ThreadPoolExecutor):
74+
def __init__(
75+
self,
76+
max_workers: Optional[int] = None,
77+
thread_name_prefix: str = "basilisp-futures",
78+
):
79+
super().__init__(max_workers=max_workers, thread_name_prefix=thread_name_prefix)
80+
81+
def submit( # type: ignore
82+
self, fn: Callable[..., T], *args, **kwargs
83+
) -> "Future[T]":
84+
return Future(super().submit(fn, *args, **kwargs))

src/basilisp/lang/runtime.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ class Namespace(ReferenceBase):
409409
"basilisp.lang.compiler",
410410
"basilisp.lang.delay",
411411
"basilisp.lang.exception",
412+
"basilisp.lang.futures",
412413
"basilisp.lang.interfaces",
413414
"basilisp.lang.keyword",
414415
"basilisp.lang.list",
@@ -1052,10 +1053,10 @@ def deref(o, timeout_s=None, timeout_val=None):
10521053
timeout_val are supplied, deref will wait at most timeout_s seconds,
10531054
returning timeout_val if timeout_s seconds elapse and o has not
10541055
returned."""
1055-
if isinstance(o, IDeref):
1056-
return o.deref()
1057-
elif isinstance(o, IBlockingDeref):
1056+
if isinstance(o, IBlockingDeref):
10581057
return o.deref(timeout_s, timeout_val)
1058+
elif isinstance(o, IDeref):
1059+
return o.deref()
10591060
raise TypeError(f"Object of type {type(o)} cannot be dereferenced")
10601061

10611062

@@ -1370,7 +1371,7 @@ def decorator(f):
13701371

13711372
def _fn_with_meta(f, meta: Optional[lmap.Map]):
13721373
"""Return a new function with the given meta. If the function f already
1373-
has a meta map, then merge the """
1374+
has a meta map, then merge the new meta with the existing meta."""
13741375

13751376
if not isinstance(meta, lmap.Map):
13761377
raise TypeError("meta must be a map")

tests/basilisp/core_fns_test.lpy

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
(ns tests.basilisp.core-fns-test
2+
(:import time)
23
(:require
34
[basilisp.test :refer [deftest are is testing]]))
45

@@ -335,6 +336,60 @@
335336
(is (= "lo w" (subs "hello world" 3 7)))
336337
(is (thrown? python/IndexError (subs "hello world" 12 3))))
337338

339+
(deftest futures-test
340+
(testing "successful future"
341+
(let [fut (future 1)]
342+
(is (= 1 @fut))
343+
(is (= 1 @fut))
344+
(is (= false (future-cancel fut)))
345+
(is (= false (future-cancelled? fut)))
346+
(is (= true (future-done? fut)))
347+
(is (= true (realized? fut)))))
348+
349+
(testing "timed deref of future"
350+
(let [fut (future (time/sleep 3))]
351+
(is (= :timed-out (deref fut 0.01 :timed-out)))
352+
(is (= false (future-cancelled? fut)))
353+
(is (= false (future-done? fut)))
354+
;; can't always cancel a sleep-ed Future
355+
(future-cancel fut))))
356+
357+
(deftest pmap-test
358+
(binding [*pmap-cpu-count* 2]
359+
(let [slow (fn slow [x]
360+
(time/sleep 0.5)
361+
(+ x 10))]
362+
(is (= [] (vec (pmap slow []))))
363+
(is (= [11] (vec (pmap slow [1]))))
364+
(is (= [11 12 13 14 15 16 17 18 19]
365+
(vec (pmap slow (range 1 10))))))))
366+
367+
(deftest pcalls-test
368+
(binding [*pmap-cpu-count* 2]
369+
(let [slow (fn slow [x]
370+
(time/sleep 0.5)
371+
(+ x 10))]
372+
(is (= [] (vec (pcalls))))
373+
(is (= [11] (vec (pcalls #(slow 1)))))
374+
(is (= [11 12 13 14]
375+
(vec (pcalls #(slow 1)
376+
#(slow 2)
377+
#(slow 3)
378+
#(slow 4))))))))
379+
380+
(deftest pvalues-test
381+
(binding [*pmap-cpu-count* 2]
382+
(let [slow (fn slow [x]
383+
(time/sleep 0.5)
384+
(+ x 10))]
385+
(is (= [] (vec (pvalues))))
386+
(is (= [11] (vec (pvalues (slow 1)))))
387+
(is (= [11 12 13 14]
388+
(vec (pvalues (slow 1)
389+
(slow 2)
390+
(slow 3)
391+
(slow 4))))))))
392+
338393
(deftest to-array-test
339394
(is (= #py [] (to-array [])))
340395
(is (= #py [] (to-array '())))

0 commit comments

Comments
 (0)