|
| 1 | +"""Tests for the weighted_modularity module""" |
| 2 | + |
| 3 | +#----------------------------------------------------------------------------- |
| 4 | +# Imports |
| 5 | +#----------------------------------------------------------------------------- |
| 6 | +import os |
| 7 | +import unittest |
| 8 | + |
| 9 | + |
| 10 | +# Third party |
| 11 | +import networkx as nx |
| 12 | +import nose.tools as nt |
| 13 | +import numpy as np |
| 14 | +import numpy.testing as npt |
| 15 | + |
| 16 | +# Our own |
| 17 | +from .. import util |
| 18 | +from .. import weighted_modularity as wm |
| 19 | + |
| 20 | + |
| 21 | +def get_test_data(): |
| 22 | + """ grabs local txt file with adj matrices |
| 23 | + Returns |
| 24 | + ======= |
| 25 | + graph : networkx graph |
| 26 | + communities : list of sets |
| 27 | + """ |
| 28 | + pth, _ = os.path.split(__file__) |
| 29 | + testdir = os.path.join(pth, 'tdata_corr_txt') |
| 30 | + data_file = os.path.join(testdir, '101_Block01.txt') |
| 31 | + mat = np.loadtxt(data_file) |
| 32 | + mat[mat<0] = 0 |
| 33 | + graph = nx.from_numpy_matrix(mat) |
| 34 | + # graph has 85 nodes, make generic communities |
| 35 | + communities = [set(range(42)), set(range(42,86))] |
| 36 | + return graph, communities |
| 37 | + |
| 38 | +class TestWeightedPartition(unittest.TestCase): |
| 39 | + |
| 40 | + def setUp(self): |
| 41 | + ## generate a default graph and communities |
| 42 | + graph, communities = get_test_data() |
| 43 | + self.graph = graph |
| 44 | + self.communities = communities |
| 45 | + |
| 46 | + def test_init(self): |
| 47 | + part = wm.WeightedPartition(self.graph) |
| 48 | + self.assertEqual(type(part.degrees), type({})) |
| 49 | + npt.assert_array_almost_equal(part.total_edge_weight, 1500.5653444) |
| 50 | + # generated communities |
| 51 | + comm = [set([node]) for node in self.graph.nodes()] |
| 52 | + self.assertEqual(part.communities, comm) |
| 53 | + # test communities cannot be replaced by garbage |
| 54 | + with self.assertRaises(TypeError): |
| 55 | + part.communities = 11 |
| 56 | + # doesnt work if nodes are missing from partition |
| 57 | + with self.assertRaises(ValueError): |
| 58 | + part.communities = [set([1,2,3])] |
| 59 | + # but we can pass a valid community partition |
| 60 | + part.communities = comm |
| 61 | + self.assertEqual(part.communities, comm) |
| 62 | + |
| 63 | + def test_communities_degree(self): |
| 64 | + ## if no community, method will raise error |
| 65 | + part = wm.WeightedPartition(self.graph) |
| 66 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 67 | + cdegree = part.communities_degree() |
| 68 | + self.assertEqual(round(cdegree[0]), 1462.0) |
| 69 | + |
| 70 | + |
| 71 | + def test_set_communities(self): |
| 72 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 73 | + self.assertEqual(part.communities, self.communities) |
| 74 | + with self.assertRaises(TypeError): |
| 75 | + # raise error if not list of sets |
| 76 | + part.set_communities(part.communities[0]) |
| 77 | + with self.assertRaises(TypeError): |
| 78 | + part.set_communities('a') |
| 79 | + with self.assertRaises(ValueError): |
| 80 | + ## missing nodes |
| 81 | + comm = self.graph.nodes()[:-3] |
| 82 | + part.set_communities([set(comm)]) |
| 83 | + |
| 84 | + def test_allnodes_in_communities(self): |
| 85 | + """checks communities contain all nodes |
| 86 | + with no repetition""" |
| 87 | + part = wm.WeightedPartition(self.graph) |
| 88 | + self.assertTrue(part._allnodes_in_communities(self.communities)) |
| 89 | + self.assertFalse(part._allnodes_in_communities([self.communities[0]])) |
| 90 | + |
| 91 | + |
| 92 | + def test_get_node_community(self): |
| 93 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 94 | + self.assertEqual(part.get_node_community(0), 0) |
| 95 | + self.assertEqual(part.get_node_community(self.graph.nodes()[-1]),1) |
| 96 | + with self.assertRaises(ValueError): |
| 97 | + part.get_node_community(-1) |
| 98 | + part = wm.WeightedPartition(self.graph) |
| 99 | + self.assertEqual(part.get_node_community(0), 0) |
| 100 | + |
| 101 | + def test_node_degree(self): |
| 102 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 103 | + node = 0 |
| 104 | + res = part.node_degree(node) |
| 105 | + npt.assert_almost_equal(res, 37.94151675 ) |
| 106 | + |
| 107 | + def test_modularity(self): |
| 108 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 109 | + npt.assert_almost_equal(part.modularity(), 0.0555463) |
| 110 | + |
| 111 | + |
| 112 | + def test_degree_by_community(self): |
| 113 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 114 | + ## summ of all links in or out of communities |
| 115 | + ## since one per scommunity, just weighted degree of each node |
| 116 | + tot_per_comm = part.degree_by_community() |
| 117 | + degw = self.graph.degree(weight='weight').values() |
| 118 | + self.assertEqual(tot_per_comm, degw) |
| 119 | + ## This isnt true of we have communities with multiple nodes |
| 120 | + part_2comm = wm.WeightedPartition(self.graph, self.communities) |
| 121 | + self.assertEqual(part_2comm == degw, False) |
| 122 | + |
| 123 | + def test_degree_within_community(self): |
| 124 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 125 | + weights = part.degree_within_community() |
| 126 | + ## this inlcudes self links so |
| 127 | + self.assertEqual(weights[0], 1.0) |
| 128 | + |
| 129 | + |
| 130 | + |
| 131 | + def test_node_degree_by_community(self): |
| 132 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 133 | + node = 0 |
| 134 | + node2comm_weights = part.node_degree_by_community(node) |
| 135 | + # self loops not added to weight |
| 136 | + # so communities made only of node should be zero |
| 137 | + npt.assert_equal(node2comm_weights[0],0) |
| 138 | + # this should be equal to weight between two nodes |
| 139 | + neighbor = 1 |
| 140 | + expected = self.graph[node][neighbor]['weight'] |
| 141 | + npt.assert_equal(node2comm_weights[neighbor],expected) |
| 142 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 143 | + node2comm_weights = part.node_degree_by_community(node) |
| 144 | + npt.assert_equal(len(node2comm_weights), 2) |
| 145 | + |
| 146 | + |
| 147 | +class TestLouvainCommunityDetection(unittest.TestCase): |
| 148 | + |
| 149 | + def setUp(self): |
| 150 | + ## generate a default graph and communities |
| 151 | + graph, communities = get_test_data() |
| 152 | + self.graph = graph |
| 153 | + self.communities = communities |
| 154 | + self.louvain = wm.LouvainCommunityDetection(graph) |
| 155 | + self.louvain_comm = wm.LouvainCommunityDetection(graph, communities) |
| 156 | + |
| 157 | + def test_init(self): |
| 158 | + louvain = self.louvain |
| 159 | + self.assertEqual(louvain.graph, self.graph) |
| 160 | + self.assertEqual(louvain.initial_communities, None) |
| 161 | + self.assertEqual(louvain.minthr, 0.0000001) |
| 162 | + |
| 163 | + |
| 164 | + def test_communities_without_node(self): |
| 165 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 166 | + node = 0 |
| 167 | + updated_comm = self.louvain._communities_without_node(part, node) |
| 168 | + self.assertEqual(updated_comm[0], set([])) |
| 169 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 170 | + updated_comm = self.louvain_comm._communities_without_node(part, node) |
| 171 | + ## make sure we dont break communities from original partition |
| 172 | + self.assertEqual(part.communities, self.communities) |
| 173 | + self.assertEqual(0 not in updated_comm[0], True) |
| 174 | + |
| 175 | + def test_communities_nodes_alledgesw(self): |
| 176 | + part = wm.WeightedPartition(self.graph, self.communities) |
| 177 | + node = 0 |
| 178 | + weights = self.louvain_comm._communities_nodes_alledgesw(part, node) |
| 179 | + npt.assert_almost_equal(weights[0], 1424.0220362) |
| 180 | + ## test with possible empty node set |
| 181 | + part = wm.WeightedPartition(self.graph) |
| 182 | + weights = self.louvain._communities_nodes_alledgesw(part, node) |
| 183 | + self.assertEqual(weights[0], 0) |
| 184 | + # other communities are made up of just one node |
| 185 | + self.assertEqual(weights[1], self.graph.degree(weight='weight')[1]) |
| 186 | + |
| 187 | + |
| 188 | + def test_calc_delta_modularity(self): |
| 189 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 190 | + node = 0 |
| 191 | + change = self.louvain._calc_delta_modularity(node, part) |
| 192 | + self.assertEqual(len(change), len(part.communities)) |
| 193 | + # change is an array |
| 194 | + self.assertEqual(change.shape[0], len(part.communities)) |
| 195 | + self.assertEqual(change[0] < change[1], True) |
| 196 | + # this is one comm per node, so once removed from own |
| 197 | + # comm, this delta_weight will be zero |
| 198 | + self.assertEqual(change[node] , 0) |
| 199 | + |
| 200 | + def test_move_node(self): |
| 201 | + part = wm.WeightedPartition(self.graph) # one comm per node |
| 202 | + #move first node to second community |
| 203 | + node = 0 |
| 204 | + comm = 1 |
| 205 | + newpart = self.louvain._move_node(part, node, comm) |
| 206 | + self.assertEqual(set([0,1]) in newpart.communities, True) |
| 207 | + ## what happens if node or comm missing |
| 208 | + with self.assertRaises(ValueError): |
| 209 | + newpart = self.louvain._move_node(part, -1, comm) |
| 210 | + invalid_communities = len(part.communities) + 1 |
| 211 | + with self.assertRaises(IndexError): |
| 212 | + newpart = self.louvain._move_node(part, node, invalid_communities) |
| 213 | + |
| 214 | + def test_gen_dendogram(self): |
| 215 | + graph = nx.Graph() |
| 216 | + nodeslist = [0,1,2,3,4] |
| 217 | + graph.add_nodes_from(nodeslist, weight=True) |
| 218 | + louvain = wm.LouvainCommunityDetection(graph) |
| 219 | + self.assertRaises(IOError, louvain._gen_dendogram) |
| 220 | + |
| 221 | + def test_run(self): |
| 222 | + karate = nx.karate_club_graph() |
| 223 | + louvain = wm.LouvainCommunityDetection(karate) |
| 224 | + final_partitions = louvain.run() |
| 225 | + self.assertEqual(final_partitions[-1].modularity() > .38, |
| 226 | + True) |
| 227 | + self.assertEqual(len(final_partitions), 2) |
| 228 | + |
| 229 | + |
| 230 | + def test_combine(self): |
| 231 | + |
| 232 | + first = [set([0,1,2]), set([3,4,5]), set([6,7])] |
| 233 | + second = [set([0,2]), set([1])] |
| 234 | + npt.assert_raises(ValueError, self.louvain._combine, second, first) |
| 235 | + res = self.louvain._combine(first, second) |
| 236 | + npt.assert_equal(res, [set([0,1,2,6,7]), set([3,4,5])]) |
| 237 | + |
| 238 | +def test_meta_graph(): |
| 239 | + graph, communities = get_test_data() |
| 240 | + part = wm.WeightedPartition(graph) |
| 241 | + metagraph,_ = wm.meta_graph(part) |
| 242 | + ## each node is a comm, so no change to metagraph |
| 243 | + npt.assert_equal(metagraph.nodes(), graph.nodes()) |
| 244 | + ## two communitties |
| 245 | + part = wm.WeightedPartition(graph, communities) |
| 246 | + metagraph,mapping = wm.meta_graph(part) |
| 247 | + npt.assert_equal(metagraph.nodes(), [0,1]) |
| 248 | + npt.assert_equal(metagraph.edges(), [(0,0),(0,1), (1,1)]) |
| 249 | + # mapping should map new node 0 to communities[0] |
| 250 | + npt.assert_equal(mapping[0], communities[0]) |
| 251 | + ## weight should not be lost between graphs |
| 252 | + npt.assert_almost_equal(metagraph.size(weight='weight'), |
| 253 | + graph.size(weight='weight')) |
0 commit comments