From e9bee474091f49c12aa51d8bb9562c8958c65ca5 Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: Tue, 7 Mar 2017 10:50:27 -0600 Subject: [PATCH 1/7] new implementations: multiprocessing, ipyparallel, mpi --- base_bs_erf.py | 142 +++++++++++++++--------------- bs_erf_apply.py | 31 +++++++ bs_erf_map.py | 33 +++++++ bs_erf_mpi.py | 36 ++++++++ bs_erf_naive.py | 54 ++++++++++-- bs_erf_naive_apply_pool.py | 6 ++ bs_erf_naive_apply_threadpool.py | 6 ++ bs_erf_naive_map_pool.py | 6 ++ bs_erf_naive_map_threadpool.py | 6 ++ bs_erf_naive_mpi.py | 5 ++ bs_erf_naive_threading.py | 5 ++ bs_erf_numpy.py | 48 +++++----- bs_erf_numpy_apply_ipyparallel.py | 55 ++++++++++++ bs_erf_numpy_apply_pool.py | 6 ++ bs_erf_numpy_apply_threadpool.py | 6 ++ bs_erf_numpy_map_pool.py | 6 ++ bs_erf_numpy_map_threadpool.py | 6 ++ bs_erf_numpy_mpi.py | 5 ++ bs_erf_numpy_threading.py | 5 ++ bs_erf_threading.py | 30 +++++++ 20 files changed, 397 insertions(+), 100 deletions(-) create mode 100644 bs_erf_apply.py create mode 100644 bs_erf_map.py create mode 100644 bs_erf_mpi.py create mode 100644 bs_erf_naive_apply_pool.py create mode 100644 bs_erf_naive_apply_threadpool.py create mode 100644 bs_erf_naive_map_pool.py create mode 100644 bs_erf_naive_map_threadpool.py create mode 100644 bs_erf_naive_mpi.py create mode 100644 bs_erf_naive_threading.py create mode 100644 bs_erf_numpy_apply_ipyparallel.py create mode 100644 bs_erf_numpy_apply_pool.py create mode 100644 bs_erf_numpy_apply_threadpool.py create mode 100644 bs_erf_numpy_map_pool.py create mode 100644 bs_erf_numpy_map_threadpool.py create mode 100644 bs_erf_numpy_mpi.py create mode 100644 bs_erf_numpy_threading.py create mode 100644 bs_erf_threading.py diff --git a/base_bs_erf.py b/base_bs_erf.py index 74a1bc2..1d09384 100644 --- a/base_bs_erf.py +++ b/base_bs_erf.py @@ -70,73 +70,75 @@ def gen_data(nopt): rnd.uniform(TL, TH, nopt), ) -############################################## - -def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False): - import argparse - parser = argparse.ArgumentParser() - parser.add_argument('--steps', required=False, default=sizes, help="Number of steps") - parser.add_argument('--step', required=False, default=step, help="Factor for each step") - parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask") - parser.add_argument('--size', required=False, default=nopt, help="Initial data size") - parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region") - parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp") - parser.add_argument('--text', required=False, default="", help="Print with each result") - - args = parser.parse_args() - sizes= int(args.steps) - step = int(args.step) - nopt = int(args.size) - chunk= int(args.chunk) - repeat=int(args.repeat) - kwargs={} - - if(dask): - import dask - import dask.multiprocessing - import dask.array as da - dask_modes = { - "sq": dask.async.get_sync, - "mt": dask.threaded.get, - "mp": dask.multiprocessing.get - } - kwargs = {"schd": dask_modes[args.dask]} - name += "-"+args.dask - - for i in xrange(sizes): - price, strike, t = gen_data(nopt) - if not nparr: - call = [0.0 for i in range(nopt)] - put = [-1.0 for i in range(nopt)] - price=list(price) - strike=list(strike) - t=list(t) - repeat=1 # !!!!! 
ignore repeat count - if dask: - assert(not pass_args) - price = da.from_array(price, chunks=(chunk,), name=False) - strike = da.from_array(strike, chunks=(chunk,), name=False) - t = da.from_array(t, chunks=(chunk,), name=False) - if pass_args: - call = np.zeros(nopt, dtype=np.float64) - put = -np.ones(nopt, dtype=np.float64) - iterations = xrange(repeat) - print("ERF: {}: Size: {}".format(name, nopt)), - sys.stdout.flush() - - if pass_args: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup - t0 = now() - for _ in iterations: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs) - else: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup - t0 = now() - for _ in iterations: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs) - mops = get_mops(t0, nopt) - print("MOPS: {}".format(mops*2*repeat), args.text) - nopt *= step - repeat -= step - if repeat < 1: - repeat = 1 +############################################## + +def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False, verbose=True): + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--steps', required=False, default=sizes, help="Number of steps") + parser.add_argument('--step', required=False, default=step, help="Factor for each step") + parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask") + parser.add_argument('--size', required=False, default=nopt, help="Initial data size") + parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region") + parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp") + parser.add_argument('--text', required=False, default="", help="Print with each result") + + args = parser.parse_args() + sizes= int(args.steps) + step = int(args.step) + nopt = int(args.size) + chunk= int(args.chunk) + repeat=int(args.repeat) + kwargs={} + + if(dask): + import dask + import dask.multiprocessing + import dask.array as da + dask_modes = { + "sq": dask.async.get_sync, + "mt": dask.threaded.get, + "mp": dask.multiprocessing.get + } + kwargs = {"schd": dask_modes[args.dask]} + name += "-"+args.dask + + for i in xrange(sizes): + price, strike, t = gen_data(nopt) + if not nparr: + call = [0.0 for i in range(nopt)] + put = [-1.0 for i in range(nopt)] + price=list(price) + strike=list(strike) + t=list(t) + repeat=1 # !!!!! 
ignore repeat count + if dask: + assert(not pass_args) + price = da.from_array(price, chunks=(chunk,), name=False) + strike = da.from_array(strike, chunks=(chunk,), name=False) + t = da.from_array(t, chunks=(chunk,), name=False) + if pass_args: + call = np.zeros(nopt, dtype=np.float64) + put = -np.ones(nopt, dtype=np.float64) + iterations = xrange(repeat) + if verbose: + print("ERF: {}: Size: {}".format(name, nopt)), + sys.stdout.flush() + + if pass_args: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup + t0 = now() + for _ in iterations: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs) + else: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup + t0 = now() + for _ in iterations: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs) + mops = get_mops(t0, nopt) + if verbose: + print("MOPS: {}".format(mops*2*repeat), args.text) + nopt *= step + repeat -= step + if repeat < 1: + repeat = 1 diff --git a/bs_erf_apply.py b/bs_erf_apply.py new file mode 100644 index 0000000..9280a3e --- /dev/null +++ b/bs_erf_apply.py @@ -0,0 +1,31 @@ +import base_bs_erf +import numpy as np +import multiprocessing +import bs_erf_naive + +global bs_impl +global pool +global nump + + +def black_scholes(nopt, price, strike, t, rate, vol): + global bs_impl + global pool + global nump + noptpp = int(nopt/nump) + call = np.empty(nopt, dtype=np.float64) + put = np.empty(nopt, dtype=np.float64) + asyncs = [pool.apply_async(bs_impl, (noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol)) for i in range(0, nopt, noptpp)] + for a,i in zip(asyncs, range(len(asyncs))): + call[i:i+noptpp], put[i:i+noptpp] = a.get() + return call, put + + +def main(title, impl, thepool): + global bs_impl + global pool + global nump + bs_impl = impl + nump = multiprocessing.cpu_count() + pool = thepool(nump) + base_bs_erf.run(title, black_scholes, pass_args=False) diff --git a/bs_erf_map.py b/bs_erf_map.py new file mode 100644 index 0000000..b5a20d0 --- /dev/null +++ b/bs_erf_map.py @@ -0,0 +1,33 @@ +import base_bs_erf +import multiprocessing + +global bs_impl +global pool +global nump + + +class bs(object): + def __init__(self, nopt, rate, vol): + self.nopt = nopt + self.rate = rate + self.vol = vol + + def __call__(self, zipped): + return bs_impl(self.nopt, *zipped, self.rate, self.vol) + + +def black_scholes(nopt, price, strike, t, rate, vol): + global bs_impl + global pool + z = list(zip(price, strike, t)) + return pool.map(bs(nopt, rate, vol), z) + + +def main(title, impl, thepool): + global bs_impl + global pool + global nump + bs_impl = impl + nump = multiprocessing.cpu_count() + pool = thepool(nump) + base_bs_erf.run(title, black_scholes, pass_args=False) diff --git a/bs_erf_mpi.py b/bs_erf_mpi.py new file mode 100644 index 0000000..3341971 --- /dev/null +++ b/bs_erf_mpi.py @@ -0,0 +1,36 @@ +import base_bs_erf +import numpy as np +from mpi4py import MPI + +global nump +global bs_impl + + +def black_scholes(nopt, price, strike, t, rate, vol, call, put): + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + noptpp = int(nopt/nump) + + myprice = np.empty(noptpp, dtype=np.float64) + mystrike = np.empty(noptpp, dtype=np.float64) + myt = np.empty(noptpp, dtype=np.float64) + + # Scatter data into arrays + comm.Scatter(price, myprice, root=0) + comm.Scatter(strike, mystrike, root=0) + comm.Scatter(t, myt, root=0) + + mycall, myput = bs_impl(noptpp, myprice, mystrike, myt, rate, vol) + + comm.Gather(mycall, call) + comm.Gather(myput, put) + + return call, put + + 
+def main(title, impl): + global nump + global bs_impl + nump = MPI.COMM_WORLD.size + bs_impl = impl + base_bs_erf.run(title, black_scholes, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0) diff --git a/bs_erf_naive.py b/bs_erf_naive.py index db576c9..1ccd34a 100644 --- a/bs_erf_naive.py +++ b/bs_erf_naive.py @@ -3,14 +3,14 @@ import numpy as np invsqrt = lambda x: 1.0/sqrt(x) -def black_scholes ( nopt, price, strike, t, rate, vol, call, put ): + +def black_scholes_args(nopt, price, strike, t, rate, vol, call, put): mr = -rate sig_sig_two = vol * vol * 2 - for i in range(nopt): - P = float( price [i] ) - S = strike [i] - T = t [i] + P = float(price[i]) + S = strike[i] + T = t[i] a = log(P / S) b = T * mr @@ -27,7 +27,43 @@ def black_scholes ( nopt, price, strike, t, rate, vol, call, put ): Se = exp(b) * S - call [i] = P * d1 - Se * d2 - put [i] = call [i] - P + Se - -base_bs_erf.run("Naive-loop", black_scholes, 4, 8, nparr=False, pass_args=True) + call[i] = P * d1 - Se * d2 + put[i] = call[i] - P + Se + return call, put + + +def black_scholes(nopt, price, strike, t, rate, vol): + call = np.zeros(nopt, dtype=np.float64) + put = -np.ones(nopt, dtype=np.float64) + return black_scholes_args(nopt, price, strike, t, rate, vol, call, put) + + +def black_scholes_map(nopt, price, strike, t, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 + P = float(price) + S = strike + T = t + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + call = P * d1 - Se * d2 + put = call - P + Se + return call, put + + +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes_args, nparr=False, pass_args=True) diff --git a/bs_erf_naive_apply_pool.py b/bs_erf_naive_apply_pool.py new file mode 100644 index 0000000..e614631 --- /dev/null +++ b/bs_erf_naive_apply_pool.py @@ -0,0 +1,6 @@ +from bs_erf_apply import main +from multiprocessing.pool import Pool +from bs_erf_naive import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes, Pool) diff --git a/bs_erf_naive_apply_threadpool.py b/bs_erf_naive_apply_threadpool.py new file mode 100644 index 0000000..7dcc184 --- /dev/null +++ b/bs_erf_naive_apply_threadpool.py @@ -0,0 +1,6 @@ +from bs_erf_apply import main +from multiprocessing.pool import ThreadPool +from bs_erf_naive import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes, ThreadPool) diff --git a/bs_erf_naive_map_pool.py b/bs_erf_naive_map_pool.py new file mode 100644 index 0000000..7f97648 --- /dev/null +++ b/bs_erf_naive_map_pool.py @@ -0,0 +1,6 @@ +from bs_erf_map import main +from multiprocessing import Pool +from bs_erf_naive import black_scholes_map + +if __name__ == '__main__': + main(__file__, black_scholes_map, Pool) diff --git a/bs_erf_naive_map_threadpool.py b/bs_erf_naive_map_threadpool.py new file mode 100644 index 0000000..21869e6 --- /dev/null +++ b/bs_erf_naive_map_threadpool.py @@ -0,0 +1,6 @@ +from bs_erf_map import main +from multiprocessing.pool import ThreadPool +from bs_erf_naive import black_scholes_map + +if __name__ == '__main__': + main(__file__, black_scholes_map, ThreadPool) diff --git a/bs_erf_naive_mpi.py b/bs_erf_naive_mpi.py new file mode 100644 index 0000000..13d31b6 --- /dev/null +++ b/bs_erf_naive_mpi.py @@ -0,0 +1,5 @@ +from bs_erf_mpi import main +from bs_erf_naive import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes) diff 
--git a/bs_erf_naive_threading.py b/bs_erf_naive_threading.py new file mode 100644 index 0000000..a7ed736 --- /dev/null +++ b/bs_erf_naive_threading.py @@ -0,0 +1,5 @@ +from bs_erf_naive import black_scholes +from bs_erf_threading import main + +if __name__ == '__main__': + main(__file__, black_scholes) diff --git a/bs_erf_numpy.py b/bs_erf_numpy.py index 92a19fe..716f9d1 100644 --- a/bs_erf_numpy.py +++ b/bs_erf_numpy.py @@ -3,32 +3,38 @@ from numpy import log, exp from base_bs_erf import erf, invsqrt -def black_scholes ( nopt, price, strike, t, rate, vol ): - mr = -rate - sig_sig_two = vol * vol * 2 +def black_scholes(nopt, price, strike, t, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 - P = price - S = strike - T = t + P = price + S = strike + T = t - a = log(P / S) - b = T * mr + a = log(P / S) + b = T * mr - z = T * sig_sig_two - c = 0.25 * z - y = invsqrt(z) + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) - w1 = (a - b + c) * y - w2 = (a - b - c) * y + w1 = (a - b + c) * y + w2 = (a - b - c) * y - d1 = 0.5 + 0.5 * erf(w1) - d2 = 0.5 + 0.5 * erf(w2) + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) - Se = exp(b) * S + Se = exp(b) * S - call = P * d1 - Se * d2 - put = call - P + Se - - return (call, put) + call = P * d1 - Se * d2 + put = call - P + Se + + return call, put -base_bs_erf.run("Numpy", black_scholes) + +def black_scholes_args(nopt, price, strike, t, rate, vol, call, put): + call[:], put[:] = black_scholes(nopt, price, strike, t, rate, vol) + + +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes) diff --git a/bs_erf_numpy_apply_ipyparallel.py b/bs_erf_numpy_apply_ipyparallel.py new file mode 100644 index 0000000..cd2ccaf --- /dev/null +++ b/bs_erf_numpy_apply_ipyparallel.py @@ -0,0 +1,55 @@ +import base_bs_erf +import numpy as np +import ipyparallel +from ipyparallel import Client +import os +global client +global dview + + +def bs(nopt, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 + + P = price + S = strike + T = t + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + call = P * d1 - Se * d2 + put = call - P + Se + + return call, put + + +def black_scholes(nopt, price, strike, t, rate, vol): #, call, put): + dview.scatter('price', price) + dview.scatter('strike', strike) + dview.scatter('t', t) + dview.push(dict(call=0,put=-1)) + r = dview.apply(bs, t, rate, vol) #, block=False) + return dview.gather('call').get(), dview.gather('put').get() + + +if __name__ == '__main__': + global client + global dview + client = Client() #profile='mpi') + dview = client[:] + dview.execute('from numpy import log, exp') + dview.execute('from base_bs_erf import erf, invsqrt') + base_bs_erf.run(__file__, black_scholes, pass_args=False) diff --git a/bs_erf_numpy_apply_pool.py b/bs_erf_numpy_apply_pool.py new file mode 100644 index 0000000..4c90229 --- /dev/null +++ b/bs_erf_numpy_apply_pool.py @@ -0,0 +1,6 @@ +from bs_erf_apply import main +from multiprocessing.pool import Pool +from bs_erf_numpy import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes, Pool) diff --git a/bs_erf_numpy_apply_threadpool.py b/bs_erf_numpy_apply_threadpool.py new file mode 100644 index 0000000..67d14ef --- /dev/null +++ b/bs_erf_numpy_apply_threadpool.py @@ -0,0 +1,6 @@ +from bs_erf_apply import main +from multiprocessing.pool import ThreadPool +from bs_erf_numpy import black_scholes 
+ +if __name__ == '__main__': + main(__file__, black_scholes, ThreadPool) diff --git a/bs_erf_numpy_map_pool.py b/bs_erf_numpy_map_pool.py new file mode 100644 index 0000000..8fb809a --- /dev/null +++ b/bs_erf_numpy_map_pool.py @@ -0,0 +1,6 @@ +from bs_erf_map import main +from multiprocessing.pool import Pool +from bs_erf_numpy import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes, Pool) diff --git a/bs_erf_numpy_map_threadpool.py b/bs_erf_numpy_map_threadpool.py new file mode 100644 index 0000000..620f9bd --- /dev/null +++ b/bs_erf_numpy_map_threadpool.py @@ -0,0 +1,6 @@ +from bs_erf_map import main +from multiprocessing.pool import ThreadPool +from bs_erf_numpy import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes, ThreadPool) diff --git a/bs_erf_numpy_mpi.py b/bs_erf_numpy_mpi.py new file mode 100644 index 0000000..dabc085 --- /dev/null +++ b/bs_erf_numpy_mpi.py @@ -0,0 +1,5 @@ +from bs_erf_mpi import main +from bs_erf_numpy import black_scholes + +if __name__ == '__main__': + main(__file__, black_scholes) diff --git a/bs_erf_numpy_threading.py b/bs_erf_numpy_threading.py new file mode 100644 index 0000000..02ddc2f --- /dev/null +++ b/bs_erf_numpy_threading.py @@ -0,0 +1,5 @@ +from bs_erf_numpy import black_scholes_args +from bs_erf_threading import main + +if __name__ == '__main__': + main(__file__, black_scholes_args) diff --git a/bs_erf_threading.py b/bs_erf_threading.py new file mode 100644 index 0000000..25e48e0 --- /dev/null +++ b/bs_erf_threading.py @@ -0,0 +1,30 @@ +import base_bs_erf +import numpy as np +import threading +from multiprocessing import cpu_count +import bs_erf_naive + +global bs_impl +global nump + + +def black_scholes(nopt, price, strike, t, rate, vol, call, put): + global bs_impl + global nump + noptpp = int(nopt/nump) + threads = [] + for i in range(0, nopt, noptpp): + thr = threading.Thread(target=bs_impl, args=(noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol, call[i:i+noptpp], put[i:i+noptpp])) + thr.start() + threads.append(thr) + for thr in threads: + thr.join() + return call, put + + +def main(title, impl): + global bs_impl + global nump + bs_impl = impl + nump = cpu_count() + base_bs_erf.run(title, black_scholes, pass_args=True) From 44746ae30e7ae47b294143f6cd44e18a2f8279fc Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: Tue, 7 Mar 2017 11:30:39 -0600 Subject: [PATCH 2/7] fixed typo --- bs_erf_naive_threading.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bs_erf_naive_threading.py b/bs_erf_naive_threading.py index a7ed736..f79cb29 100644 --- a/bs_erf_naive_threading.py +++ b/bs_erf_naive_threading.py @@ -1,5 +1,5 @@ -from bs_erf_naive import black_scholes +from bs_erf_naive import black_scholes_args from bs_erf_threading import main if __name__ == '__main__': - main(__file__, black_scholes) + main(__file__, black_scholes_args) From 1250817a9364aec20325889520edf4fec6cfc303 Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: Wed, 8 Mar 2017 05:37:40 -0600 Subject: [PATCH 3/7] - renaming files to distinguish between runnable and supporting files - removing globals --- bs_apply.py | 16 +++++++++ bs_apply_ipyparallel.py | 25 ++++++++++++++ bs_erf_apply.py | 31 ----------------- bs_erf_map.py | 33 ------------------ bs_erf_mpi.py | 36 -------------------- bs_erf_naive_apply_ipyparallel.py | 7 ++++ bs_erf_naive_apply_pool.py | 12 ++++--- bs_erf_naive_apply_threadpool.py | 9 +++-- bs_erf_naive_map_pool.py | 10 ++++-- bs_erf_naive_map_threadpool.py | 
9 +++-- bs_erf_naive_mpi.py | 9 +++-- bs_erf_naive_sequential.py | 5 +++ bs_erf_naive_threading.py | 9 +++-- bs_erf_numpy_apply_ipyparallel.py | 56 +++---------------------------- bs_erf_numpy_apply_pool.py | 12 ++++--- bs_erf_numpy_apply_threadpool.py | 9 +++-- bs_erf_numpy_map_pool.py | 12 ++++--- bs_erf_numpy_map_threadpool.py | 9 +++-- bs_erf_numpy_mpi.py | 9 +++-- bs_erf_numpy_sequential.py | 5 +++ bs_erf_numpy_threading.py | 9 +++-- bs_erf_threading.py | 30 ----------------- bs_map.py | 21 ++++++++++++ bs_mpi.py | 28 ++++++++++++++++ bs_erf_naive.py => bs_naive.py | 5 --- bs_erf_numpy.py => bs_numpy.py | 5 --- bs_threading.py | 18 ++++++++++ run.sh | 16 ++++++++- 28 files changed, 223 insertions(+), 232 deletions(-) create mode 100644 bs_apply.py create mode 100644 bs_apply_ipyparallel.py delete mode 100644 bs_erf_apply.py delete mode 100644 bs_erf_map.py delete mode 100644 bs_erf_mpi.py create mode 100644 bs_erf_naive_apply_ipyparallel.py create mode 100644 bs_erf_naive_sequential.py create mode 100644 bs_erf_numpy_sequential.py delete mode 100644 bs_erf_threading.py create mode 100644 bs_map.py create mode 100644 bs_mpi.py rename bs_erf_naive.py => bs_naive.py (91%) rename bs_erf_numpy.py => bs_numpy.py (87%) create mode 100644 bs_threading.py diff --git a/bs_apply.py b/bs_apply.py new file mode 100644 index 0000000..72a3367 --- /dev/null +++ b/bs_apply.py @@ -0,0 +1,16 @@ +import numpy as np + +class bs_runner(object): + def __init__(self, bs_impl, pool, nump): + self.bs_impl = bs_impl + self.pool = pool + self.nump = nump + + def __call__(self, nopt, price, strike, t, rate, vol): + noptpp = int(nopt/self.nump) + call = np.empty(nopt, dtype=np.float64) + put = np.empty(nopt, dtype=np.float64) + asyncs = [self.pool.apply_async(self.bs_impl, (noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol)) for i in range(0, nopt, noptpp)] + for a,i in zip(asyncs, range(len(asyncs))): + call[i:i+noptpp], put[i:i+noptpp] = a.get() + return call, put diff --git a/bs_apply_ipyparallel.py b/bs_apply_ipyparallel.py new file mode 100644 index 0000000..b552efe --- /dev/null +++ b/bs_apply_ipyparallel.py @@ -0,0 +1,25 @@ +import ipyparallel +from ipyparallel import Client + + +class bs(object): + def __init__(self, bs_impl): + self.bs_impl = bs_impl + + def __call__(self, nopt, rate, vol): + return self.bs_impl(nopt, price, strike, t, rate, vol) + + +class bs_runner(object): + def __init__(self, bs_impl): + self.bs_impl = bs_impl + self.client = Client() + self.dview = self.client[:] + + def __call__(self, nopt, price, strike, t, rate, vol): + self.dview.scatter('price', price) + self.dview.scatter('strike', strike) + self.dview.scatter('t', t) + self.dview.push(dict(call=0,put=-1)) + r = self.dview.apply(bs(self.bs_impl), t, rate, vol) + return self.dview.gather('call').get(), self.dview.gather('put').get() diff --git a/bs_erf_apply.py b/bs_erf_apply.py deleted file mode 100644 index 9280a3e..0000000 --- a/bs_erf_apply.py +++ /dev/null @@ -1,31 +0,0 @@ -import base_bs_erf -import numpy as np -import multiprocessing -import bs_erf_naive - -global bs_impl -global pool -global nump - - -def black_scholes(nopt, price, strike, t, rate, vol): - global bs_impl - global pool - global nump - noptpp = int(nopt/nump) - call = np.empty(nopt, dtype=np.float64) - put = np.empty(nopt, dtype=np.float64) - asyncs = [pool.apply_async(bs_impl, (noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol)) for i in range(0, nopt, noptpp)] - for a,i in zip(asyncs, range(len(asyncs))): - 
call[i:i+noptpp], put[i:i+noptpp] = a.get() - return call, put - - -def main(title, impl, thepool): - global bs_impl - global pool - global nump - bs_impl = impl - nump = multiprocessing.cpu_count() - pool = thepool(nump) - base_bs_erf.run(title, black_scholes, pass_args=False) diff --git a/bs_erf_map.py b/bs_erf_map.py deleted file mode 100644 index b5a20d0..0000000 --- a/bs_erf_map.py +++ /dev/null @@ -1,33 +0,0 @@ -import base_bs_erf -import multiprocessing - -global bs_impl -global pool -global nump - - -class bs(object): - def __init__(self, nopt, rate, vol): - self.nopt = nopt - self.rate = rate - self.vol = vol - - def __call__(self, zipped): - return bs_impl(self.nopt, *zipped, self.rate, self.vol) - - -def black_scholes(nopt, price, strike, t, rate, vol): - global bs_impl - global pool - z = list(zip(price, strike, t)) - return pool.map(bs(nopt, rate, vol), z) - - -def main(title, impl, thepool): - global bs_impl - global pool - global nump - bs_impl = impl - nump = multiprocessing.cpu_count() - pool = thepool(nump) - base_bs_erf.run(title, black_scholes, pass_args=False) diff --git a/bs_erf_mpi.py b/bs_erf_mpi.py deleted file mode 100644 index 3341971..0000000 --- a/bs_erf_mpi.py +++ /dev/null @@ -1,36 +0,0 @@ -import base_bs_erf -import numpy as np -from mpi4py import MPI - -global nump -global bs_impl - - -def black_scholes(nopt, price, strike, t, rate, vol, call, put): - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - noptpp = int(nopt/nump) - - myprice = np.empty(noptpp, dtype=np.float64) - mystrike = np.empty(noptpp, dtype=np.float64) - myt = np.empty(noptpp, dtype=np.float64) - - # Scatter data into arrays - comm.Scatter(price, myprice, root=0) - comm.Scatter(strike, mystrike, root=0) - comm.Scatter(t, myt, root=0) - - mycall, myput = bs_impl(noptpp, myprice, mystrike, myt, rate, vol) - - comm.Gather(mycall, call) - comm.Gather(myput, put) - - return call, put - - -def main(title, impl): - global nump - global bs_impl - nump = MPI.COMM_WORLD.size - bs_impl = impl - base_bs_erf.run(title, black_scholes, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0) diff --git a/bs_erf_naive_apply_ipyparallel.py b/bs_erf_naive_apply_ipyparallel.py new file mode 100644 index 0000000..25c5d2d --- /dev/null +++ b/bs_erf_naive_apply_ipyparallel.py @@ -0,0 +1,7 @@ +from bs_apply_ipyparallel import bs_runner +from bs_naive import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_apply_pool.py b/bs_erf_naive_apply_pool.py index e614631..41b642a 100644 --- a/bs_erf_naive_apply_pool.py +++ b/bs_erf_naive_apply_pool.py @@ -1,6 +1,10 @@ -from bs_erf_apply import main -from multiprocessing.pool import Pool -from bs_erf_naive import black_scholes +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_naive import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes, Pool) + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes, Pool(n), n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_apply_threadpool.py b/bs_erf_naive_apply_threadpool.py index 7dcc184..bdacf57 100644 --- a/bs_erf_naive_apply_threadpool.py +++ b/bs_erf_naive_apply_threadpool.py @@ -1,6 +1,9 @@ -from bs_erf_apply import main from multiprocessing.pool import ThreadPool -from bs_erf_naive import black_scholes +from multiprocessing import cpu_count +from bs_apply import bs_runner +from 
bs_naive import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes, ThreadPool) + bsr = bs_runner(black_scholes, ThreadPool(cpu_count()), cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_pool.py b/bs_erf_naive_map_pool.py index 7f97648..af2bb23 100644 --- a/bs_erf_naive_map_pool.py +++ b/bs_erf_naive_map_pool.py @@ -1,6 +1,10 @@ -from bs_erf_map import main from multiprocessing import Pool -from bs_erf_naive import black_scholes_map +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes_map, Pool) + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes_map, Pool(n)) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_threadpool.py b/bs_erf_naive_map_threadpool.py index 21869e6..b8aba7e 100644 --- a/bs_erf_naive_map_threadpool.py +++ b/bs_erf_naive_map_threadpool.py @@ -1,6 +1,9 @@ -from bs_erf_map import main from multiprocessing.pool import ThreadPool -from bs_erf_naive import black_scholes_map +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes_map, ThreadPool) + bsr = bs_runner(black_scholes_map, ThreadPool(cpu_count())) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_mpi.py b/bs_erf_naive_mpi.py index 13d31b6..a619225 100644 --- a/bs_erf_naive_mpi.py +++ b/bs_erf_naive_mpi.py @@ -1,5 +1,8 @@ -from bs_erf_mpi import main -from bs_erf_naive import black_scholes +from mpi4py import MPI +from bs_mpi import bs_runner +from bs_naive import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes) + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0) diff --git a/bs_erf_naive_sequential.py b/bs_erf_naive_sequential.py new file mode 100644 index 0000000..1290bef --- /dev/null +++ b/bs_erf_naive_sequential.py @@ -0,0 +1,5 @@ +import base_bs_erf +from bs_naive import black_scholes_args + +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes_args, nparr=False, pass_args=True) diff --git a/bs_erf_naive_threading.py b/bs_erf_naive_threading.py index f79cb29..3271b23 100644 --- a/bs_erf_naive_threading.py +++ b/bs_erf_naive_threading.py @@ -1,5 +1,8 @@ -from bs_erf_naive import black_scholes_args -from bs_erf_threading import main +from multiprocessing import cpu_count +from bs_threading import bs_runner +from bs_naive import black_scholes_args +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes_args) + bsr = bs_runner(black_scholes_args, cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=True) diff --git a/bs_erf_numpy_apply_ipyparallel.py b/bs_erf_numpy_apply_ipyparallel.py index cd2ccaf..50393de 100644 --- a/bs_erf_numpy_apply_ipyparallel.py +++ b/bs_erf_numpy_apply_ipyparallel.py @@ -1,55 +1,7 @@ +from bs_apply_ipyparallel import bs_runner +from bs_numpy import black_scholes import base_bs_erf -import numpy as np -import ipyparallel -from ipyparallel import Client -import os -global client -global dview - - -def bs(nopt, rate, vol): - mr = -rate - sig_sig_two = vol * vol * 2 - - P = price - S = strike - T = t - - a = log(P / S) - b = T * mr - - z = T * sig_sig_two - c = 0.25 * z - y = invsqrt(z) - - w1 = (a - b + c) * y - w2 = (a - b - c) * y - - d1 
= 0.5 + 0.5 * erf(w1) - d2 = 0.5 + 0.5 * erf(w2) - - Se = exp(b) * S - - call = P * d1 - Se * d2 - put = call - P + Se - - return call, put - - -def black_scholes(nopt, price, strike, t, rate, vol): #, call, put): - dview.scatter('price', price) - dview.scatter('strike', strike) - dview.scatter('t', t) - dview.push(dict(call=0,put=-1)) - r = dview.apply(bs, t, rate, vol) #, block=False) - return dview.gather('call').get(), dview.gather('put').get() - if __name__ == '__main__': - global client - global dview - client = Client() #profile='mpi') - dview = client[:] - dview.execute('from numpy import log, exp') - dview.execute('from base_bs_erf import erf, invsqrt') - base_bs_erf.run(__file__, black_scholes, pass_args=False) + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_apply_pool.py b/bs_erf_numpy_apply_pool.py index 4c90229..0fd220c 100644 --- a/bs_erf_numpy_apply_pool.py +++ b/bs_erf_numpy_apply_pool.py @@ -1,6 +1,10 @@ -from bs_erf_apply import main -from multiprocessing.pool import Pool -from bs_erf_numpy import black_scholes +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_numpy import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes, Pool) + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes, Pool(n), n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_apply_threadpool.py b/bs_erf_numpy_apply_threadpool.py index 67d14ef..3eea4b8 100644 --- a/bs_erf_numpy_apply_threadpool.py +++ b/bs_erf_numpy_apply_threadpool.py @@ -1,6 +1,9 @@ -from bs_erf_apply import main from multiprocessing.pool import ThreadPool -from bs_erf_numpy import black_scholes +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_numpy import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes, ThreadPool) + bsr = bs_runner(black_scholes, ThreadPool(cpu_count()), cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_pool.py b/bs_erf_numpy_map_pool.py index 8fb809a..b250bbc 100644 --- a/bs_erf_numpy_map_pool.py +++ b/bs_erf_numpy_map_pool.py @@ -1,6 +1,10 @@ -from bs_erf_map import main -from multiprocessing.pool import Pool -from bs_erf_numpy import black_scholes +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_numpy import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes, Pool) + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes, Pool(n)) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_threadpool.py b/bs_erf_numpy_map_threadpool.py index 620f9bd..a1cfab6 100644 --- a/bs_erf_numpy_map_threadpool.py +++ b/bs_erf_numpy_map_threadpool.py @@ -1,6 +1,9 @@ -from bs_erf_map import main from multiprocessing.pool import ThreadPool -from bs_erf_numpy import black_scholes +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_numpy import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes, ThreadPool) + bsr = bs_runner(black_scholes, ThreadPool(cpu_count())) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_mpi.py b/bs_erf_numpy_mpi.py index dabc085..9df7fca 100644 --- a/bs_erf_numpy_mpi.py +++ b/bs_erf_numpy_mpi.py @@ -1,5 +1,8 @@ -from bs_erf_mpi import main -from bs_erf_numpy import black_scholes 
+from mpi4py import MPI +from bs_mpi import bs_runner +from bs_numpy import black_scholes +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes) + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0) diff --git a/bs_erf_numpy_sequential.py b/bs_erf_numpy_sequential.py new file mode 100644 index 0000000..668c47c --- /dev/null +++ b/bs_erf_numpy_sequential.py @@ -0,0 +1,5 @@ +import base_bs_erf +from bs_numpy import black_scholes + +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes) diff --git a/bs_erf_numpy_threading.py b/bs_erf_numpy_threading.py index 02ddc2f..2fed7c4 100644 --- a/bs_erf_numpy_threading.py +++ b/bs_erf_numpy_threading.py @@ -1,5 +1,8 @@ -from bs_erf_numpy import black_scholes_args -from bs_erf_threading import main +from multiprocessing import cpu_count +from bs_threading import bs_runner +from bs_numpy import black_scholes_args +import base_bs_erf if __name__ == '__main__': - main(__file__, black_scholes_args) + bsr = bs_runner(black_scholes_args, cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=True) diff --git a/bs_erf_threading.py b/bs_erf_threading.py deleted file mode 100644 index 25e48e0..0000000 --- a/bs_erf_threading.py +++ /dev/null @@ -1,30 +0,0 @@ -import base_bs_erf -import numpy as np -import threading -from multiprocessing import cpu_count -import bs_erf_naive - -global bs_impl -global nump - - -def black_scholes(nopt, price, strike, t, rate, vol, call, put): - global bs_impl - global nump - noptpp = int(nopt/nump) - threads = [] - for i in range(0, nopt, noptpp): - thr = threading.Thread(target=bs_impl, args=(noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol, call[i:i+noptpp], put[i:i+noptpp])) - thr.start() - threads.append(thr) - for thr in threads: - thr.join() - return call, put - - -def main(title, impl): - global bs_impl - global nump - bs_impl = impl - nump = cpu_count() - base_bs_erf.run(title, black_scholes, pass_args=True) diff --git a/bs_map.py b/bs_map.py new file mode 100644 index 0000000..507cc04 --- /dev/null +++ b/bs_map.py @@ -0,0 +1,21 @@ +# we can put bs and bs_runner into one as soon as our driver +# does not expect a callable but calls a member function instead +class bs(object): + def __init__(self, bs_impl, nopt, rate, vol): + self.nopt = nopt + self.rate = rate + self.vol = vol + self.bs_impl = bs_impl + + def __call__(self, zipped): + return self.bs_impl(self.nopt, *zipped, self.rate, self.vol) + + +class bs_runner(object): + def __init__(self, bs_impl, pool): + self.bs_impl = bs_impl + self.pool = pool + + def __call__(self, nopt, price, strike, t, rate, vol): + z = list(zip(price, strike, t)) + return self.pool.map(bs(self.bs_impl, nopt, rate, vol), z) diff --git a/bs_mpi.py b/bs_mpi.py new file mode 100644 index 0000000..7008f39 --- /dev/null +++ b/bs_mpi.py @@ -0,0 +1,28 @@ +import numpy as np +from mpi4py import MPI + +class bs_runner(object): + def __init__(self, bs_impl): + self.bs_impl = bs_impl + self.nump = MPI.COMM_WORLD.size + + def __call__(self, nopt, price, strike, t, rate, vol, call, put): + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + noptpp = int(nopt/self.nump) + + myprice = np.empty(noptpp, dtype=np.float64) + mystrike = np.empty(noptpp, dtype=np.float64) + myt = np.empty(noptpp, dtype=np.float64) + + # Scatter data into arrays + comm.Scatter(price, myprice, root=0) + comm.Scatter(strike, mystrike, root=0) + comm.Scatter(t, myt, root=0) + + mycall, myput = 
self.bs_impl(noptpp, myprice, mystrike, myt, rate, vol) + + comm.Gather(mycall, call) + comm.Gather(myput, put) + + return call, put diff --git a/bs_erf_naive.py b/bs_naive.py similarity index 91% rename from bs_erf_naive.py rename to bs_naive.py index 1ccd34a..ee563f1 100644 --- a/bs_erf_naive.py +++ b/bs_naive.py @@ -1,4 +1,3 @@ -import base_bs_erf from math import log, sqrt, exp, erf import numpy as np invsqrt = lambda x: 1.0/sqrt(x) @@ -63,7 +62,3 @@ def black_scholes_map(nopt, price, strike, t, rate, vol): call = P * d1 - Se * d2 put = call - P + Se return call, put - - -if __name__ == '__main__': - base_bs_erf.run(__file__, black_scholes_args, nparr=False, pass_args=True) diff --git a/bs_erf_numpy.py b/bs_numpy.py similarity index 87% rename from bs_erf_numpy.py rename to bs_numpy.py index 716f9d1..1dcdc60 100644 --- a/bs_erf_numpy.py +++ b/bs_numpy.py @@ -1,4 +1,3 @@ -import base_bs_erf import numpy as np from numpy import log, exp from base_bs_erf import erf, invsqrt @@ -34,7 +33,3 @@ def black_scholes(nopt, price, strike, t, rate, vol): def black_scholes_args(nopt, price, strike, t, rate, vol, call, put): call[:], put[:] = black_scholes(nopt, price, strike, t, rate, vol) - - -if __name__ == '__main__': - base_bs_erf.run(__file__, black_scholes) diff --git a/bs_threading.py b/bs_threading.py new file mode 100644 index 0000000..2aaed55 --- /dev/null +++ b/bs_threading.py @@ -0,0 +1,18 @@ +import threading + + +class bs_runner(object): + def __init__(self, bs_impl, nump): + self.bs_impl = bs_impl + self.nump = nump + + def __call__(self, nopt, price, strike, t, rate, vol, call, put): + noptpp = int(nopt/self.nump) + threads = [] + for i in range(0, nopt, noptpp): + thr = threading.Thread(target=self.bs_impl, args=(noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol, call[i:i+noptpp], put[i:i+noptpp])) + thr.start() + threads.append(thr) + for thr in threads: + thr.join() + return call, put diff --git a/run.sh b/run.sh index fb33568..4c114bc 100755 --- a/run.sh +++ b/run.sh @@ -1,4 +1,18 @@ #!/bin/bash mkdir -p logs -for i in bs_erf_*.py; do echo -e "\n$i:"; ${PYTHON:-python} $i $* | tee -a logs/$i.log; done +for i in `ls bs_erf_*.py | egrep -v '_mpi|_ipyparallel'`; do + echo -e "\n$i:" + ${PYTHON:-python} $i $* | tee -a logs/$i.log +done +for i in `ls bs_erf_*.py | grep _mpi`; do + echo -e "\n$i:" + mpirun -n 16 ${PYTHON:-python} $i $* | tee -a logs/$i.log; +done +ipcluster start -n 16 --daemonize=True +sleep 5; sync +for i in `ls bs_erf_*.py | grep _ipyparallel`; do + echo -e "\n$i:" + ${PYTHON:-python} $i $* | tee -a logs/$i.log +done +ipcluster stop From dbfc729f34a064f1002484140fcba4c3c96c73fb Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: Thu, 9 Mar 2017 05:10:41 -0600 Subject: [PATCH 4/7] adding py3 executor implementations --- bs_erf_naive_map_processpoolexecutor.py | 11 +++++++++++ bs_erf_naive_map_threadpoolexecutor.py | 11 +++++++++++ bs_erf_numpy_map_processpoolexecutor.py | 11 +++++++++++ bs_erf_numpy_map_threadpoolexecutor.py | 11 +++++++++++ bs_executor.py | 21 +++++++++++++++++++++ run.sh | 2 +- 6 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 bs_erf_naive_map_processpoolexecutor.py create mode 100644 bs_erf_naive_map_threadpoolexecutor.py create mode 100644 bs_erf_numpy_map_processpoolexecutor.py create mode 100644 bs_erf_numpy_map_threadpoolexecutor.py create mode 100644 bs_executor.py diff --git a/bs_erf_naive_map_processpoolexecutor.py b/bs_erf_naive_map_processpoolexecutor.py new file mode 100644 index 0000000..eab190d 
--- /dev/null +++ b/bs_erf_naive_map_processpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ProcessPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + with ProcessPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes_map, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_threadpoolexecutor.py b/bs_erf_naive_map_threadpoolexecutor.py new file mode 100644 index 0000000..72d5a3c --- /dev/null +++ b/bs_erf_naive_map_threadpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf + +if __name__ == '__main__': + n = cpu_count() + with ThreadPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes_map, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_processpoolexecutor.py b/bs_erf_numpy_map_processpoolexecutor.py new file mode 100644 index 0000000..a4b56bd --- /dev/null +++ b/bs_erf_numpy_map_processpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ProcessPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + with ProcessPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_threadpoolexecutor.py b/bs_erf_numpy_map_threadpoolexecutor.py new file mode 100644 index 0000000..a84f929 --- /dev/null +++ b/bs_erf_numpy_map_threadpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = cpu_count() + with ThreadPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_executor.py b/bs_executor.py new file mode 100644 index 0000000..4302335 --- /dev/null +++ b/bs_executor.py @@ -0,0 +1,21 @@ +# we can put bs and bs_runner into one as soon as our driver +# does not expect a callable but calls a member function instead +class bs(object): + def __init__(self, bs_impl, nopt, rate, vol): + self.nopt = nopt + self.rate = rate + self.vol = vol + self.bs_impl = bs_impl + + def __call__(self, *iterables): + return self.bs_impl(self.nopt, *iterables, self.rate, self.vol) + + +class bs_runner(object): + def __init__(self, bs_impl, executor, nump): + self.bs_impl = bs_impl + self.executor = executor + self.nump = nump + + def __call__(self, nopt, price, strike, t, rate, vol): + return self.executor.map(bs(self.bs_impl, nopt, rate, vol), price, strike, t, chunksize=int(nopt/self.nump)) diff --git a/run.sh b/run.sh index 4c114bc..64fd2b3 100755 --- a/run.sh +++ b/run.sh @@ -10,7 +10,7 @@ for i in `ls bs_erf_*.py | grep _mpi`; do mpirun -n 16 ${PYTHON:-python} $i $* | tee -a logs/$i.log; done ipcluster start -n 16 --daemonize=True -sleep 5; sync +sync; sleep 20; sync for i in `ls bs_erf_*.py | grep _ipyparallel`; do echo -e "\n$i:" ${PYTHON:-python} $i $* | tee -a logs/$i.log From 2eb2d21ebc4bb987e926fdb149e86b9ad50473d2 Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: 
Tue, 14 Mar 2017 09:48:09 -0500 Subject: [PATCH 5/7] added distarray --- bs_erf_distarray.py | 51 +++++++++++++++++++++++++++++++++++++++++++++ bs_mpi.py | 1 - run.sh | 2 +- set_python_envs.sh | 2 +- 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 bs_erf_distarray.py diff --git a/bs_erf_distarray.py b/bs_erf_distarray.py new file mode 100644 index 0000000..126ecd1 --- /dev/null +++ b/bs_erf_distarray.py @@ -0,0 +1,51 @@ +import distarray.globalapi as da +from distarray.globalapi import log, sqrt, exp +import numpy as np +invsqrt = lambda x: 1.0/sqrt(x) +from base_bs_erf import run, erf + + +def black_scholes(ctxt, nopt, price, strike, t, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 + + P = price + S = strike + T = t + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * ctxt.fromarray(erf(np.asarray(w1))) + d2 = 0.5 + 0.5 * ctxt.fromarray(erf(np.asarray(w2))) + + Se = exp(b) * S + + call = P * d1 - Se * d2 + put = call - P + Se + + return call, put + + +class bs_runner(object): + def __init__(self, ctxt): + self.ctxt = ctxt + + def __call__(self, nopt, price, strike, t, rate, vol): + dprice = self.ctxt.fromarray(price) + dstrike = self.ctxt.fromarray(strike) + dt = self.ctxt.fromarray(t) + ret = black_scholes(self.ctxt, nopt, dprice, dstrike, dt, rate, vol) + return ret[0].toarray(), ret[1].toarray() + + +if __name__ == '__main__': + bsr = bs_runner(da.Context()) + run(__file__, bsr, pass_args=False) diff --git a/bs_mpi.py b/bs_mpi.py index 7008f39..adf22f7 100644 --- a/bs_mpi.py +++ b/bs_mpi.py @@ -8,7 +8,6 @@ def __init__(self, bs_impl): def __call__(self, nopt, price, strike, t, rate, vol, call, put): comm = MPI.COMM_WORLD - rank = comm.Get_rank() noptpp = int(nopt/self.nump) myprice = np.empty(noptpp, dtype=np.float64) diff --git a/run.sh b/run.sh index 64fd2b3..c883245 100755 --- a/run.sh +++ b/run.sh @@ -7,7 +7,7 @@ for i in `ls bs_erf_*.py | egrep -v '_mpi|_ipyparallel'`; do done for i in `ls bs_erf_*.py | grep _mpi`; do echo -e "\n$i:" - mpirun -n 16 ${PYTHON:-python} $i $* | tee -a logs/$i.log; + mpirun -genv I_MPI_SHM_LMT=shm -n 16 ${PYTHON:-python} $i $* | tee -a logs/$i.log; done ipcluster start -n 16 --daemonize=True sync; sleep 20; sync diff --git a/set_python_envs.sh b/set_python_envs.sh index ad8d30c..ea65c86 100755 --- a/set_python_envs.sh +++ b/set_python_envs.sh @@ -7,4 +7,4 @@ bash ./Miniconda3-latest-Linux-x86_64.sh -b -p $DIR -f #export ACCEPT_INTEL_PYTHON_EULA=yes CONDA=$DIR/bin/conda [ -x $CONDA ] || exit 1 -$CONDA create -y -n intel3 -c intel python=3 numpy numexpr scipy tbb dask numba cython +$CONDA create -y -n intel3 -c intel python=3 intelpython3_core numpy numexpr scipy tbb dask numba cython mpi4py ipyparallel distarray From 449d90c579ff07627c9efc5bbb1cd5ef846610da Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: Wed, 15 Mar 2017 06:37:16 -0500 Subject: [PATCH 6/7] using @syntax --- bs_erf_numba_vec_par.py | 57 +++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/bs_erf_numba_vec_par.py b/bs_erf_numba_vec_par.py index 202c4b9..c3e8d0d 100644 --- a/bs_erf_numba_vec_par.py +++ b/bs_erf_numba_vec_par.py @@ -2,35 +2,36 @@ import numba as nb from math import log, sqrt, exp, erf -def black_scholes_numba_opt(price, strike, t, mr, sig_sig_two, vol, call, put): - P = float( price [0] ) - S = strike [0] - T = t [0] - - a = log(P / S) - b = T * mr[0] - - z = T * sig_sig_two[0] - c = 
0.25 * z - y = 1./sqrt(z) - - w1 = (a - b + c) * y - w2 = (a - b - c) * y - - d1 = 0.5 + 0.5 * erf(w1) - d2 = 0.5 + 0.5 * erf(w2) - - Se = exp(b) * S - - call [0] = P * d1 - Se * d2 - put [0] = call [0] - P + Se - -black_scholes_numba_opt_vec = nb.guvectorize('(f8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:])','(),(),(),(),(),(),(),()', nopython=True, target="parallel")(black_scholes_numba_opt) +@nb.guvectorize('(f8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:])', '(),(),(),(),(),(),()', nopython=True, target="parallel") +def black_scholes_numba_opt(price, strike, t, mr, sig_sig_two, call, put): + P = float(price[0]) + S = strike[0] + T = t[0] + + a = log(P / S) + b = T * mr[0] + + z = T * sig_sig_two[0] + c = 0.25 * z + y = 1./sqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + call[0] = P * d1 - Se * d2 + put[0] = call[0] - P + Se + @nb.jit def black_scholes(nopt, price, strike, t, rate, vol, call, put): - sig_sig_two = vol*vol*2 - mr = -rate - black_scholes_numba_opt_vec(price, strike, t, mr, sig_sig_two, vol, call, put) + sig_sig_two = vol*vol*2 + mr = -rate + black_scholes_numba_opt(price, strike, t, mr, sig_sig_two, call, put) -base_bs_erf.run("Numba@vec-par", black_scholes, pass_args=True) +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes, pass_args=True) From 287d9c258eb2a91d628e42030800365bf0c58b79 Mon Sep 17 00:00:00 2001 From: Frank Schlimbach Date: Wed, 15 Mar 2017 12:05:12 -0500 Subject: [PATCH 7/7] exe slow benches last --- run.sh | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/run.sh b/run.sh index c883245..c9c2a09 100755 --- a/run.sh +++ b/run.sh @@ -1,7 +1,9 @@ #!/bin/bash +export MKL_NUM_THREADS=1 + mkdir -p logs -for i in `ls bs_erf_*.py | egrep -v '_mpi|_ipyparallel'`; do +for i in `ls bs_erf_*.py | egrep -v '_mpi|_ipyparallel|pool'`; do echo -e "\n$i:" ${PYTHON:-python} $i $* | tee -a logs/$i.log done @@ -16,3 +18,7 @@ for i in `ls bs_erf_*.py | grep _ipyparallel`; do ${PYTHON:-python} $i $* | tee -a logs/$i.log done ipcluster stop +for i in `ls bs_erf_*pool*.py`; do + echo -e "\n$i:" + ${PYTHON:-python} $i $* | tee -a logs/$i.log +done
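
Usage example (not part of any patch above): a minimal standalone driver sketching how the pieces introduced in PATCH 3/7 compose, with bs_threading.bs_runner splitting the work across threads and bs_numpy.black_scholes_args writing results into the call/put buffers in place. It mirrors what base_bs_erf.run() does with pass_args=True, but the problem size, price/strike/time bounds, and the rate and volatility values below are illustrative assumptions, not the benchmark's gen_data() or RISK_FREE/VOLATILITY defaults.

import time
import numpy as np
from multiprocessing import cpu_count

from bs_threading import bs_runner        # thread-per-chunk runner from PATCH 3/7
from bs_numpy import black_scholes_args   # NumPy kernel that fills call/put in place

if __name__ == '__main__':
    nopt = 1 << 20                             # problem size: illustrative assumption
    rng = np.random.RandomState(0)
    price = rng.uniform(10.0, 50.0, nopt)      # illustrative bounds, not gen_data()'s
    strike = rng.uniform(10.0, 50.0, nopt)
    t = rng.uniform(1.0, 2.0, nopt)
    rate, vol = 0.1, 0.2                       # placeholders, not RISK_FREE/VOLATILITY
    call = np.zeros(nopt, dtype=np.float64)    # output buffers, as run(pass_args=True) allocates
    put = -np.ones(nopt, dtype=np.float64)

    bsr = bs_runner(black_scholes_args, cpu_count())   # pair kernel with thread count

    bsr(nopt, price, strike, t, rate, vol, call, put)  # warm-up call, as run() does
    t0 = time.time()
    bsr(nopt, price, strike, t, rate, vol, call, put)
    print("{} options priced in {:.3f} s".format(nopt, time.time() - t0))

The pool-, executor-, MPI- and ipyparallel-based scripts above follow the same shape, differing only in which bs_runner is paired with which kernel and in whether results are returned from the workers or written into the call/put arrays in place.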