diff --git a/base_bs_erf.py b/base_bs_erf.py index 74a1bc2..1d09384 100644 --- a/base_bs_erf.py +++ b/base_bs_erf.py @@ -70,73 +70,75 @@ def gen_data(nopt): rnd.uniform(TL, TH, nopt), ) -############################################## - -def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False): - import argparse - parser = argparse.ArgumentParser() - parser.add_argument('--steps', required=False, default=sizes, help="Number of steps") - parser.add_argument('--step', required=False, default=step, help="Factor for each step") - parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask") - parser.add_argument('--size', required=False, default=nopt, help="Initial data size") - parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region") - parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp") - parser.add_argument('--text', required=False, default="", help="Print with each result") - - args = parser.parse_args() - sizes= int(args.steps) - step = int(args.step) - nopt = int(args.size) - chunk= int(args.chunk) - repeat=int(args.repeat) - kwargs={} - - if(dask): - import dask - import dask.multiprocessing - import dask.array as da - dask_modes = { - "sq": dask.async.get_sync, - "mt": dask.threaded.get, - "mp": dask.multiprocessing.get - } - kwargs = {"schd": dask_modes[args.dask]} - name += "-"+args.dask - - for i in xrange(sizes): - price, strike, t = gen_data(nopt) - if not nparr: - call = [0.0 for i in range(nopt)] - put = [-1.0 for i in range(nopt)] - price=list(price) - strike=list(strike) - t=list(t) - repeat=1 # !!!!! ignore repeat count - if dask: - assert(not pass_args) - price = da.from_array(price, chunks=(chunk,), name=False) - strike = da.from_array(strike, chunks=(chunk,), name=False) - t = da.from_array(t, chunks=(chunk,), name=False) - if pass_args: - call = np.zeros(nopt, dtype=np.float64) - put = -np.ones(nopt, dtype=np.float64) - iterations = xrange(repeat) - print("ERF: {}: Size: {}".format(name, nopt)), - sys.stdout.flush() - - if pass_args: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup - t0 = now() - for _ in iterations: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs) - else: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup - t0 = now() - for _ in iterations: - alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs) - mops = get_mops(t0, nopt) - print("MOPS: {}".format(mops*2*repeat), args.text) - nopt *= step - repeat -= step - if repeat < 1: - repeat = 1 +############################################## + +def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False, verbose=True): + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--steps', required=False, default=sizes, help="Number of steps") + parser.add_argument('--step', required=False, default=step, help="Factor for each step") + parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask") + parser.add_argument('--size', required=False, default=nopt, help="Initial data size") + parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region") + parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp") + parser.add_argument('--text', required=False, default="", help="Print with each result") + + args = parser.parse_args() + sizes= int(args.steps) + step = int(args.step) + nopt = int(args.size) + chunk= int(args.chunk) + repeat=int(args.repeat) + kwargs={} + + if(dask): + import dask + import dask.multiprocessing + import dask.array as da + dask_modes = { + "sq": dask.async.get_sync, + "mt": dask.threaded.get, + "mp": dask.multiprocessing.get + } + kwargs = {"schd": dask_modes[args.dask]} + name += "-"+args.dask + + for i in xrange(sizes): + price, strike, t = gen_data(nopt) + if not nparr: + call = [0.0 for i in range(nopt)] + put = [-1.0 for i in range(nopt)] + price=list(price) + strike=list(strike) + t=list(t) + repeat=1 # !!!!! ignore repeat count + if dask: + assert(not pass_args) + price = da.from_array(price, chunks=(chunk,), name=False) + strike = da.from_array(strike, chunks=(chunk,), name=False) + t = da.from_array(t, chunks=(chunk,), name=False) + if pass_args: + call = np.zeros(nopt, dtype=np.float64) + put = -np.ones(nopt, dtype=np.float64) + iterations = xrange(repeat) + if verbose: + print("ERF: {}: Size: {}".format(name, nopt)), + sys.stdout.flush() + + if pass_args: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup + t0 = now() + for _ in iterations: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs) + else: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup + t0 = now() + for _ in iterations: + alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs) + mops = get_mops(t0, nopt) + if verbose: + print("MOPS: {}".format(mops*2*repeat), args.text) + nopt *= step + repeat -= step + if repeat < 1: + repeat = 1 diff --git a/bs_apply.py b/bs_apply.py new file mode 100644 index 0000000..72a3367 --- /dev/null +++ b/bs_apply.py @@ -0,0 +1,16 @@ +import numpy as np + +class bs_runner(object): + def __init__(self, bs_impl, pool, nump): + self.bs_impl = bs_impl + self.pool = pool + self.nump = nump + + def __call__(self, nopt, price, strike, t, rate, vol): + noptpp = int(nopt/self.nump) + call = np.empty(nopt, dtype=np.float64) + put = np.empty(nopt, dtype=np.float64) + asyncs = [self.pool.apply_async(self.bs_impl, (noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol)) for i in range(0, nopt, noptpp)] + for a,i in zip(asyncs, range(len(asyncs))): + call[i:i+noptpp], put[i:i+noptpp] = a.get() + return call, put diff --git a/bs_apply_ipyparallel.py b/bs_apply_ipyparallel.py new file mode 100644 index 0000000..b552efe --- /dev/null +++ b/bs_apply_ipyparallel.py @@ -0,0 +1,25 @@ +import ipyparallel +from ipyparallel import Client + + +class bs(object): + def __init__(self, bs_impl): + self.bs_impl = bs_impl + + def __call__(self, nopt, rate, vol): + return self.bs_impl(nopt, price, strike, t, rate, vol) + + +class bs_runner(object): + def __init__(self, bs_impl): + self.bs_impl = bs_impl + self.client = Client() + self.dview = self.client[:] + + def __call__(self, nopt, price, strike, t, rate, vol): + self.dview.scatter('price', price) + self.dview.scatter('strike', strike) + self.dview.scatter('t', t) + self.dview.push(dict(call=0,put=-1)) + r = self.dview.apply(bs(self.bs_impl), t, rate, vol) + return self.dview.gather('call').get(), self.dview.gather('put').get() diff --git a/bs_erf_distarray.py b/bs_erf_distarray.py new file mode 100644 index 0000000..126ecd1 --- /dev/null +++ b/bs_erf_distarray.py @@ -0,0 +1,51 @@ +import distarray.globalapi as da +from distarray.globalapi import log, sqrt, exp +import numpy as np +invsqrt = lambda x: 1.0/sqrt(x) +from base_bs_erf import run, erf + + +def black_scholes(ctxt, nopt, price, strike, t, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 + + P = price + S = strike + T = t + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * ctxt.fromarray(erf(np.asarray(w1))) + d2 = 0.5 + 0.5 * ctxt.fromarray(erf(np.asarray(w2))) + + Se = exp(b) * S + + call = P * d1 - Se * d2 + put = call - P + Se + + return call, put + + +class bs_runner(object): + def __init__(self, ctxt): + self.ctxt = ctxt + + def __call__(self, nopt, price, strike, t, rate, vol): + dprice = self.ctxt.fromarray(price) + dstrike = self.ctxt.fromarray(strike) + dt = self.ctxt.fromarray(t) + ret = black_scholes(self.ctxt, nopt, dprice, dstrike, dt, rate, vol) + return ret[0].toarray(), ret[1].toarray() + + +if __name__ == '__main__': + bsr = bs_runner(da.Context()) + run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive.py b/bs_erf_naive.py deleted file mode 100644 index db576c9..0000000 --- a/bs_erf_naive.py +++ /dev/null @@ -1,33 +0,0 @@ -import base_bs_erf -from math import log, sqrt, exp, erf -import numpy as np -invsqrt = lambda x: 1.0/sqrt(x) - -def black_scholes ( nopt, price, strike, t, rate, vol, call, put ): - mr = -rate - sig_sig_two = vol * vol * 2 - - for i in range(nopt): - P = float( price [i] ) - S = strike [i] - T = t [i] - - a = log(P / S) - b = T * mr - - z = T * sig_sig_two - c = 0.25 * z - y = invsqrt(z) - - w1 = (a - b + c) * y - w2 = (a - b - c) * y - - d1 = 0.5 + 0.5 * erf(w1) - d2 = 0.5 + 0.5 * erf(w2) - - Se = exp(b) * S - - call [i] = P * d1 - Se * d2 - put [i] = call [i] - P + Se - -base_bs_erf.run("Naive-loop", black_scholes, 4, 8, nparr=False, pass_args=True) diff --git a/bs_erf_naive_apply_ipyparallel.py b/bs_erf_naive_apply_ipyparallel.py new file mode 100644 index 0000000..25c5d2d --- /dev/null +++ b/bs_erf_naive_apply_ipyparallel.py @@ -0,0 +1,7 @@ +from bs_apply_ipyparallel import bs_runner +from bs_naive import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_apply_pool.py b/bs_erf_naive_apply_pool.py new file mode 100644 index 0000000..41b642a --- /dev/null +++ b/bs_erf_naive_apply_pool.py @@ -0,0 +1,10 @@ +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_naive import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes, Pool(n), n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_apply_threadpool.py b/bs_erf_naive_apply_threadpool.py new file mode 100644 index 0000000..bdacf57 --- /dev/null +++ b/bs_erf_naive_apply_threadpool.py @@ -0,0 +1,9 @@ +from multiprocessing.pool import ThreadPool +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_naive import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes, ThreadPool(cpu_count()), cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_pool.py b/bs_erf_naive_map_pool.py new file mode 100644 index 0000000..af2bb23 --- /dev/null +++ b/bs_erf_naive_map_pool.py @@ -0,0 +1,10 @@ +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes_map, Pool(n)) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_processpoolexecutor.py b/bs_erf_naive_map_processpoolexecutor.py new file mode 100644 index 0000000..eab190d --- /dev/null +++ b/bs_erf_naive_map_processpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ProcessPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + with ProcessPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes_map, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_threadpool.py b/bs_erf_naive_map_threadpool.py new file mode 100644 index 0000000..b8aba7e --- /dev/null +++ b/bs_erf_naive_map_threadpool.py @@ -0,0 +1,9 @@ +from multiprocessing.pool import ThreadPool +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes_map, ThreadPool(cpu_count())) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_map_threadpoolexecutor.py b/bs_erf_naive_map_threadpoolexecutor.py new file mode 100644 index 0000000..72d5a3c --- /dev/null +++ b/bs_erf_naive_map_threadpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_naive import black_scholes_map +import base_bs_erf + +if __name__ == '__main__': + n = cpu_count() + with ThreadPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes_map, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_naive_mpi.py b/bs_erf_naive_mpi.py new file mode 100644 index 0000000..a619225 --- /dev/null +++ b/bs_erf_naive_mpi.py @@ -0,0 +1,8 @@ +from mpi4py import MPI +from bs_mpi import bs_runner +from bs_naive import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0) diff --git a/bs_erf_naive_sequential.py b/bs_erf_naive_sequential.py new file mode 100644 index 0000000..1290bef --- /dev/null +++ b/bs_erf_naive_sequential.py @@ -0,0 +1,5 @@ +import base_bs_erf +from bs_naive import black_scholes_args + +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes_args, nparr=False, pass_args=True) diff --git a/bs_erf_naive_threading.py b/bs_erf_naive_threading.py new file mode 100644 index 0000000..3271b23 --- /dev/null +++ b/bs_erf_naive_threading.py @@ -0,0 +1,8 @@ +from multiprocessing import cpu_count +from bs_threading import bs_runner +from bs_naive import black_scholes_args +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes_args, cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=True) diff --git a/bs_erf_numba_vec_par.py b/bs_erf_numba_vec_par.py index 202c4b9..c3e8d0d 100644 --- a/bs_erf_numba_vec_par.py +++ b/bs_erf_numba_vec_par.py @@ -2,35 +2,36 @@ import numba as nb from math import log, sqrt, exp, erf -def black_scholes_numba_opt(price, strike, t, mr, sig_sig_two, vol, call, put): - P = float( price [0] ) - S = strike [0] - T = t [0] - - a = log(P / S) - b = T * mr[0] - - z = T * sig_sig_two[0] - c = 0.25 * z - y = 1./sqrt(z) - - w1 = (a - b + c) * y - w2 = (a - b - c) * y - - d1 = 0.5 + 0.5 * erf(w1) - d2 = 0.5 + 0.5 * erf(w2) - - Se = exp(b) * S - - call [0] = P * d1 - Se * d2 - put [0] = call [0] - P + Se - -black_scholes_numba_opt_vec = nb.guvectorize('(f8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:])','(),(),(),(),(),(),(),()', nopython=True, target="parallel")(black_scholes_numba_opt) +@nb.guvectorize('(f8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:])', '(),(),(),(),(),(),()', nopython=True, target="parallel") +def black_scholes_numba_opt(price, strike, t, mr, sig_sig_two, call, put): + P = float(price[0]) + S = strike[0] + T = t[0] + + a = log(P / S) + b = T * mr[0] + + z = T * sig_sig_two[0] + c = 0.25 * z + y = 1./sqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + cal[0] = P * d1 - Se * d2 + put[0] = call[0] - P + Se + @nb.jit def black_scholes(nopt, price, strike, t, rate, vol, call, put): - sig_sig_two = vol*vol*2 - mr = -rate - black_scholes_numba_opt_vec(price, strike, t, mr, sig_sig_two, vol, call, put) + sig_sig_two = vol*vol*2 + mr = -rate + black_scholes_numba_opt(price, strike, t, mr, sig_sig_two, call, put) -base_bs_erf.run("Numba@vec-par", black_scholes, pass_args=True) +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes, pass_args=True) diff --git a/bs_erf_numpy.py b/bs_erf_numpy.py deleted file mode 100644 index 92a19fe..0000000 --- a/bs_erf_numpy.py +++ /dev/null @@ -1,34 +0,0 @@ -import base_bs_erf -import numpy as np -from numpy import log, exp -from base_bs_erf import erf, invsqrt - -def black_scholes ( nopt, price, strike, t, rate, vol ): - mr = -rate - sig_sig_two = vol * vol * 2 - - P = price - S = strike - T = t - - a = log(P / S) - b = T * mr - - z = T * sig_sig_two - c = 0.25 * z - y = invsqrt(z) - - w1 = (a - b + c) * y - w2 = (a - b - c) * y - - d1 = 0.5 + 0.5 * erf(w1) - d2 = 0.5 + 0.5 * erf(w2) - - Se = exp(b) * S - - call = P * d1 - Se * d2 - put = call - P + Se - - return (call, put) - -base_bs_erf.run("Numpy", black_scholes) diff --git a/bs_erf_numpy_apply_ipyparallel.py b/bs_erf_numpy_apply_ipyparallel.py new file mode 100644 index 0000000..50393de --- /dev/null +++ b/bs_erf_numpy_apply_ipyparallel.py @@ -0,0 +1,7 @@ +from bs_apply_ipyparallel import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_apply_pool.py b/bs_erf_numpy_apply_pool.py new file mode 100644 index 0000000..0fd220c --- /dev/null +++ b/bs_erf_numpy_apply_pool.py @@ -0,0 +1,10 @@ +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes, Pool(n), n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_apply_threadpool.py b/bs_erf_numpy_apply_threadpool.py new file mode 100644 index 0000000..3eea4b8 --- /dev/null +++ b/bs_erf_numpy_apply_threadpool.py @@ -0,0 +1,9 @@ +from multiprocessing.pool import ThreadPool +from multiprocessing import cpu_count +from bs_apply import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes, ThreadPool(cpu_count()), cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_pool.py b/bs_erf_numpy_map_pool.py new file mode 100644 index 0000000..b250bbc --- /dev/null +++ b/bs_erf_numpy_map_pool.py @@ -0,0 +1,10 @@ +from multiprocessing import Pool +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + bsr = bs_runner(black_scholes, Pool(n)) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_processpoolexecutor.py b/bs_erf_numpy_map_processpoolexecutor.py new file mode 100644 index 0000000..a4b56bd --- /dev/null +++ b/bs_erf_numpy_map_processpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ProcessPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = int(cpu_count()/2) + with ProcessPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_threadpool.py b/bs_erf_numpy_map_threadpool.py new file mode 100644 index 0000000..a1cfab6 --- /dev/null +++ b/bs_erf_numpy_map_threadpool.py @@ -0,0 +1,9 @@ +from multiprocessing.pool import ThreadPool +from multiprocessing import cpu_count +from bs_map import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes, ThreadPool(cpu_count())) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_map_threadpoolexecutor.py b/bs_erf_numpy_map_threadpoolexecutor.py new file mode 100644 index 0000000..a84f929 --- /dev/null +++ b/bs_erf_numpy_map_threadpoolexecutor.py @@ -0,0 +1,11 @@ +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import cpu_count +from bs_executor import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + n = cpu_count() + with ThreadPoolExecutor(n) as executor: + bsr = bs_runner(black_scholes, executor, n) + base_bs_erf.run(__file__, bsr, pass_args=False) diff --git a/bs_erf_numpy_mpi.py b/bs_erf_numpy_mpi.py new file mode 100644 index 0000000..9df7fca --- /dev/null +++ b/bs_erf_numpy_mpi.py @@ -0,0 +1,8 @@ +from mpi4py import MPI +from bs_mpi import bs_runner +from bs_numpy import black_scholes +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes) + base_bs_erf.run(__file__, bsr, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0) diff --git a/bs_erf_numpy_sequential.py b/bs_erf_numpy_sequential.py new file mode 100644 index 0000000..668c47c --- /dev/null +++ b/bs_erf_numpy_sequential.py @@ -0,0 +1,5 @@ +import base_bs_erf +from bs_numpy import black_scholes + +if __name__ == '__main__': + base_bs_erf.run(__file__, black_scholes) diff --git a/bs_erf_numpy_threading.py b/bs_erf_numpy_threading.py new file mode 100644 index 0000000..2fed7c4 --- /dev/null +++ b/bs_erf_numpy_threading.py @@ -0,0 +1,8 @@ +from multiprocessing import cpu_count +from bs_threading import bs_runner +from bs_numpy import black_scholes_args +import base_bs_erf + +if __name__ == '__main__': + bsr = bs_runner(black_scholes_args, cpu_count()) + base_bs_erf.run(__file__, bsr, pass_args=True) diff --git a/bs_executor.py b/bs_executor.py new file mode 100644 index 0000000..4302335 --- /dev/null +++ b/bs_executor.py @@ -0,0 +1,21 @@ +# we can put bs and bs_runner into one as soon as our driver +# does not expect a callable but calls a member function instead +class bs(object): + def __init__(self, bs_impl, nopt, rate, vol): + self.nopt = nopt + self.rate = rate + self.vol = vol + self.bs_impl = bs_impl + + def __call__(self, *iterables): + return self.bs_impl(self.nopt, *iterables, self.rate, self.vol) + + +class bs_runner(object): + def __init__(self, bs_impl, executor, nump): + self.bs_impl = bs_impl + self.executor = executor + self.nump = nump + + def __call__(self, nopt, price, strike, t, rate, vol): + return self.executor.map(bs(self.bs_impl, nopt, rate, vol), price, strike, t, chunksize=int(nopt/self.nump)) diff --git a/bs_map.py b/bs_map.py new file mode 100644 index 0000000..507cc04 --- /dev/null +++ b/bs_map.py @@ -0,0 +1,21 @@ +# we can put bs and bs_runner into one as soon as our driver +# does not expect a callable but calls a member function instead +class bs(object): + def __init__(self, bs_impl, nopt, rate, vol): + self.nopt = nopt + self.rate = rate + self.vol = vol + self.bs_impl = bs_impl + + def __call__(self, zipped): + return self.bs_impl(self.nopt, *zipped, self.rate, self.vol) + + +class bs_runner(object): + def __init__(self, bs_impl, pool): + self.bs_impl = bs_impl + self.pool = pool + + def __call__(self, nopt, price, strike, t, rate, vol): + z = list(zip(price, strike, t)) + return self.pool.map(bs(self.bs_impl, nopt, rate, vol), z) diff --git a/bs_mpi.py b/bs_mpi.py new file mode 100644 index 0000000..adf22f7 --- /dev/null +++ b/bs_mpi.py @@ -0,0 +1,27 @@ +import numpy as np +from mpi4py import MPI + +class bs_runner(object): + def __init__(self, bs_impl): + self.bs_impl = bs_impl + self.nump = MPI.COMM_WORLD.size + + def __call__(self, nopt, price, strike, t, rate, vol, call, put): + comm = MPI.COMM_WORLD + noptpp = int(nopt/self.nump) + + myprice = np.empty(noptpp, dtype=np.float64) + mystrike = np.empty(noptpp, dtype=np.float64) + myt = np.empty(noptpp, dtype=np.float64) + + # Scatter data into arrays + comm.Scatter(price, myprice, root=0) + comm.Scatter(strike, mystrike, root=0) + comm.Scatter(t, myt, root=0) + + mycall, myput = self.bs_impl(noptpp, myprice, mystrike, myt, rate, vol) + + comm.Gather(mycall, call) + comm.Gather(myput, put) + + return call, put diff --git a/bs_naive.py b/bs_naive.py new file mode 100644 index 0000000..ee563f1 --- /dev/null +++ b/bs_naive.py @@ -0,0 +1,64 @@ +from math import log, sqrt, exp, erf +import numpy as np +invsqrt = lambda x: 1.0/sqrt(x) + + +def black_scholes_args(nopt, price, strike, t, rate, vol, call, put): + mr = -rate + sig_sig_two = vol * vol * 2 + for i in range(nopt): + P = float(price[i]) + S = strike[i] + T = t[i] + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + call[i] = P * d1 - Se * d2 + put[i] = call[i] - P + Se + return call, put + + +def black_scholes(nopt, price, strike, t, rate, vol): + call = np.zeros(nopt, dtype=np.float64) + put = -np.ones(nopt, dtype=np.float64) + return black_scholes_args(nopt, price, strike, t, rate, vol, call, put) + + +def black_scholes_map(nopt, price, strike, t, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 + P = float(price) + S = strike + T = t + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + call = P * d1 - Se * d2 + put = call - P + Se + return call, put diff --git a/bs_numpy.py b/bs_numpy.py new file mode 100644 index 0000000..1dcdc60 --- /dev/null +++ b/bs_numpy.py @@ -0,0 +1,35 @@ +import numpy as np +from numpy import log, exp +from base_bs_erf import erf, invsqrt + +def black_scholes(nopt, price, strike, t, rate, vol): + mr = -rate + sig_sig_two = vol * vol * 2 + + P = price + S = strike + T = t + + a = log(P / S) + b = T * mr + + z = T * sig_sig_two + c = 0.25 * z + y = invsqrt(z) + + w1 = (a - b + c) * y + w2 = (a - b - c) * y + + d1 = 0.5 + 0.5 * erf(w1) + d2 = 0.5 + 0.5 * erf(w2) + + Se = exp(b) * S + + call = P * d1 - Se * d2 + put = call - P + Se + + return call, put + + +def black_scholes_args(nopt, price, strike, t, rate, vol, call, put): + call[:], put[:] = black_scholes(nopt, price, strike, t, rate, vol) diff --git a/bs_threading.py b/bs_threading.py new file mode 100644 index 0000000..2aaed55 --- /dev/null +++ b/bs_threading.py @@ -0,0 +1,18 @@ +import threading + + +class bs_runner(object): + def __init__(self, bs_impl, nump): + self.bs_impl = bs_impl + self.nump = nump + + def __call__(self, nopt, price, strike, t, rate, vol, call, put): + noptpp = int(nopt/self.nump) + threads = [] + for i in range(0, nopt, noptpp): + thr = threading.Thread(target=self.bs_impl, args=(noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol, call[i:i+noptpp], put[i:i+noptpp])) + thr.start() + threads.append(thr) + for thr in threads: + thr.join() + return call, put diff --git a/run.sh b/run.sh index fb33568..c9c2a09 100755 --- a/run.sh +++ b/run.sh @@ -1,4 +1,24 @@ #!/bin/bash +export MKL_NUM_THREADS=1 + mkdir -p logs -for i in bs_erf_*.py; do echo -e "\n$i:"; ${PYTHON:-python} $i $* | tee -a logs/$i.log; done +for i in `ls bs_erf_*.py | egrep -v '_mpi|_ipyparallel|pool'`; do + echo -e "\n$i:" + ${PYTHON:-python} $i $* | tee -a logs/$i.log +done +for i in `ls bs_erf_*.py | grep _mpi`; do + echo -e "\n$i:" + mpirun -genv I_MPI_SHM_LMT=shm -n 16 ${PYTHON:-python} $i $* | tee -a logs/$i.log; +done +ipcluster start -n 16 --daemonize=True +sync; sleep 20; sync +for i in `ls bs_erf_*.py | grep _ipyparallel`; do + echo -e "\n$i:" + ${PYTHON:-python} $i $* | tee -a logs/$i.log +done +ipcluster stop +for i in `ls bs_erf_*pool*.py`; do + echo -e "\n$i:" + ${PYTHON:-python} $i $* | tee -a logs/$i.log +done diff --git a/set_python_envs.sh b/set_python_envs.sh index ad8d30c..ea65c86 100755 --- a/set_python_envs.sh +++ b/set_python_envs.sh @@ -7,4 +7,4 @@ bash ./Miniconda3-latest-Linux-x86_64.sh -b -p $DIR -f #export ACCEPT_INTEL_PYTHON_EULA=yes CONDA=$DIR/bin/conda [ -x $CONDA ] || exit 1 -$CONDA create -y -n intel3 -c intel python=3 numpy numexpr scipy tbb dask numba cython +$CONDA create -y -n intel3 -c intel python=3 intelpython3_core numpy numexpr scipy tbb dask numba cython mpi4py ipyparallel distarray