diff --git a/pathpy/HigherOrderNetwork.py b/pathpy/HigherOrderNetwork.py
index 2bebe63..cf9142f 100644
--- a/pathpy/HigherOrderNetwork.py
+++ b/pathpy/HigherOrderNetwork.py
@@ -34,6 +34,9 @@
 import scipy.linalg as _la
 import scipy as _sp
 
+import functools as fu
+import multiprocessing as _mp
+
 from pathpy.Log import Log
 from pathpy.Log import Severity
 
@@ -53,7 +56,7 @@ class HigherOrderNetwork:
 
     """
 
-    def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrderTransitions', lanczosVecs=15, maxiter=1000):
+    def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrderTransitions', lanczosVecs=15, maxiter=1000, n_cores=1):
         """
         Generates a k-th-order representation based on the given path statistics.
 
@@ -78,6 +81,8 @@ def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrde
         a k-order edge is set to the transition probability T['v_k', 'v_k+1'] in the
         first order network. For method='KOrderPi' the entry pi['v1-...-v_k'] in the
         stationary distribution of the k-order network is used instead.
+
+        @param n_cores: number of CPU cores to be used for computations that are parallelized, such as the calculation of the transition matrix (default 1)
         """
 
         assert nullModel == False or (nullModel and k>1)
@@ -108,6 +113,9 @@ def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrde
         ## A dictionary containing the sets of predecessors of all nodes
         self.predecessors = _co.defaultdict( lambda: set() )
 
+        ## number of cores to be used when calculating the transition matrix
+        self.cores = n_cores
+
         if k>1:
             # For k>1 we need the first-order network to generate the null model
             # and calculate the degrees of freedom
@@ -864,6 +872,41 @@ def getAdjacencyMatrix(self, includeSubPaths = True, weighted = True, transposed
         return _sparse.coo_matrix( (data, (row, col)), shape=(self.vcount(), self.vcount()) ).tocsr()
 
+    @staticmethod
+    def partialTransitionMatrix(node_data, degree_data, includeSubPaths, chunk):
+        """
+        Takes a chunk of the self.edges dictionary and returns its contribution
+        to the sparse transition matrix as three lists: row, col, data.
+        """
+        nodes = node_data
+        D = degree_data
+
+        row = []
+        col = []
+        data = []
+
+        for (s, t), value in chunk.items():
+            # either s->t has been observed as a longest path, or we are interested in subpaths as well;
+            # the check also ensures that we do not accidentally consider zero-weight edges (automatically added by the defaultdict)
+            if (value[1] > 0) or (includeSubPaths and value[0] > 0):
+                row.append(nodes.index(t))
+                col.append(nodes.index(s))
+
+                if includeSubPaths:
+                    count = value.sum()
+                else:
+                    count = value[1]
+
+                assert D[nodes.index(s)] > 0, 'Encountered zero out-degree for node ' + str(s) + ' while weight of link (' + str(s) + ', ' + str(t) + ') is non-zero.'
+                prob = count / D[nodes.index(s)]
+                if prob < 0 or prob > 1:
+                    Log.add('Encountered transition probability outside [0,1] range.', Severity.ERROR)
+                    raise ValueError()
+                data.append(prob)
+
+        return [row, col, data]
+
 
     def getTransitionMatrix(self, includeSubPaths=True):
         """
@@ -873,31 +916,49 @@ def getTransitionMatrix(self, includeSubPaths=True):
         @param includeSubpaths: whether or not to include subpath statistics in the
             transition probability calculation (default True)
         """
-        row = []
-        col = []
-        data = []
         D = self.degrees(includeSubPaths=includeSubPaths, weighted=True, mode='OUT')
-        for (s,t) in self.edges:
-            # either s->t has been observed as a longest path, or we are interested in subpaths as well
-
-            # the following makes sure that we do not accidentially consider zero-weight edges (automatically added by default_dic)
-            if (self.edges[(s,t)][1] > 0) or (includeSubPaths and self.edges[(s,t)][0]>0):
-                row.append(self.nodes.index(t))
-                col.append(self.nodes.index(s))
-                if includeSubPaths:
-                    count = self.edges[(s,t)].sum()
-                else:
-                    count = self.edges[(s,t)][1]
-                #print((s,t))
-                #print(D[self.nodes.index(s)])
-                #print(count)
-                assert D[self.nodes.index(s)]>0, 'Encountered zero out-degree for node ' + str(s) + ' while weight of link (' + str(s) + ', ' + str(t) + ') is non-zero.'
-                prob = count / D[self.nodes.index(s)]
-                if prob < 0 or prob > 1:
-                    tn.Log.add('Encountered transition probability outside [0,1] range.', Severity.ERROR)
-                    raise ValueError()
-                data.append( prob )
-
+        n_chunks = self.cores
+
+        def split_dict_equally(input_dict, chunks=n_chunks):
+            """
+            Splits a dict by keys into a list of `chunks` roughly equal-sized dicts.
+            Credit to http://enginepewpew.blogspot.ch/2012/03/splitting-dictionary-into-equal-chunks.html
+            """
+            # prepare empty dicts and distribute the items over them round-robin
+            return_list = [dict() for idx in range(chunks)]
+            idx = 0
+            for k, v in input_dict.items():
+                return_list[idx][k] = v
+                if idx < chunks - 1:  # indexes start at 0
+                    idx += 1
+                else:
+                    idx = 0
+            return return_list
+
+        # split the edge dictionary into smaller chunks to be passed to the processing pool
+        dicts = split_dict_equally(self.edges, n_chunks)
+
+        # bind the arguments shared by all workers, so that only the chunk varies between calls
+        partiallyCalculated = fu.partial(HigherOrderNetwork.partialTransitionMatrix, self.nodes, D, includeSubPaths)
+
+        # calculate the elements of the sparse matrix in parallel, one chunk per worker
+        with _mp.Pool(n_chunks) as pool:
+            a = pool.map(partiallyCalculated, dicts)
+
+        # pool.map returns one [row, col, data] triple per chunk; concatenate them
+        row = _np.hstack([a[i][0] for i in range(n_chunks)])
+        col = _np.hstack([a[i][1] for i in range(n_chunks)])
+        data = _np.hstack([a[i][2] for i in range(n_chunks)])
+
         data = _np.array(data)
         data = data.reshape(data.size,)
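
Note: the chunk/partial/Pool pattern introduced in getTransitionMatrix above can be exercised in isolation. The following is a minimal, self-contained sketch of the same technique: split a dict into chunks, bind the shared arguments with functools.partial, and map one chunk to each pool worker. The toy edges dict and the process_chunk worker are hypothetical stand-ins for self.edges and partialTransitionMatrix, not part of pathpy.

import functools as fu
import multiprocessing as _mp

def process_chunk(factor, chunk):
    # toy stand-in for partialTransitionMatrix: scale every value in the chunk
    return {k: v * factor for k, v in chunk.items()}

def split_dict_equally(input_dict, chunks):
    # same round-robin distribution as in the patch
    return_list = [dict() for _ in range(chunks)]
    for idx, (k, v) in enumerate(input_dict.items()):
        return_list[idx % chunks][k] = v
    return return_list

if __name__ == '__main__':  # guard needed where worker processes are spawned (e.g. Windows)
    edges = {('a', 'b'): 1, ('b', 'c'): 2, ('c', 'a'): 3, ('a', 'c'): 4}
    worker = fu.partial(process_chunk, 10)  # bind the shared argument, as done for partialTransitionMatrix
    with _mp.Pool(2) as pool:
        results = pool.map(worker, split_dict_equally(edges, 2))
    merged = {k: v for part in results for k, v in part.items()}
    print(merged)  # every value scaled by 10, e.g. {('a', 'b'): 10, ('b', 'c'): 20, ...}

Binding the shared arguments up front matters because pool.map passes exactly one argument to each worker call; the partial object and everything bound into it must also be picklable on platforms that spawn rather than fork worker processes.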