diff --git a/ldpop/lookup_table.py b/ldpop/lookup_table.py
index 47f1324..10f2119 100644
--- a/ldpop/lookup_table.py
+++ b/ldpop/lookup_table.py
@@ -10,7 +10,7 @@
 from .moran_finite import MoranStatesFinite
 from .compute_stationary import stationary
 
-from multiprocessing import Pool
+from concurrent.futures import ProcessPoolExecutor
 import logging
 import time
 import pandas
@@ -75,40 +75,39 @@ def computeLikelihoods(n, exact, popSizes, theta, timeLens, rhoGrid, cores,
 
     # make the pool first to avoid copying large objects.
     # maxtasksperchild=1 to avoid memory issues
-    executor = Pool(cores, maxtasksperchild=1)
-
-    # make the states and the rates
-    states = get_states(n, exact)
-    moranRates = MoranRates(states)
-
-    # compute initial distributions and likelihoods
-    prevInit = states.getUnlinkedStationary(popSize=popSizes[-1], theta=theta)
-    inits = []
-
-    if load_stationary:
-        stationary_dists = numpy.load(load_stationary)
-        for stationary_dist in stationary_dists:
-            inits.append(stationary_dist)
-    else:
-        for rho in reversed(rhoGrid):
-            rates = moranRates.getRates(rho=rho,
-                                        popSize=popSizes[-1],
-                                        theta=theta)
-            prevInit = stationary(Q=rates,
-                                  init=prevInit,
-                                  norm_order=float('inf'),
-                                  epsilon=1e-2)
-            inits.append(prevInit)
-    ret = executor.map(getColumnHelper,
-                       [(moranRates, rho, theta, popSizes, timeLens, prevInit,
-                         stationaryNormOrder)
-                        for rho, prevInit in zip(reversed(rhoGrid), inits)])
-    logging.info('Cleaning up results...')
-    if store_stationary:
-        full_inits = numpy.array([result[1] for result in ret])
-        numpy.save(store_stationary, full_inits)
-    ret = [states.ordered_log_likelihoods(result[0]) for result in ret]
-    executor.close()
+    with ProcessPoolExecutor(cores, max_tasks_per_child=1) as executor:
+
+        # make the states and the rates
+        states = get_states(n, exact)
+        moranRates = MoranRates(states)
+
+        # compute initial distributions and likelihoods
+        prevInit = states.getUnlinkedStationary(popSize=popSizes[-1], theta=theta)
+        inits = []
+
+        if load_stationary:
+            stationary_dists = numpy.load(load_stationary)
+            for stationary_dist in stationary_dists:
+                inits.append(stationary_dist)
+        else:
+            for rho in reversed(rhoGrid):
+                rates = moranRates.getRates(rho=rho,
+                                            popSize=popSizes[-1],
+                                            theta=theta)
+                prevInit = stationary(Q=rates,
+                                      init=prevInit,
+                                      norm_order=float('inf'),
+                                      epsilon=1e-2)
+                inits.append(prevInit)
+        ret = list(executor.map(getColumnHelper,
+                                [(moranRates, rho, theta, popSizes, timeLens,
+                                  prevInit, stationaryNormOrder)
+                                 for rho, prevInit in zip(reversed(rhoGrid), inits)]))
+        logging.info('Cleaning up results...')
+        if store_stationary:
+            full_inits = numpy.array([result[1] for result in ret])
+            numpy.save(store_stationary, full_inits)
+        ret = [states.ordered_log_likelihoods(result[0]) for result in ret]
 
     return ([(rho, lik) for rho, lik in zip(rhoGrid, reversed(ret))],
             states.ordered_indexes())
diff --git a/ldpop/proposal.py b/ldpop/proposal.py
index cda3f3a..8dd4256 100644
--- a/ldpop/proposal.py
+++ b/ldpop/proposal.py
@@ -10,7 +10,7 @@
 from .moran_finite import MoranStatesFinite
 from .moran_augmented import MoranRates
 
-from multiprocessing import Pool
+from concurrent.futures import ProcessPoolExecutor
 import math
 import pandas
 import logging
@@ -72,15 +72,14 @@ def __init__(self, n, theta, rhos, pop_sizes,
         states = MoranStatesFinite(2*n)
         moranRates = MoranRates(states)
 
-        executor = Pool(processes)
-        likelihoodDictList = list(map(
-            states.ordered_log_likelihoods,
-            executor.map(ordered_wrapper, [(moranRates, rho, theta,
-                                            pop_sizes, epochLengths,
-                                            numTimePointsPerEpoch)
-                                           for rho in rhos])))
-        executor.close()
-        executor.join()
+        with ProcessPoolExecutor(processes) as executor:
+            likelihoodDictList = list(map(
+                states.ordered_log_likelihoods,
+                executor.map(ordered_wrapper, [(moranRates, rho, theta,
+                                                pop_sizes, epochLengths,
+                                                numTimePointsPerEpoch)
+                                               for rho in rhos])))
+
         indexer = states.ordered_indexes()
         data = {}
         for rho, likelihoodDict in zip(rhos, likelihoodDictList):
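For reference, below is a minimal, self-contained sketch of the pattern both files now use: concurrent.futures.ProcessPoolExecutor as a context manager, with max_tasks_per_child=1 (Python 3.11+) playing the role of Pool's maxtasksperchild=1. The worker function square and its inputs are placeholders, not taken from ldpop. One behavioral difference worth keeping in mind is that executor.map returns a lazy iterator rather than a list, so its results must be materialized before being iterated more than once.

from concurrent.futures import ProcessPoolExecutor


def square(x):
    # placeholder worker standing in for getColumnHelper / ordered_wrapper
    return x * x


if __name__ == '__main__':
    # the with-block replaces explicit close()/join();
    # max_tasks_per_child=1 recycles each worker after one task,
    # mirroring Pool(..., maxtasksperchild=1)
    with ProcessPoolExecutor(4, max_tasks_per_child=1) as executor:
        # unlike Pool.map, executor.map yields results lazily,
        # so convert to a list before iterating more than once
        results = list(executor.map(square, range(8)))
    print(results)

Note that when max_tasks_per_child is given without an explicit mp_context, the executor defaults to the 'spawn' start method; this feature is incompatible with 'fork'.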