Skip to content
1 change: 1 addition & 0 deletions nx_parallel/algorithms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
from .cluster import *
from .link_prediction import *
from .dag import *
from .mis import *
189 changes: 189 additions & 0 deletions nx_parallel/algorithms/mis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from typing import Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used.

from joblib import Parallel, delayed
import nx_parallel as nxp
import networkx as nx

__all__ = ["maximal_independent_set"]

# Import the actual NetworkX implementation
from networkx.algorithms.mis import maximal_independent_set as _nx_mis_dispatcher
_nx_mis = _nx_mis_dispatcher.orig_func
Comment on lines +8 to +10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be simpler -- not even an import. And since we only use it once, maybe we can just use the expression directly without the abbreviation.

# abbreviation for the actual NetworkX implementation
_nx_mis = nx.maximal_independent_set.orig_func



@nxp._configure_if_nx_active(should_run=nxp.should_run_if_large(50000))
def maximal_independent_set(G, nodes=None, seed=None, get_chunks="chunks"):
    """Returns a random maximal independent set guaranteed to contain
    a given set of nodes.

    This parallel implementation processes nodes in chunks across multiple
    cores, using a Luby-style randomized parallel algorithm for speedup
    on large graphs.

    An independent set is a set of nodes such that the subgraph
    of G induced by these nodes contains no edges. A maximal
    independent set is an independent set such that it is not possible
    to add a new node and still get an independent set.

    networkx.maximal_independent_set: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.mis.maximal_independent_set.html

    Parameters
    ----------
    G : NetworkX graph
        An undirected graph.
    nodes : list or iterable, optional
        Nodes that must be part of the independent set. This set of nodes
        must be independent. If not provided, a random starting node is chosen.
    seed : integer, random_state, or None (default)
        Indicator of random number generation state.
        See :ref:`Randomness<randomness>`.
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of nodes and returns chunks.
        The default chunking divides nodes into n_jobs chunks.

    Returns
    -------
    indep_nodes : list
        List of nodes that are part of a maximal independent set.

    Raises
    ------
    NetworkXUnfeasible
        If the nodes in the provided list are not part of the graph or
        do not form an independent set, an exception is raised.
    NetworkXNotImplemented
        If `G` is directed.

    Examples
    --------
    >>> import networkx as nx
    >>> import nx_parallel as nxp
    >>> G = nx.path_graph(5)
    >>> nxp.maximal_independent_set(G)  # doctest: +SKIP
    [4, 0, 2]
    >>> nxp.maximal_independent_set(G, [1])  # doctest: +SKIP
    [1, 3]

    Notes
    -----
    This algorithm does not solve the maximum independent set problem.
    The parallel version uses a chunk-based parallel algorithm that
    provides speedup on large graphs (>= 50000 nodes). For smaller graphs,
    the NetworkX sequential version is used automatically.
    """
    if hasattr(G, "graph_object"):
        G = G.graph_object

    # Directed graphs are not supported by this algorithm.
    if G.is_directed():
        raise nx.NetworkXNotImplemented(
            "NX-PARALLEL: Not implemented for directed graphs."
        )

    # Normalize `seed` into a random.Random-like object. When dispatched via
    # nx.maximal_independent_set(..., backend="parallel"), @py_random_state
    # in NetworkX has already done this conversion; the code below is
    # defensive for direct nxp.maximal_independent_set() calls.
    import random

    if seed is not None and hasattr(seed, "random"):
        rng = seed
    elif seed is not None:
        rng = random.Random(seed)
    else:
        rng = random._inst

    # Honor the should_run policy even when the backend is selected
    # explicitly; fall back to the sequential NetworkX implementation
    # (the unwrapped function needs an actual Random object).
    should_run_result = maximal_independent_set.should_run(G, nodes, seed)
    if should_run_result is not True:
        return _nx_mis(G, nodes=nodes, seed=rng)

    # Validate the required nodes: must be a subset of G and independent.
    if nodes is not None:
        nodes_set = set(nodes)
        if not nodes_set.issubset(G):
            raise nx.NetworkXUnfeasible(f"{nodes} is not a subset of the nodes of G")
        neighbors = (
            set.union(*[set(G.adj[v]) for v in nodes_set]) if nodes_set else set()
        )
        if set.intersection(neighbors, nodes_set):
            raise nx.NetworkXUnfeasible(f"{nodes} is not an independent set of G")
    else:
        nodes_set = set()

    n_jobs = nxp.get_n_jobs()

    # Parallel strategy: run a local MIS on each node chunk independently,
    # then merge the per-chunk results sequentially.
    all_nodes = list(G)

    # Required nodes are fixed members of the result, so they and their
    # neighbors are removed from the candidate pool.
    if nodes_set:
        available = set(all_nodes) - nodes_set
        for node in nodes_set:
            available.difference_update(G.neighbors(node))
        available = list(available)
    else:
        available = all_nodes

    # Shuffle for randomness
    rng.shuffle(available)

    # Split into chunks
    if get_chunks == "chunks":
        chunks = list(nxp.chunks(available, n_jobs))
    else:
        chunks = list(get_chunks(available))

    # Precompute adjacency once so the workers never touch the graph object.
    adj_dict = {node: set(G.neighbors(node)) for node in G.nodes()}

    def _process_chunk_independent(chunk, chunk_seed):
        """Build a local MIS considering only edges inside this chunk."""
        local_rng = random.Random(chunk_seed)
        local_mis = []
        local_excluded = set()

        # Shuffle chunk for randomness
        chunk_list = list(chunk)
        local_rng.shuffle(chunk_list)
        # Set for O(1) membership tests (previously an O(n) list scan
        # per neighbor, making the inner loop quadratic).
        chunk_members = set(chunk_list)

        for node in chunk_list:
            if node not in local_excluded:
                local_mis.append(node)
                local_excluded.add(node)
                # Exclude neighbors, but only those within this chunk.
                for neighbor in adj_dict[node]:
                    if neighbor in chunk_members:
                        local_excluded.add(neighbor)

        return local_mis

    # Derive one deterministic seed per chunk from the caller's rng.
    chunk_seeds = [rng.randint(0, 2**31 - 1) for _ in range(len(chunks))]

    # Process chunks in parallel
    results = Parallel()(
        delayed(_process_chunk_independent)(chunk, chunk_seeds[i])
        for i, chunk in enumerate(chunks)
    )

    # Merge: required nodes go in first; their neighbors are pre-excluded.
    indep_set = list(nodes_set) if nodes_set else []
    excluded = set(all_nodes) - set(available) if nodes_set else set()

    # Greedily accept chunk picks that do not conflict across chunks.
    for local_mis in results:
        for node in local_mis:
            if node not in excluded:
                indep_set.append(node)
                excluded.add(node)
                excluded.update(adj_dict[node])

    # Maximality repair: a node excluded *within* a chunk (its chunk-neighbor
    # was chosen locally) may have lost that conflicting neighbor during the
    # cross-chunk merge above and would otherwise never be reconsidered,
    # yielding a non-maximal set. Sweep the remaining candidates to guarantee
    # the result is maximal.
    for node in available:
        if node not in excluded:
            indep_set.append(node)
            excluded.add(node)
            excluded.update(adj_dict[node])

    return indep_set
