diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py index 28ed1e60..277c0757 100644 --- a/_nx_parallel/__init__.py +++ b/_nx_parallel/__init__.py @@ -181,6 +181,13 @@ def get_info(): 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n_jobs` number of chunks." }, }, + "latapy_clustering": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/cluster.py#L10", + "additional_docs": "In the parallel implementation we divide the nodes into chunks and compute the bipartite clustering coefficient for all `node_chunk` in parallel.", + "additional_parameters": { + 'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` (or `nodes`) into `n_jobs` number of chunks." 
+ }, + }, "local_efficiency": { "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/efficiency_measures.py#L11", "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and then computing and adding global efficiencies of all node in all chunks, in parallel, and then adding all these sums and dividing by the total number of nodes at the end.", diff --git a/nx_parallel/algorithms/bipartite/__init__.py b/nx_parallel/algorithms/bipartite/__init__.py index d0676b97..ff61d256 100644 --- a/nx_parallel/algorithms/bipartite/__init__.py +++ b/nx_parallel/algorithms/bipartite/__init__.py @@ -1 +1,2 @@ +from .cluster import * from .redundancy import * diff --git a/nx_parallel/algorithms/bipartite/cluster.py b/nx_parallel/algorithms/bipartite/cluster.py new file mode 100644 index 00000000..9c36efb9 --- /dev/null +++ b/nx_parallel/algorithms/bipartite/cluster.py @@ -0,0 +1,64 @@ +from joblib import Parallel, delayed +from networkx.algorithms.bipartite.cluster import modes +import networkx as nx +import nx_parallel as nxp + +__all__ = ["latapy_clustering", "clustering"] + + +@nxp._configure_if_nx_active() +def latapy_clustering(G, nodes=None, mode="dot", get_chunks="chunks"): + """In the parallel implementation we divide the nodes into chunks and compute + the bipartite clustering coefficient for all `node_chunk` in parallel. + + networkx.bipartite.latapy_clustering : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.bipartite.cluster.latapy_clustering.html + + Parameters + ---------- + get_chunks : str, function (default = "chunks") + A function that takes in an iterable of all the nodes as input and returns + an iterable `node_chunks`. The default chunking is done by slicing the + `G.nodes` (or `nodes`) into `n_jobs` number of chunks. 
+ """ + + def _process_chunk(chunk): + ccs = {} + for v in chunk: + cc = 0.0 + nbrs2 = {u for nbr in G[v] for u in G[nbr]} - {v} + for u in nbrs2: + cc += cc_func(set(G[u]), set(G[v])) + if cc > 0.0: # len(nbrs2)>0 + cc /= len(nbrs2) + ccs[v] = cc + return ccs + + if hasattr(G, "graph_object"): + G = G.graph_object + + if not nx.algorithms.bipartite.is_bipartite(G): + raise nx.NetworkXError("Graph is not bipartite") + + try: + cc_func = modes[mode] + except KeyError as err: + raise nx.NetworkXError( + "Mode for bipartite clustering must be: dot, min or max" + ) from err + + if nodes is None: + nodes = G + n_jobs = nxp.get_n_jobs() + if get_chunks == "chunks": + node_chunks = nxp.chunks(nodes, n_jobs) + else: + node_chunks = get_chunks(nodes) + results = Parallel()(delayed(_process_chunk)(chunk) for chunk in node_chunks) + clusterings = {} + for result in results: + clusterings.update(result) + + return clusterings + + +clustering = latapy_clustering diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py index 4674a0a8..2717c5f4 100644 --- a/nx_parallel/interface.py +++ b/nx_parallel/interface.py @@ -7,6 +7,7 @@ ALGORITHMS = [ # Bipartite "node_redundancy", + "latapy_clustering", # Isolates "number_of_isolates", # Vitality diff --git a/nx_parallel/tests/test_get_chunks.py b/nx_parallel/tests/test_get_chunks.py index 284e70ed..53bc309c 100644 --- a/nx_parallel/tests/test_get_chunks.py +++ b/nx_parallel/tests/test_get_chunks.py @@ -71,11 +71,16 @@ def random_chunking(nodes): "v_structures", "colliders", ] + bipartite_funcs = [ + "latapy_clustering", + ] if func in tournament_funcs: G = nx.tournament.random_tournament(15, seed=42) elif func in dag_funcs: G = nx.gn_graph(25, seed=42, create_using=nx.DiGraph) + elif func in bipartite_funcs: + G = nx.bipartite.random_graph(20, 25, 0.6, seed=42, directed=True) else: G = nx.fast_gnp_random_graph( 40, 0.6, seed=42, directed=func in not_implemented_undirected diff --git 
a/timing/new_heatmaps/heatmap_latapy_clustering_timing.png b/timing/new_heatmaps/heatmap_latapy_clustering_timing.png new file mode 100644 index 00000000..6e0ed63e Binary files /dev/null and b/timing/new_heatmaps/heatmap_latapy_clustering_timing.png differ diff --git a/timing/new_heatmaps/timing_individual_function.py b/timing/new_heatmaps/timing_individual_function.py index 5cc77d52..669ef35a 100644 --- a/timing/new_heatmaps/timing_individual_function.py +++ b/timing/new_heatmaps/timing_individual_function.py @@ -15,7 +15,7 @@ seed = random.Random(42) tournament_funcs = ["is_reachable", "is_strongly_connected"] -bipartite_funcs = ["node_redundancy"] +bipartite_funcs = ["node_redundancy", "latapy_clustering"] community_funcs = [ "ra_index_soundarajan_hopcroft", "cn_soundarajan_hopcroft", @@ -58,19 +58,21 @@ def record_result(stdTime, parallelTime, row, col): G = nx.bipartite.random_graph( n[ind], m[ind], p, directed=True, seed=seed ) - for cur_node in G.nodes: - neighbors = set(G.neighbors(cur_node)) - # have atleast 2 outgoing edges - while len(neighbors) < 2: - new_neighbor = seed.choice( - [ - node - for node in G.nodes - if node != cur_node and node not in neighbors - ] - ) - G.add_edge(cur_node, new_neighbor) - neighbors.add(new_neighbor) + A, B = range(n[ind]), range(n[ind], n[ind] + m[ind]) + for source_group, target_group in [(A, B), (B, A)]: + for cur_node in source_group: + neighbors = set(G.neighbors(cur_node)) + # have at least 2 outgoing edges + while len(neighbors) < 2: + new_neighbor = seed.choice( + [ + node + for node in target_group + if node not in neighbors + ] + ) + G.add_edge(cur_node, new_neighbor) + neighbors.add(new_neighbor) else: print(f"Number of Nodes: {num}") G = nx.fast_gnp_random_graph(