125 changes: 125 additions & 0 deletions nx_parallel/algorithms/tests/test_mis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import networkx as nx
import nx_parallel as nxp
import pytest


def test_maximal_independent_set_basic():
    # The result must be independent and maximal on a small path graph.
    G = nx.path_graph(5)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G))
    chosen = set(result)

    # Independence: no chosen node is adjacent to another chosen node.
    assert all(chosen.isdisjoint(G.neighbors(u)) for u in chosen)

    # Maximality: every node left out touches at least one chosen node.
    assert all(chosen & set(G.neighbors(v)) for v in G.nodes() if v not in chosen)


def test_maximal_independent_set_with_required_nodes():
    # Seed nodes supplied by the caller must appear in the result.
    G = nx.path_graph(7)
    required = [1, 3]
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G), nodes=required)

    assert set(required) <= set(result)

    # The full result must still be an independent set.
    chosen = set(result)
    assert all(chosen.isdisjoint(G.neighbors(u)) for u in chosen)


def test_maximal_independent_set_invalid_nodes():
    H = nxp.ParallelGraph(nx.path_graph(5))

    # Nodes outside the graph are rejected.
    with pytest.raises(nx.NetworkXUnfeasible):
        nxp.maximal_independent_set(H, nodes=[10, 20])

    # Adjacent seed nodes do not form an independent set.
    with pytest.raises(nx.NetworkXUnfeasible):
        nxp.maximal_independent_set(H, nodes=[0, 1])


def test_maximal_independent_set_directed_graph():
    # Directed graphs are explicitly unsupported.
    H = nxp.ParallelGraph(nx.DiGraph([(0, 1), (1, 2)]))
    with pytest.raises(nx.NetworkXNotImplemented):
        nxp.maximal_independent_set(H)


def test_maximal_independent_set_deterministic_with_seed():
    # Identical seeds must produce identical results.
    H = nxp.ParallelGraph(nx.karate_club_graph())
    first = nxp.maximal_independent_set(H, seed=42)
    second = nxp.maximal_independent_set(H, seed=42)
    assert first == second


def test_maximal_independent_set_different_seeds():
    # Whatever the seed, the returned set must be independent.
    G = nx.karate_club_graph()
    H = nxp.ParallelGraph(G)

    for s in (42, 100):
        chosen = set(nxp.maximal_independent_set(H, seed=s))
        assert all(chosen.isdisjoint(G.neighbors(u)) for u in chosen)


def test_maximal_independent_set_complete_graph():
    # In K5 every pair of nodes is adjacent, so the MIS has exactly one node.
    result = nxp.maximal_independent_set(nxp.ParallelGraph(nx.complete_graph(5)))
    assert len(result) == 1


def test_maximal_independent_set_empty_graph():
    # With no edges, every node belongs to the maximal independent set.
    result = nxp.maximal_independent_set(nxp.ParallelGraph(nx.empty_graph(5)))
    assert len(result) == 5


def test_maximal_independent_set_large_graph():
    # Independence and maximality on a larger random graph.
    G = nx.fast_gnp_random_graph(150, 0.1, seed=42)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G), seed=42)
    chosen = set(result)

    # No two chosen nodes are adjacent.
    assert all(chosen.isdisjoint(G.neighbors(u)) for u in chosen)

    # Every excluded node is adjacent to a chosen one.
    assert all(chosen & set(G.neighbors(v)) for v in G.nodes() if v not in chosen)


def test_maximal_independent_set_random_graph():
    # Independence and maximality on a small random graph.
    G = nx.fast_gnp_random_graph(50, 0.1, seed=42)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G), seed=42)
    chosen = set(result)

    # No two chosen nodes are adjacent.
    assert all(chosen.isdisjoint(G.neighbors(u)) for u in chosen)

    # Every excluded node is adjacent to a chosen one.
    assert all(chosen & set(G.neighbors(v)) for v in G.nodes() if v not in chosen)
2 changes: 2 additions & 0 deletions nx_parallel/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
"average_neighbor_degree",
# Connectivity
"all_pairs_node_connectivity",
# Maximal Independent Set
"maximal_independent_set",
]


Expand Down
1 change: 1 addition & 0 deletions nx_parallel/tests/test_get_chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def test_get_functions_with_get_chunks():
ignore_funcs = [
"number_of_isolates",
"is_reachable",
"maximal_independent_set",
]


Expand Down
39 changes: 30 additions & 9 deletions nx_parallel/utils/should_run_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,41 @@
]


def should_skip_parallel(*_):
def should_skip_parallel(*_, **__):
    """Unconditional should-run policy: always skip the parallel backend.

    Accepts (and ignores) any positional/keyword arguments so it can stand
    in for any algorithm's should_run hook.
    """
    reason = "Fast algorithm; skip parallel execution"
    return reason


def should_run_if_large(G, *_):
if hasattr(G, "graph_object"):
G = G.graph_object
def should_run_if_large(G=None, nodes_threshold=200, *_, **__):
    """Should-run policy gated on the number of nodes in the graph.

    Supports two calling conventions:

    * Direct: ``should_run_if_large(G)`` evaluates ``G`` immediately and
      returns ``True`` (run in parallel) or a reason string to skip.
    * Factory: ``should_run_if_large(50000)`` or
      ``should_run_if_large(nodes_threshold=50000)`` returns a policy
      function with the given threshold baked in, suitable for
      ``@nxp._configure_if_nx_active(should_run=...)``.

    Parameters
    ----------
    G : graph, number, or None
        A (possibly nx_parallel-wrapped) graph for direct usage, or a
        numeric threshold for factory usage.
    nodes_threshold : int or float (default 200)
        Minimum node count required to run the parallel implementation.

    Returns
    -------
    bool, str, or callable
        ``True`` or a skip-reason string in direct usage; a policy
        callable with the same contract in factory usage.
    """
    # A graph exposes both __len__ and a `nodes` attribute; a bare numeric
    # threshold exposes neither.
    is_graph = G is not None and hasattr(G, "__len__") and hasattr(G, "nodes")

    if is_graph:
        # Direct usage: unwrap nx_parallel's ParallelGraph wrapper if present.
        if hasattr(G, "graph_object"):
            G = G.graph_object

        if len(G) < nodes_threshold:
            return "Graph too small for parallel execution"
        return True

    # Factory usage: the threshold may arrive positionally (bound to G) or
    # via the nodes_threshold keyword.
    threshold = G if G is not None and isinstance(G, (int, float)) else nodes_threshold

    def wrapper(G, *_, **__):
        # Same check as the direct path, with the captured threshold.
        if hasattr(G, "graph_object"):
            G = G.graph_object

        if len(G) < threshold:
            return "Graph too small for parallel execution"
        return True

    return wrapper


def default_should_run(*_):
def default_should_run(*_, **__):
n_jobs = nxp.get_n_jobs()
print(f"{n_jobs=}")
if n_jobs in (None, 0, 1):
Expand All @@ -31,7 +52,7 @@ def default_should_run(*_):


def should_run_if_sparse(threshold=0.3):
def wrapper(G, *_):
def wrapper(G, *_, **__):
if hasattr(G, "graph_object"):
G = G.graph_object

Expand Down
Loading