diff --git a/.bazelrc b/.bazelrc
index b40a354..b24765d 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -21,7 +21,8 @@ test --test_strategy=standalone
 # By default build in C++17 mode using the Homegrown scheduler for parallelism.
 # build --repo_env=CC=g++
 build --cxxopt=-std=c++17
-build --cxxopt=-mcx16 # 16 byte CAS
+# Apply -mcx16 only for x86_64
+build:x86_64 --cxxopt=-mcx16 # use 16 byte CAS
 build --cxxopt=-DHOMEGROWN # use the homegrown scheduler
 build --cxxopt=-DLONG # use 8 byte vertex identifiers
 build --cxxopt=-DAMORTIZEDPD # use amortized_bytepd encoding scheme for compressed graphs
@@ -75,4 +76,4 @@ build:sage --define use_numa=true
 # Disable Bzlmod for every Bazel command
 # Turn it on after we migrate to use bzlmod
-common --enable_bzlmod=False
\ No newline at end of file
+common --enable_bzlmod=False
diff --git a/.bazelversion b/.bazelversion
new file mode 100644
index 0000000..6abaeb2
--- /dev/null
+++ b/.bazelversion
@@ -0,0 +1 @@
+6.2.0
diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml
new file mode 100644
index 0000000..3a18bf1
--- /dev/null
+++ b/.github/workflows/main.yaml
@@ -0,0 +1,43 @@
+name: Bazel Build and Test
+
+on:
+  push:
+    branches: [ main, master ]
+  pull_request:
+    branches: [ main, master ]
+
+jobs:
+  build-and-test:
+    runs-on: ${{ matrix.runner }}
+    strategy:
+      matrix:
+        runner:
+          - ubuntu-24.04      # amd64
+          - ubuntu-24.04-arm  # arm64
+      fail-fast: false
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v4
+
+      - name: Install Bazelisk
+        run: |
+          ARCH=$([[ "${{ matrix.runner }}" == "ubuntu-24.04" ]] && echo "amd64" || echo "arm64")
+          curl -LO "https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-${ARCH}"
+          chmod +x bazelisk-linux-*
+          sudo mv bazelisk-linux-* /usr/local/bin/bazel
+
+      - name: Cache Bazel
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cache/bazel
+            ~/.cache/bazelisk
+          key: bazel-${{ matrix.runner }}-${{ hashFiles('WORKSPACE', '**/BUILD', '**/*.bzl') }}
+          restore-keys: |
+            bazel-${{ matrix.runner }}-
+
+      - name: Run Bazel Build
+        run: |
+          bazel build //clusterers:cluster-in-memory_main
+          bazel build //clusterers:stats-in-memory_main
diff --git a/WORKSPACE b/WORKSPACE
index 9606350..f583bab 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -5,7 +5,7 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
 git_repository(
     name = "parcluster",
-    remote = "git@github.com:ParAlg/InMemoryClusteringAPI.git",
+    remote = "https://github.com/ParAlg/InMemoryClusteringAPI.git",
     commit = "ccf763ad6dcf0e36c3b0b212d429ef19c2cbf837",
     strip_prefix = "include/",
 )
diff --git a/cluster.py b/cluster.py
index c147fb5..9c20768 100755
--- a/cluster.py
+++ b/cluster.py
@@ -1,312 +1,558 @@
 import os
-import sys
 import time
 import runner_utils
 import traceback
+import argparse
 import pandas as pd
 
+
 def write_snap_connectivity(file_path, output_path=None, num_lines=3):
     # Open the file for reading
-    with open(file_path, 'r') as file:
+    with open(file_path, "r") as file:
         lines = file.readlines()
 
     # Remove the first 'num_lines' lines
     remaining_lines = lines[num_lines:]
 
     # Remove the first number before tab for each line
-    remaining_lines = [line.split('\t', 1)[1] for line in remaining_lines]
+    remaining_lines = [line.split("\t", 1)[1] for line in remaining_lines]
 
     # Write the remaining lines to the same file or a new file
     if output_path is None:
         output_path = file_path
-    with open(output_path, 'w') as file:
+    with
open(output_path, "w") as file: file.writelines(remaining_lines) + # Graph must be in edge format -def runSnap(clusterer, graph, graph_idx, round, runtime_dict): - if (runner_utils.gbbs_format == "true"): - raise ValueError("SNAP can only be run using edge list format") - use_input_graph = runner_utils.input_directory + graph - out_prefix = runner_utils.output_directory + clusterer + "_" + str(graph_idx) + "_" + str(round) - out_clustering = out_prefix + ".cluster" - out_filename = out_prefix + ".out" - snap_binary = "community" - args = "" - output_postfix = "" - print("Compiling snap binaries. This might take a while if it's the first time.") - if (clusterer == "SnapGirvanNewman"): - runner_utils.shellGetOutput("(cd external/snap/examples/%s && make all)" % snap_binary) - alg_number = 1 - args = " -a:" + str(alg_number) - elif (clusterer == "SnapInfomap"): - runner_utils.shellGetOutput("(cd external/snap/examples/%s && make all)" % snap_binary) - alg_number = 3 - args = " -a:" + str(alg_number) - elif (clusterer == "SnapCNM"): - runner_utils.shellGetOutput("(cd external/snap/examples/%s && make all)" % snap_binary) - alg_number = 2 - args = " -a:" + str(alg_number) - elif (clusterer == "SnapConnectivity"): - snap_binary = "concomp" - args = " -wcconly:T" - output_postfix = ".wcc.txt" - runner_utils.shellGetOutput("(cd external/snap/examples/%s && make all)" % snap_binary) - elif (clusterer == "SnapKCore"): - snap_binary = "kcores" - args = " -s:F" # Save the k-core network (for every k) (default:'T') - runner_utils.shellGetOutput("(cd external/snap/examples/%s && make all)" % snap_binary) - else: - raise("Clusterer is not implemented.") - print("Compilation done.") - cmds = runner_utils.timeout + " external/snap/examples/%s/%s -i:" % (snap_binary, snap_binary) + use_input_graph + " -o:" + out_clustering + args - # print(cmds) - if runner_utils.postprocess_only != "true": - runner_utils.appendToFile('Snap: \n', out_filename) - runner_utils.appendToFile("Input graph: " + graph + "\n", out_filename) - out_time = runner_utils.shellGetOutput(cmds) - runner_utils.appendToFile(out_time, out_filename) - # postprocess to match our clustering format - if (clusterer == "SnapConnectivity"): - os.rename(out_clustering + output_postfix, out_clustering) - write_snap_connectivity(out_clustering) - print("postprocessing..." + out_filename) - with open(out_filename,'r') as f: - run_info = f.readlines() - for elem in run_info[1:]: - if elem.startswith('Wealy Connected Component Time:') or elem.startswith('KCore Time:') or elem.startswith('Cluster Time:'): - runtime_dict['Cluster Time'] = elem.split(' ')[-1].strip() - - - -def runNeo4j(clusterer, graph, thread, config, weighted, out_prefix, runtime_dict): - if (runner_utils.gbbs_format == "true"): - raise ValueError("Neo4j can only be run using edge list format") - use_input_graph = runner_utils.input_directory + graph - out_clustering = out_prefix + ".cluster" - out_filename = out_prefix + ".out" - alg_name = clusterer[5:] - thread = int(thread) - if runner_utils.postprocess_only != "true": - import cluster_neo4j - out_time = cluster_neo4j.runNeo4j(use_input_graph, graph, alg_name, thread, config, weighted, out_clustering) - runner_utils.appendToFile(out_time, out_filename) - print("postprocessing..." 
+ out_filename)
-    with open(out_filename,'r') as f:
-        run_info = f.readlines()
-        for elem in run_info[1:]:
-            if elem.startswith("Time:"):
-                runtime_dict['Cluster Time'] = elem.split(' ')[-1].strip()
+def runSnap(
+    clusterer, graph, graph_idx, round, runtime_dict, config: runner_utils.ClusterConfig
+):
+    if config.gbbs_format == "true":
+        raise ValueError("SNAP can only be run using edge list format")
+    use_input_graph = config.input_directory + graph
+    out_prefix = (
+        config.output_directory + clusterer + "_" + str(graph_idx) + "_" + str(round)
+    )
+    out_clustering = out_prefix + ".cluster"
+    out_filename = out_prefix + ".out"
+    snap_binary = "community"
+    args = ""
+    output_postfix = ""
+    print("Compiling snap binaries. This might take a while if it's the first time.")
+    if clusterer == "SnapGirvanNewman":
+        runner_utils.shellGetOutput(
+            "(cd external/snap/examples/%s && make all)" % snap_binary
+        )
+        alg_number = 1
+        args = " -a:" + str(alg_number)
+    elif clusterer == "SnapInfomap":
+        runner_utils.shellGetOutput(
+            "(cd external/snap/examples/%s && make all)" % snap_binary
+        )
+        alg_number = 3
+        args = " -a:" + str(alg_number)
+    elif clusterer == "SnapCNM":
+        runner_utils.shellGetOutput(
+            "(cd external/snap/examples/%s && make all)" % snap_binary
+        )
+        alg_number = 2
+        args = " -a:" + str(alg_number)
+    elif clusterer == "SnapConnectivity":
+        snap_binary = "concomp"
+        args = " -wcconly:T"
+        output_postfix = ".wcc.txt"
+        runner_utils.shellGetOutput(
+            "(cd external/snap/examples/%s && make all)" % snap_binary
+        )
+    elif clusterer == "SnapKCore":
+        snap_binary = "kcores"
+        args = " -s:F"  # Save the k-core network (for every k) (default:'T')
+        runner_utils.shellGetOutput(
+            "(cd external/snap/examples/%s && make all)" % snap_binary
+        )
+    else:
+        raise ValueError("Clusterer is not implemented.")
+    print("Compilation done.")
+    cmds = (
+        config.timeout
+        + " external/snap/examples/%s/%s -i:" % (snap_binary, snap_binary)
+        + use_input_graph
+        + " -o:"
+        + out_clustering
+        + args
+    )
+    # print(cmds)
+    if config.postprocess_only != "true":
+        runner_utils.appendToFile("Snap: \n", out_filename)
+        runner_utils.appendToFile("Input graph: " + graph + "\n", out_filename)
+        out_time = runner_utils.shellGetOutput(cmds)
+        runner_utils.appendToFile(out_time, out_filename)
+        # postprocess to match our clustering format
+        if clusterer == "SnapConnectivity":
+            os.rename(out_clustering + output_postfix, out_clustering)
+            write_snap_connectivity(out_clustering)
+    print("postprocessing..." + out_filename)
+    with open(out_filename, "r") as f:
+        run_info = f.readlines()
+        for elem in run_info[1:]:
+            if (
+                elem.startswith("Wealy Connected Component Time:")
+                or elem.startswith("KCore Time:")
+                or elem.startswith("Cluster Time:")
+            ):
+                runtime_dict["Cluster Time"] = elem.split(" ")[-1].strip()
+
+
+def runNeo4j(
+    clusterer,
+    graph,
+    thread,
+    config,
+    weighted,
+    out_prefix,
+    runtime_dict,
+    cluster_config: runner_utils.ClusterConfig,
+):
+    if cluster_config.gbbs_format == "true":
+        raise ValueError("Neo4j can only be run using edge list format")
+    use_input_graph = cluster_config.input_directory + graph
+    out_clustering = out_prefix + ".cluster"
+    out_filename = out_prefix + ".out"
+    alg_name = clusterer[5:]
+    thread = int(thread)
+    if cluster_config.postprocess_only != "true":
+        import cluster_neo4j
+
+        out_time = cluster_neo4j.runNeo4j(
+            use_input_graph, graph, alg_name, thread, config, weighted, out_clustering
+        )
+        runner_utils.appendToFile(out_time, out_filename)
+    print("postprocessing..."
+ out_filename) + with open(out_filename, "r") as f: + run_info = f.readlines() + for elem in run_info[1:]: + if elem.startswith("Time:"): + runtime_dict["Cluster Time"] = elem.split(" ")[-1].strip() + # Graph must be in edge format -def runTectonic(clusterer, graph, thread, config, out_prefix, runtime_dict): - if (runner_utils.gbbs_format == "true"): - raise ValueError("Tectonic can only be run using edge list format") - runner_utils.readSystemConfig(sys.argv[2]) - use_input_graph = runner_utils.input_directory + graph - out_clustering_tmp = out_prefix + ".tmpcluster" - out_clustering = out_prefix + ".cluster" - out_filename = out_prefix + ".out" - runner_utils.shellGetOutput("(cd external/Tectonic/mace && make)") - runner_utils.shellGetOutput("(cd external/Tectonic && make all)") - threshold = "0.06" - # no_pruning = True - split = [x.strip() for x in config.split(',')] - for config_item in split: - config_split = [x.strip() for x in config_item.split(':')] - if config_split: - if config_split[0].startswith("threshold"): - if config_split[1]!="": - threshold = config_split[1] - # elif config_split[0].startswith("no_pruning"): - # no_pruning = True if config_split[1].startswith("True") else False - - if runner_utils.postprocess_only == "true": +def runTectonic( + clusterer, + graph, + thread, + config, + out_prefix, + runtime_dict, + cluster_config: runner_utils.ClusterConfig, + system_config: runner_utils.SystemConfig, +): + if cluster_config.gbbs_format == "true": + raise ValueError("Tectonic can only be run using edge list format") + use_input_graph = cluster_config.input_directory + graph + out_clustering_tmp = out_prefix + ".tmpcluster" + out_clustering = out_prefix + ".cluster" + out_filename = out_prefix + ".out" + runner_utils.shellGetOutput("(cd external/Tectonic/mace && make)") + runner_utils.shellGetOutput("(cd external/Tectonic && make all)") + threshold = "0.06" + # no_pruning = True + split = [x.strip() for x in config.split(",")] + for config_item in split: + config_split = [x.strip() for x in config_item.split(":")] + if config_split: + if config_split[0].startswith("threshold"): + if config_split[1] != "": + threshold = config_split[1] + # elif config_split[0].startswith("no_pruning"): + # no_pruning = True if config_split[1].startswith("True") else False + + if cluster_config.postprocess_only == "true": + print("postprocessing..." + out_filename) + with open(out_filename, "r") as f: + run_info = f.readlines() + for elem in run_info[1:]: + if elem.startswith("Cluster Time:"): + runtime_dict["Cluster Time"] = elem.split(" ")[-1].strip() + else: + # Timing from here + start_time = time.time() + # relabel the graph so the node vertices are consecutive. The result format: each line i is the neighbors of i, and each edge only appear once in the smaller id's line. 
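# A minimal sketch of the adjacency format described in the comment above
# (illustration only; relabel-graph-no-comm.py is the real converter).
# Vertices are renumbered consecutively, and each undirected edge is written
# once, on the line of its smaller endpoint:
from collections import defaultdict

def relabel_to_mace(edges):
    ids = {}
    for u, v in edges:
        for x in (u, v):
            ids.setdefault(x, len(ids))  # consecutive ids in first-seen order
    adj = defaultdict(list)
    for u, v in edges:
        a, b = sorted((ids[u], ids[v]))
        adj[a].append(b)  # each edge stored once, under the smaller id
    return len(ids), [" ".join(map(str, sorted(adj[i]))) for i in range(len(ids))]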
+ num_vert = runner_utils.shellGetOutput( + system_config.python_ver + + " external/Tectonic/relabel-graph-no-comm.py " + + use_input_graph + + " " + + out_prefix + + ".mace" + + " " + + out_prefix + + ".pickle" + ) + num_vert = num_vert.strip() + runner_utils.shellGetOutput( + "external/Tectonic/mace/mace C -l 3 -u 3 " + + out_prefix + + ".mace " + + out_prefix + + ".triangles" + ) + runner_utils.shellGetOutput( + system_config.python_ver + + " external/Tectonic/mace-to-list.py " + + out_prefix + + ".mace " + + out_prefix + + ".edges" + ) + # if (no_pruning): + runner_utils.shellGetOutput( + system_config.python_ver + + " external/Tectonic/weighted-edges-no-mixed.py " + + out_prefix + + ".triangles " + + out_prefix + + ".edges " + + out_prefix + + ".weighted " + + out_prefix + + ".mixed " + + num_vert + ) + cluster = runner_utils.shellGetOutput( + "external/Tectonic/tree-clusters-parameter-no-mixed " + + out_prefix + + ".weighted " + + num_vert + + " " + + threshold + ) + # else: + # runner_utils.shellGetOutput(system_config.python_ver + " external/Tectonic/weighted-edges.py " + out_prefix + ".triangles " + out_prefix + ".edges " + out_prefix + ".weighted " + out_prefix + ".mixed " + num_vert) + # cluster = runner_utils.shellGetOutput("external/Tectonic/tree-clusters-parameter " + out_prefix + ".weighted " + num_vert + " " + threshold) + end_time = time.time() + # Output running time to out_filename + runner_utils.appendToFile(cluster, out_clustering_tmp) + runner_utils.shellGetOutput( + system_config.python_ver + + " external/Tectonic/relabel-clusters.py " + + use_input_graph + + " " + + out_clustering_tmp + + " " + + out_clustering + + " " + + out_prefix + + ".pickle" + ) + runner_utils.appendToFile("Tectonic: \n", out_filename) + runner_utils.appendToFile("Input graph: " + graph + "\n", out_filename) + runner_utils.appendToFile(config + "\n", out_filename) + runner_utils.appendToFile( + "Cluster Time: " + str(end_time - start_time) + "\n", out_filename + ) + runtime_dict["Cluster Time"] = str(end_time - start_time) + + ## remove intermediate files + runner_utils.shellGetOutput("rm " + out_prefix + ".triangles ") + runner_utils.shellGetOutput("rm " + out_prefix + ".mace ") + runner_utils.shellGetOutput("rm " + out_prefix + ".edges ") + runner_utils.shellGetOutput("rm " + out_prefix + ".weighted ") + runner_utils.shellGetOutput("rm " + out_prefix + ".tmpcluster ") + runner_utils.shellGetOutput("rm " + out_prefix + ".pickle") + + +# cd external/Tectonic/ +# cd mace; make +# python2 relabel-graph.py com-amazon.ungraph.txt com-amazon.top5000.cmty.txt amazon.mace amazon.communities +# mace/mace C -l 3 -u 3 amazon.mace amazon.triangles +# python2 mace-to-list.py amazon.mace amazon.edges +# python2 weighted-edges.py amazon.triangles amazon.edges amazon.weighted amazon.mixed $(head -n1 amazon.communities) +# g++-12 -std=c++11 -o tree-clusters tree-clusters.cpp +# ./tree-clusters amazon.weighted 334863 > amazon.clusters +# python2 grade-clusters.py amazon.communities amazon.clusters amazon.grading + + +def run_tigergraph( + conn, + clusterer, + graph, + thread, + config, + weighted, + out_prefix, + runtime_dict, + cluster_config: runner_utils.ClusterConfig, +): + if cluster_config.gbbs_format == "true": + raise ValueError("Tigergraph can only be run using edge list format") + use_input_graph = cluster_config.input_directory + graph + out_clustering = out_prefix + ".cluster" + out_filename = out_prefix + ".out" + if cluster_config.postprocess_only != "true": + import cluster_tg + + out_time = 
cluster_tg.run_tigergraph( + conn, clusterer, out_clustering, thread, config, weighted + ) + runner_utils.appendToFile("Tigergraph: \n", out_filename) + runner_utils.appendToFile("Clusterer: " + clusterer + "\n", out_filename) + runner_utils.appendToFile("Input graph: " + graph + "\n", out_filename) + runner_utils.appendToFile("Threads: " + str(thread) + "\n", out_filename) + runner_utils.appendToFile("Config: " + config + "\n", out_filename) + runner_utils.appendToFile(out_time, out_filename) print("postprocessing..." + out_filename) - with open(out_filename,'r') as f: - run_info = f.readlines() - for elem in run_info[1:]: - if elem.startswith('Cluster Time:'): - runtime_dict['Cluster Time'] = elem.split(' ')[-1].strip() - else: - # Timing from here - start_time = time.time() - # relabel the graph so the node vertices are consecutive. The result format: each line i is the neighbors of i, and each edge only appear once in the smaller id's line. - num_vert = runner_utils.shellGetOutput(runner_utils.python_ver + " external/Tectonic/relabel-graph-no-comm.py " + use_input_graph + " " + out_prefix + ".mace" + " " + out_prefix + ".pickle") - num_vert = num_vert.strip() - runner_utils.shellGetOutput("external/Tectonic/mace/mace C -l 3 -u 3 "+ out_prefix + ".mace " + out_prefix + ".triangles") - runner_utils.shellGetOutput(runner_utils.python_ver + " external/Tectonic/mace-to-list.py " + out_prefix + ".mace " + out_prefix + ".edges") - # if (no_pruning): - runner_utils.shellGetOutput(runner_utils.python_ver + " external/Tectonic/weighted-edges-no-mixed.py " + out_prefix + ".triangles " + out_prefix + ".edges " + out_prefix + ".weighted " + out_prefix + ".mixed " + num_vert) - cluster = runner_utils.shellGetOutput("external/Tectonic/tree-clusters-parameter-no-mixed " + out_prefix + ".weighted " + num_vert + " " + threshold) - # else: - # runner_utils.shellGetOutput(runner_utils.python_ver + " external/Tectonic/weighted-edges.py " + out_prefix + ".triangles " + out_prefix + ".edges " + out_prefix + ".weighted " + out_prefix + ".mixed " + num_vert) - # cluster = runner_utils.shellGetOutput("external/Tectonic/tree-clusters-parameter " + out_prefix + ".weighted " + num_vert + " " + threshold) - end_time = time.time() - # Output running time to out_filename - runner_utils.appendToFile(cluster, out_clustering_tmp) - runner_utils.shellGetOutput(runner_utils.python_ver + " external/Tectonic/relabel-clusters.py " + use_input_graph + " " + out_clustering_tmp + " " + out_clustering + " " + out_prefix + ".pickle") - runner_utils.appendToFile("Tectonic: \n", out_filename) - runner_utils.appendToFile("Input graph: " + graph + "\n", out_filename) - runner_utils.appendToFile(config + "\n", out_filename) - runner_utils.appendToFile("Cluster Time: " + str(end_time - start_time) + "\n", out_filename) - runtime_dict['Cluster Time'] = str(end_time - start_time) - - ## remove intermediate files - runner_utils.shellGetOutput("rm " + out_prefix + ".triangles ") - runner_utils.shellGetOutput("rm " + out_prefix + ".mace ") - runner_utils.shellGetOutput("rm " + out_prefix + ".edges ") - runner_utils.shellGetOutput("rm " + out_prefix + ".weighted ") - runner_utils.shellGetOutput("rm " + out_prefix + ".tmpcluster ") - runner_utils.shellGetOutput("rm " + out_prefix + ".pickle") - -#cd external/Tectonic/ -#cd mace; make -#python2 relabel-graph.py com-amazon.ungraph.txt com-amazon.top5000.cmty.txt amazon.mace amazon.communities -#mace/mace C -l 3 -u 3 amazon.mace amazon.triangles -#python2 mace-to-list.py amazon.mace amazon.edges 
-#python2 weighted-edges.py amazon.triangles amazon.edges amazon.weighted amazon.mixed $(head -n1 amazon.communities) -#g++-12 -std=c++11 -o tree-clusters tree-clusters.cpp -#./tree-clusters amazon.weighted 334863 > amazon.clusters -#python2 grade-clusters.py amazon.communities amazon.clusters amazon.grading - -def run_tigergraph(conn, clusterer, graph, thread, config, weighted, out_prefix, runtime_dict): - if (runner_utils.gbbs_format == "true"): - raise ValueError("Tigergraph can only be run using edge list format") - use_input_graph = runner_utils.input_directory + graph - out_clustering = out_prefix + ".cluster" - out_filename = out_prefix + ".out" - if runner_utils.postprocess_only != "true": - import cluster_tg - out_time = cluster_tg.run_tigergraph(conn, clusterer, out_clustering, thread, config, weighted) - runner_utils.appendToFile("Tigergraph: \n", out_filename) - runner_utils.appendToFile("Clusterer: " + clusterer + "\n", out_filename) - runner_utils.appendToFile("Input graph: " + graph + "\n", out_filename) - runner_utils.appendToFile("Threads: " + str(thread) + "\n", out_filename) - runner_utils.appendToFile("Config: " + config + "\n", out_filename) - runner_utils.appendToFile(out_time, out_filename) - print("postprocessing..." + out_filename) - with open(out_filename,'r') as f: - run_info = f.readlines() - for elem in run_info[1:]: - if elem.startswith('Total Time:'): - runtime_dict['Cluster Time'] = elem.split(' ')[-1].strip() - -def runAll(config_filename): - runner_utils.readConfig(config_filename) - - runtimes = [] - for graph_idx, graph in enumerate(runner_utils.graphs): - if graph == "SKIP": - continue - neo4j_graph_loaded = False - tigergraph_loaded = False - conn = None - for clusterer_idx, clusterer in enumerate(runner_utils.clusterers): - if clusterer == "SKIP": - continue - try: - if clusterer.startswith("Snap"): - if not os.path.exists(runner_utils.output_directory): - os.makedirs(runner_utils.output_directory) - for i in range(runner_utils.num_rounds): - runtime_dict = {} - runtime_dict['Clusterer Name'] = clusterer - runtime_dict["Input Graph"] = graph - runtime_dict["Threads"] = 1 - runtime_dict["Config"] = "" - runtime_dict["Round"] = i - runSnap(clusterer, graph, graph_idx, i, runtime_dict) - runtimes.append(runtime_dict) - continue - for thread_idx, thread in enumerate(runner_utils.num_threads): - configs = runner_utils.clusterer_configs[clusterer_idx] if runner_utils.clusterer_configs[clusterer_idx] is not None else [""] - config_prefix = runner_utils.clusterer_config_names[clusterer_idx] + "{" if runner_utils.clusterer_configs[clusterer_idx] is not None else "" - config_postfix = "}" if runner_utils.clusterer_configs[clusterer_idx] is not None else "" - for config_idx, config in enumerate(configs): - for i in range(runner_utils.num_rounds): - runtime_dict = {} - runtime_dict['Clusterer Name'] = clusterer - runtime_dict["Input Graph"] = graph - runtime_dict["Threads"] = thread - runtime_dict["Config"] = config - runtime_dict["Round"] = i - out_prefix = runner_utils.output_directory + clusterer + "_" + str(graph_idx) + "_" + thread + "_" + str(config_idx) + "_" + str(i) - if not os.path.exists(runner_utils.output_directory): - os.makedirs(runner_utils.output_directory) - if clusterer.startswith("NetworKit"): - import cluster_nk - cluster_nk.runNetworKit(clusterer, graph, thread, config, out_prefix, runtime_dict) - elif clusterer == "Tectonic": - runTectonic(clusterer, graph, thread, config, out_prefix, runtime_dict) - elif clusterer.startswith("Neo4j"): - if 
int(thread) > 4: - print("neo4j only run up to 4 threads") - continue - if (not neo4j_graph_loaded) and (runner_utils.postprocess_only != "true"): - use_input_graph = runner_utils.input_directory + graph - import cluster_neo4j - cluster_neo4j.projectGraph(graph, use_input_graph) - neo4j_graph_loaded = True - weighted = runner_utils.weighted == "true" - runNeo4j(clusterer, graph, thread, config + ', num_rounds: ' + str(i), weighted, out_prefix, runtime_dict) - elif clusterer.startswith("TigerGraph"): - weighted = runner_utils.weighted == "true" - if (not tigergraph_loaded) and (runner_utils.postprocess_only != "true"): - from pyTigerGraph import TigerGraphConnection - import cluster_tg - conn = TigerGraphConnection( - host='http://127.0.0.1', - username='tigergraph', - password='tigergraph', - ) - print("connected") - cluster_tg.remove_tigergraph(conn) - cluster_tg.load_tigergraph(conn, graph, runner_utils.input_directory, runner_utils.output_directory, runner_utils.tigergraph_nodes, runner_utils.tigergraph_edges, weighted) - tigergraph_loaded = True - run_tigergraph(conn, clusterer, graph, thread, config, weighted, out_prefix, runtime_dict) - else: - out_filename = out_prefix + ".out" - out_clustering = out_prefix + ".cluster" - use_thread = "" if (thread == "" or thread == "ALL") else "PARLAY_NUM_THREADS=" + thread - use_input_graph = runner_utils.input_directory + graph - if (runner_utils.gbbs_format == "true" and "ungraph" in graph): - print("warning: use gbbs format is true, but seems like snap format is used from graph file name") - ss = (use_thread + " " + runner_utils.timeout + " bazel run //clusterers:cluster-in-memory_main -- --" - "input_graph=" + use_input_graph + " --is_gbbs_format=" + runner_utils.gbbs_format + " --float_weighted=" + runner_utils.weighted + " --clusterer_name=" + clusterer + " " - "--clusterer_config='" + config_prefix + config + config_postfix + "' " - "--output_clustering=" + out_clustering) - if runner_utils.postprocess_only.lower() != "true": - print(ss) - out = runner_utils.shellGetOutput(ss) - runner_utils.appendToFile(ss + "\n", out_filename) - runner_utils.appendToFile(out, out_filename) - print("postprocessing... 
" + out_filename) - with open(out_filename,'r') as f: - run_info = f.readlines() - for elem in run_info[1:]: - if elem.startswith('Cluster Time:'): - runtime_dict['Cluster Time'] = elem.split(' ')[-1].strip() - runtimes.append(runtime_dict) - except Exception as e: - # Print the stack trace - traceback.print_exc() - if neo4j_graph_loaded: - cluster_neo4j.clearDB(graph) - if tigergraph_loaded: - cluster_tg.remove_tigergraph(conn) - - runtime_dataframe = pd.DataFrame(runtimes) - if not os.path.exists(runner_utils.csv_output_directory): - os.makedirs(runner_utils.csv_output_directory) - runtime_dataframe.to_csv(runner_utils.csv_output_directory + '/runtimes.csv', mode='a', - columns=["Clusterer Name","Input Graph","Threads","Config","Round","Cluster Time"]) + with open(out_filename, "r") as f: + run_info = f.readlines() + for elem in run_info[1:]: + if elem.startswith("Total Time:"): + runtime_dict["Cluster Time"] = elem.split(" ")[-1].strip() + + +def runAll( + config: runner_utils.ClusterConfig, system_config: runner_utils.SystemConfig = None +): + runtimes = [] + for graph_idx, graph in enumerate(config.graphs): + if graph == "SKIP": + continue + neo4j_graph_loaded = False + tigergraph_loaded = False + conn = None + for clusterer_idx, clusterer in enumerate(config.clusterers): + if clusterer == "SKIP": + continue + try: + if clusterer.startswith("Snap"): + if not os.path.exists(config.output_directory): + os.makedirs(config.output_directory) + for i in range(config.num_rounds): + runtime_dict = {} + runtime_dict["Clusterer Name"] = clusterer + runtime_dict["Input Graph"] = graph + runtime_dict["Threads"] = 1 + runtime_dict["Config"] = "" + runtime_dict["Round"] = i + runSnap(clusterer, graph, graph_idx, i, runtime_dict, config) + runtimes.append(runtime_dict) + continue + for thread_idx, thread in enumerate(config.num_threads): + configs = ( + config.clusterer_configs[clusterer_idx] + if config.clusterer_configs[clusterer_idx] is not None + else [""] + ) + config_prefix = ( + config.clusterer_config_names[clusterer_idx] + "{" + if config.clusterer_configs[clusterer_idx] is not None + else "" + ) + config_postfix = ( + "}" + if config.clusterer_configs[clusterer_idx] is not None + else "" + ) + for config_idx, config_item in enumerate(configs): + for i in range(config.num_rounds): + runtime_dict = {} + runtime_dict["Clusterer Name"] = clusterer + runtime_dict["Input Graph"] = graph + runtime_dict["Threads"] = thread + runtime_dict["Config"] = config_item + runtime_dict["Round"] = i + out_prefix = ( + config.output_directory + + clusterer + + "_" + + str(graph_idx) + + "_" + + thread + + "_" + + str(config_idx) + + "_" + + str(i) + ) + if not os.path.exists(config.output_directory): + os.makedirs(config.output_directory) + if clusterer.startswith("NetworKit"): + import cluster_nk + + cluster_nk.runNetworKit( + clusterer, + graph, + thread, + config_item, + out_prefix, + runtime_dict, + config, + ) + elif clusterer == "Tectonic": + runTectonic( + clusterer, + graph, + thread, + config_item, + out_prefix, + runtime_dict, + config, + system_config, + ) + elif clusterer.startswith("Neo4j"): + if int(thread) > 4: + print("neo4j only run up to 4 threads") + continue + if (not neo4j_graph_loaded) and ( + config.postprocess_only != "true" + ): + use_input_graph = config.input_directory + graph + import cluster_neo4j + + cluster_neo4j.projectGraph(graph, use_input_graph) + neo4j_graph_loaded = True + weighted = config.weighted == "true" + runNeo4j( + clusterer, + graph, + thread, + config_item + ", 
num_rounds: " + str(i), + weighted, + out_prefix, + runtime_dict, + config, + ) + elif clusterer.startswith("TigerGraph"): + weighted = config.weighted == "true" + if (not tigergraph_loaded) and ( + config.postprocess_only != "true" + ): + from pyTigerGraph import TigerGraphConnection + import cluster_tg + + conn = TigerGraphConnection( + host="http://127.0.0.1", + username="tigergraph", + password="tigergraph", + ) + print("connected") + cluster_tg.remove_tigergraph(conn) + cluster_tg.load_tigergraph( + conn, + graph, + config.input_directory, + config.output_directory, + config.tigergraph_nodes, + config.tigergraph_edges, + weighted, + ) + tigergraph_loaded = True + run_tigergraph( + conn, + clusterer, + graph, + thread, + config_item, + weighted, + out_prefix, + runtime_dict, + config, + ) + else: + out_filename = out_prefix + ".out" + out_clustering = out_prefix + ".cluster" + use_thread = ( + "" + if (thread == "" or thread == "ALL") + else "PARLAY_NUM_THREADS=" + thread + ) + use_input_graph = config.input_directory + graph + if config.gbbs_format == "true" and "ungraph" in graph: + print( + "warning: use gbbs format is true, but seems like snap format is used from graph file name" + ) + ss = ( + use_thread + + " " + + config.timeout + + " bazel run //clusterers:cluster-in-memory_main -- --" + "input_graph=" + + use_input_graph + + " --is_gbbs_format=" + + config.gbbs_format + + " --float_weighted=" + + config.weighted + + " --clusterer_name=" + + clusterer + + " " + "--clusterer_config='" + + config_prefix + + config_item + + config_postfix + + "' " + "--output_clustering=" + out_clustering + ) + if config.postprocess_only.lower() != "true": + print(ss) + out = runner_utils.shellGetOutput(ss) + runner_utils.appendToFile(ss + "\n", out_filename) + runner_utils.appendToFile(out, out_filename) + print("postprocessing... 
" + out_filename) + with open(out_filename, "r") as f: + run_info = f.readlines() + for elem in run_info[1:]: + if elem.startswith("Cluster Time:"): + runtime_dict["Cluster Time"] = elem.split( + " " + )[-1].strip() + runtimes.append(runtime_dict) + except Exception as e: + # Print the stack trace + traceback.print_exc() + if neo4j_graph_loaded: + import cluster_neo4j + cluster_neo4j.clearDB(graph) + if tigergraph_loaded: + import cluster_tg + + cluster_tg.remove_tigergraph(conn) + + runtime_dataframe = None + runtime_dataframe = pd.DataFrame(runtimes) + if not os.path.exists(config.csv_output_directory): + os.makedirs(config.csv_output_directory) + runtime_dataframe.to_csv( + config.csv_output_directory + "/runtimes.csv", + mode="a", + columns=[ + "Clusterer Name", + "Input Graph", + "Threads", + "Config", + "Round", + #"Cluster Time", + ], + ) def main(): - args = sys.argv[1:] - runAll(args[0]) + parser = argparse.ArgumentParser(description="Run clustering algorithms on graphs") + parser.add_argument("config_file", help="Path to the cluster configuration file") + parser.add_argument( + "--system-config", + help="Path to the system configuration file (required for Tectonic)", + ) + + args = parser.parse_args() + + # Read configurations + config = runner_utils.readConfig(args.config_file) + system_config = None + if args.system_config: + system_config = runner_utils.readSystemConfig(args.system_config) + + runAll(config, system_config) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/cluster_nk.py b/cluster_nk.py index 8585a9e..cf09018 100644 --- a/cluster_nk.py +++ b/cluster_nk.py @@ -6,6 +6,7 @@ import os import sys + def capture_output(func, *args, **kwargs): # Backup the original stdout original_stdout_fd = os.dup(sys.stdout.fileno()) @@ -24,270 +25,321 @@ def capture_output(func, *args, **kwargs): os.close(original_stdout_fd) # Read the captured output from the pipe - with os.fdopen(read_fd, 'r') as pipe: + with os.fdopen(read_fd, "r") as pipe: captured = pipe.read() return captured, result + # Parallel Louvain def runNetworKitPLM(G, config): - use_refine = False - use_gamma = 1.0 - use_par = "balanced" - use_maxIter = 32 - use_turbo = True - use_recurse = True - split = [x.strip() for x in config.split(',')] - for config_item in split: - config_split = [x.strip() for x in config_item.split(':')] - if config_split: - if config_split[0].startswith("refine"): - use_refine = True if config_split[1].lower().startswith("true") else False - elif config_split[0].startswith("gamma"): - use_gamma = float(config_split[1]) - elif config_split[0].startswith("par"): - use_par = config_split[1] - elif config_split[0].startswith("maxIter"): - use_maxIter = int(config_split[1]) - elif config_split[0].startswith("turbo"): - use_turbo = True if config_split[1].lower().startswith("true") else False - elif config_split[0].startswith("recurse"): - use_recurse = True if config_split[1].lower().startswith("true") else False - f = io.StringIO() - with redirect_stdout(f): - print(config) - start_time = time.time() - #Communities detected in 0.76547 [s] - communities = nk.community.detectCommunities(G, algo=nk.community.PLM(G, refine=use_refine, gamma=use_gamma, par=use_par, maxIter=use_maxIter, turbo=use_turbo, recurse=use_recurse)) - end_time = time.time() - print("Communities detected in %f \n" % (end_time - start_time)) - out = f.getvalue() - # str(end_time - start_time) - return out, communities + use_refine = False + use_gamma = 1.0 + use_par = "balanced" + 
use_maxIter = 32 + use_turbo = True + use_recurse = True + split = [x.strip() for x in config.split(",")] + for config_item in split: + config_split = [x.strip() for x in config_item.split(":")] + if config_split: + if config_split[0].startswith("refine"): + use_refine = ( + True if config_split[1].lower().startswith("true") else False + ) + elif config_split[0].startswith("gamma"): + use_gamma = float(config_split[1]) + elif config_split[0].startswith("par"): + use_par = config_split[1] + elif config_split[0].startswith("maxIter"): + use_maxIter = int(config_split[1]) + elif config_split[0].startswith("turbo"): + use_turbo = ( + True if config_split[1].lower().startswith("true") else False + ) + elif config_split[0].startswith("recurse"): + use_recurse = ( + True if config_split[1].lower().startswith("true") else False + ) + f = io.StringIO() + with redirect_stdout(f): + print(config) + start_time = time.time() + # Communities detected in 0.76547 [s] + communities = nk.community.detectCommunities( + G, + algo=nk.community.PLM( + G, + refine=use_refine, + gamma=use_gamma, + par=use_par, + maxIter=use_maxIter, + turbo=use_turbo, + recurse=use_recurse, + ), + ) + end_time = time.time() + print("Communities detected in %f \n" % (end_time - start_time)) + out = f.getvalue() + # str(end_time - start_time) + return out, communities + def runNetworKitPLP(G, config): - use_updateThreshold = None - use_maxIterations = None - split = [x.strip() for x in config.split(',')] - for config_item in split: - config_split = [x.strip() for x in config_item.split(':')] - if config_split: - if config_split[0].startswith("updateThreshold"): - if (config_split[1]!="None"): - use_updateThreshold = int(config_split[1]) - elif config_split[0].startswith("maxIterations"): - use_maxIterations = int(config_split[1]) - kwargs = {} - if use_updateThreshold: - kwargs["updateThreshold"] = use_updateThreshold - if use_maxIterations: - kwargs["maxIterations"] = use_maxIterations - print("running NetworKitPLP...") - start_time = time.time() - out, communities = capture_output(nk.community.detectCommunities, G, algo=nk.community.PLP(G, baseClustering=None, **kwargs)) - end_time = time.time() - out += "\nCommunities detected in %f \n" % (end_time - start_time) - return out, communities + use_updateThreshold = None + use_maxIterations = None + split = [x.strip() for x in config.split(",")] + for config_item in split: + config_split = [x.strip() for x in config_item.split(":")] + if config_split: + if config_split[0].startswith("updateThreshold"): + if config_split[1] != "None": + use_updateThreshold = int(config_split[1]) + elif config_split[0].startswith("maxIterations"): + use_maxIterations = int(config_split[1]) + kwargs = {} + if use_updateThreshold: + kwargs["updateThreshold"] = use_updateThreshold + if use_maxIterations: + kwargs["maxIterations"] = use_maxIterations + print("running NetworKitPLP...") + start_time = time.time() + out, communities = capture_output( + nk.community.detectCommunities, + G, + algo=nk.community.PLP(G, baseClustering=None, **kwargs), + ) + end_time = time.time() + out += "\nCommunities detected in %f \n" % (end_time - start_time) + return out, communities def runNetworKitLPDegreeOrdered(G, config): - print("running NetworKitLPDegreeOrdered...") - start_time = time.time() - out, communities = capture_output(nk.community.detectCommunities, G, algo=nk.community.LPDegreeOrdered(G)) - end_time = time.time() - out += "\nCommunities detected in %f \n" % (end_time - start_time) - return out, communities 
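# The NetworKit runners above and below share an ad-hoc "key1: v1, key2: v2"
# config convention; this compact equivalent of their repeated
# split(",") / split(":") blocks is illustrative only (parse_config is not
# part of runner_utils):
def parse_config(config):
    out = {}
    for item in config.split(","):
        key, _, value = item.strip().partition(":")
        if key:
            out[key.strip()] = value.strip()
    return out

# parse_config("refine: true, gamma: 1.5") -> {"refine": "true", "gamma": "1.5"}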
+ print("running NetworKitLPDegreeOrdered...") + start_time = time.time() + out, communities = capture_output( + nk.community.detectCommunities, G, algo=nk.community.LPDegreeOrdered(G) + ) + end_time = time.time() + out += "\nCommunities detected in %f \n" % (end_time - start_time) + return out, communities def runNetworKitParallelLeiden(G, config): - use_randomize = True - use_gamma = 1.0 - use_iterations = 3 - split = [x.strip() for x in config.split(',')] - for config_item in split: - config_split = [x.strip() for x in config_item.split(':')] - if config_split: - if config_split[0].startswith("randomize"): - use_randomize = True if config_split[1].lower().startswith("true") else False - elif config_split[0].startswith("iterations"): - use_iterations = int(config_split[1]) - elif config_split[0].startswith("gamma"): - use_gamma = float(config_split[1]) - # print(use_randomize, use_iterations, use_gamma) - f = io.StringIO() - with redirect_stdout(f): - print(config) - start_time = time.time() - communities = nk.community.detectCommunities(G, algo=nk.community.ParallelLeiden(G, randomize=use_randomize, iterations=use_iterations, gamma=use_gamma)) - end_time = time.time() - print("Communities detected in %f \n" % (end_time - start_time)) - out = f.getvalue() - return out, communities + use_randomize = True + use_gamma = 1.0 + use_iterations = 3 + split = [x.strip() for x in config.split(",")] + for config_item in split: + config_split = [x.strip() for x in config_item.split(":")] + if config_split: + if config_split[0].startswith("randomize"): + use_randomize = ( + True if config_split[1].lower().startswith("true") else False + ) + elif config_split[0].startswith("iterations"): + use_iterations = int(config_split[1]) + elif config_split[0].startswith("gamma"): + use_gamma = float(config_split[1]) + # print(use_randomize, use_iterations, use_gamma) + f = io.StringIO() + with redirect_stdout(f): + print(config) + start_time = time.time() + communities = nk.community.detectCommunities( + G, + algo=nk.community.ParallelLeiden( + G, randomize=use_randomize, iterations=use_iterations, gamma=use_gamma + ), + ) + end_time = time.time() + print("Communities detected in %f \n" % (end_time - start_time)) + out = f.getvalue() + return out, communities + def runNetworKitConnectivity(G, config): - f = io.StringIO() - if (G.isDirected()): - raise ValueError("NetworkIt Connected Components can only run for undirected graphs.") - with redirect_stdout(f): - print(config) - start_time = time.time() - # returns type List[List[int]], each nested list is a cluster, i.e. conencted component - cc = nk.components.ParallelConnectedComponents(G, False) - cc.run() - clusters = cc.getComponents() - end_time = time.time() - print("Communities detected in %f \n" % (end_time - start_time)) - out = f.getvalue() - return out, clusters + f = io.StringIO() + if G.isDirected(): + raise ValueError( + "NetworkIt Connected Components can only run for undirected graphs." + ) + with redirect_stdout(f): + print(config) + start_time = time.time() + # returns type List[List[int]], each nested list is a cluster, i.e. conencted component + cc = nk.components.ParallelConnectedComponents(G, False) + cc.run() + clusters = cc.getComponents() + end_time = time.time() + print("Communities detected in %f \n" % (end_time - start_time)) + out = f.getvalue() + return out, clusters def runNetworKitKCore(G, config): - # The graph may not contain self-loops. 
- f = io.StringIO() - k = 0 - split = [x.strip() for x in config.split(',')] - for config_item in split: - config_split = [x.strip() for x in config_item.split(':')] - if config_split: - if config_split[0] == "threshold": - k = int(config_split[1]) - if k == 0: - raise RuntimeError("k must be set.") - run_connectivity = True - clusters = [] - with redirect_stdout(f): - start_time = time.time() - coreDec = nk.centrality.CoreDecomposition(G) - coreDec.run() - if run_connectivity: - cores = coreDec.scores() - kCore = [] - other_nodes = [] - try: - for index, score in enumerate(cores): - if score >= k: - kCore.append(index) - else: - other_nodes.append(index) - except IndexError: - raise RuntimeError("There is no core for the specified k") + # The graph may not contain self-loops. + f = io.StringIO() + k = 0 + split = [x.strip() for x in config.split(",")] + for config_item in split: + config_split = [x.strip() for x in config_item.split(":")] + if config_split: + if config_split[0] == "threshold": + k = int(config_split[1]) + if k == 0: + raise RuntimeError("k must be set.") + run_connectivity = True + clusters = [] + with redirect_stdout(f): + start_time = time.time() + coreDec = nk.centrality.CoreDecomposition(G) + coreDec.run() + if run_connectivity: + cores = coreDec.scores() + kCore = [] + other_nodes = [] + try: + for index, score in enumerate(cores): + if score >= k: + kCore.append(index) + else: + other_nodes.append(index) + except IndexError: + raise RuntimeError("There is no core for the specified k") - C = nk.graphtools.subgraphFromNodes(G, kCore) - cc = nk.components.ParallelConnectedComponents(C, False) - cc.run() - clusters = cc.getComponents() - # nodes that are not in the core are in their separate connected component. - for i in other_nodes: - clusters.append([i]) - end_time = time.time() - print("Communities detected in %f \n" % (end_time - start_time)) - out = f.getvalue() - return out, clusters + C = nk.graphtools.subgraphFromNodes(G, kCore) + cc = nk.components.ParallelConnectedComponents(C, False) + cc.run() + clusters = cc.getComponents() + # nodes that are not in the core are in their separate connected component. 
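# Continuing the peeling sketch above: on a toy 4-cycle with pendant vertex 4
# and threshold k = 2, the pendant has core number 1 and ends up a singleton,
# matching the clusters this function emits (the toy core is one connected
# component, so the components step is a no-op here).
adj = {0: {1, 3, 4}, 1: {0, 2}, 2: {1, 3}, 3: {0, 2}, 4: {0}}
cores = core_numbers(adj)                    # {0: 2, 1: 2, 2: 2, 3: 2, 4: 1}
k_core = [v for v in adj if cores[v] >= 2]   # [0, 1, 2, 3]
clusters = [k_core] + [[v] for v in adj if cores[v] < 2]
# -> [[0, 1, 2, 3], [4]]: one tab-joined line per cluster in the .cluster file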
+ for i in other_nodes: + clusters.append([i]) + end_time = time.time() + print("Communities detected in %f \n" % (end_time - start_time)) + out = f.getvalue() + return out, clusters def extractNetworKitTime(out): - split = [x.strip() for x in out.split('\n')] - for line in split: - if line.startswith("Communities detected in"): - line_split = [x.strip() for x in line.split(' ')] - return line_split[3] - return "" + split = [x.strip() for x in out.split("\n")] + for line in split: + if line.startswith("Communities detected in"): + line_split = [x.strip() for x in line.split(" ")] + return line_split[3] + return "" + def is_bin_extension(filename): - return os.path.splitext(filename)[1].lower() == '.bin' + return os.path.splitext(filename)[1].lower() == ".bin" -def runNetworKit(clusterer, graph, thread, config, out_prefix, runtime_dict): - if (runner_utils.gbbs_format == "true"): - raise ValueError("NetworKit can only be run using edge list format") - out_filename = out_prefix + ".out" - out_clustering = out_prefix + ".cluster" - use_input_graph = runner_utils.input_directory + graph - # if(not (use_input_graph.endswith("ungraph.txt") or use_input_graph.endswith("ngraph.txt"))): - # raise ValueError("input graph file name must ends with ungraph.txt or ngraph.txt") - # G = nk.readGraph(use_input_graph, nk.Format.EdgeListTabZero) - if runner_utils.postprocess_only != "true": - start_time = time.time() - G = None - if(is_bin_extension(use_input_graph)): - G = nk.readGraph(use_input_graph, nk.Format.NetworkitBinary) - else: - reader = nk.graphio.EdgeListReader('\t', 0, commentPrefix='#', directed=False) #continuous=False, - G = reader.read(use_input_graph) - end_time = time.time() - print("Read Graph in %f \n" % (end_time - start_time)) - # print([edge for edge in G.iterEdgesWeights()]) - if (thread != "" and thread != "ALL"): - nk.setNumberOfThreads(int(thread)) - # This is k-core with a thresholding argument (double-check) - #nk.community.kCoreCommunityDetection(G, k, algo=None, inspect=False) - cluster_flag = False - if (clusterer == "NetworKitPLM"): - print_time, communities = runNetworKitPLM(G, config) - elif (clusterer == "NetworKitPLP"): - print_time, communities = runNetworKitPLP(G, config) - elif (clusterer == "NetworKitLPDegreeOrdered"): - print_time, communities = runNetworKitLPDegreeOrdered(G, config) - elif (clusterer == "NetworKitParallelLeiden"): - print_time, communities = runNetworKitParallelLeiden(G, config) - elif (clusterer == "NetworKitConnectivity"): - cluster_flag = True - print_time, clusters = runNetworKitConnectivity(G, config) - elif (clusterer == "NetworKitKCore"): - cluster_flag = True - print_time, clusters = runNetworKitKCore(G, config) - else: - raise ValueError("NetworKit clusterer not supported") - runner_utils.appendToFile('NetworKit: \n', out_filename) - runner_utils.appendToFile('Graph: ' + graph + '\n', out_filename) - runner_utils.appendToFile('Clusterer: ' + clusterer + '\n', out_filename) - runner_utils.appendToFile('Threads: ' + thread + '\n', out_filename) - runner_utils.appendToFile('Config: ' + config + '\n', out_filename) - runner_utils.appendToFile(print_time, out_filename) - runner_utils.appendToFile("Cluster Time: " + extractNetworKitTime(print_time) + "\n", out_filename) - # Create an empty list to hold all the lines you want to write to the file - if runner_utils.write_clustering != "false": - print("writing results...") - start_time = time.time() - lines_to_write = [] +def runNetworKit( + clusterer, + graph, + thread, + config, + out_prefix, + 
runtime_dict, + cluster_config: "runner_utils.ClusterConfig", +): + if cluster_config.gbbs_format == "true": + raise ValueError("NetworKit can only be run using edge list format") + out_filename = out_prefix + ".out" + out_clustering = out_prefix + ".cluster" + use_input_graph = cluster_config.input_directory + graph + # if(not (use_input_graph.endswith("ungraph.txt") or use_input_graph.endswith("ngraph.txt"))): + # raise ValueError("input graph file name must ends with ungraph.txt or ngraph.txt") + # G = nk.readGraph(use_input_graph, nk.Format.EdgeListTabZero) + if cluster_config.postprocess_only != "true": + start_time = time.time() + G = None + if is_bin_extension(use_input_graph): + G = nk.readGraph(use_input_graph, nk.Format.NetworkitBinary) + else: + reader = nk.graphio.EdgeListReader( + "\t", 0, commentPrefix="#", directed=False + ) # continuous=False, + G = reader.read(use_input_graph) + end_time = time.time() + print("Read Graph in %f \n" % (end_time - start_time)) + # print([edge for edge in G.iterEdgesWeights()]) + if thread != "" and thread != "ALL": + nk.setNumberOfThreads(int(thread)) + # This is k-core with a thresholding argument (double-check) + # nk.community.kCoreCommunityDetection(G, k, algo=None, inspect=False) + cluster_flag = False + if clusterer == "NetworKitPLM": + print_time, communities = runNetworKitPLM(G, config) + elif clusterer == "NetworKitPLP": + print_time, communities = runNetworKitPLP(G, config) + elif clusterer == "NetworKitLPDegreeOrdered": + print_time, communities = runNetworKitLPDegreeOrdered(G, config) + elif clusterer == "NetworKitParallelLeiden": + print_time, communities = runNetworKitParallelLeiden(G, config) + elif clusterer == "NetworKitConnectivity": + cluster_flag = True + print_time, clusters = runNetworKitConnectivity(G, config) + elif clusterer == "NetworKitKCore": + cluster_flag = True + print_time, clusters = runNetworKitKCore(G, config) + else: + raise ValueError("NetworKit clusterer not supported") + runner_utils.appendToFile("NetworKit: \n", out_filename) + runner_utils.appendToFile("Graph: " + graph + "\n", out_filename) + runner_utils.appendToFile("Clusterer: " + clusterer + "\n", out_filename) + runner_utils.appendToFile("Threads: " + thread + "\n", out_filename) + runner_utils.appendToFile("Config: " + config + "\n", out_filename) + runner_utils.appendToFile(print_time, out_filename) + runner_utils.appendToFile( + "Cluster Time: " + extractNetworKitTime(print_time) + "\n", out_filename + ) + + # Create an empty list to hold all the lines you want to write to the file + if cluster_config.write_clustering != "false": + print("writing results...") + start_time = time.time() + lines_to_write = [] + + if not cluster_flag: + use_original_networkit = True + if use_original_networkit: + communities.compact() # Change subset IDs to be consecutive, starting at 0. 
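# Self-contained toy run of the Partition calls used in this loop (assumes
# networkit is installed): compact() renumbers subset ids densely, so
# getMembers(cid) is valid for every cid in range(numberOfSubsets()).
import networkit as nk

G = nk.Graph(5)
for u, v in [(0, 1), (1, 2), (3, 4)]:
    G.addEdge(u, v)
parts = nk.community.detectCommunities(G, algo=nk.community.PLM(G))
parts.compact()
for cid in range(parts.numberOfSubsets()):
    print("\t".join(str(x) for x in sorted(parts.getMembers(cid))))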
+ num_clusters = communities.numberOfSubsets() + # cluster_index = 0 + for cluster_index in range(num_clusters): + cluster_list = communities.getMembers(cluster_index) + lines_to_write.append("\t".join(str(x) for x in cluster_list)) + # if cluster_index % 500 == 0: + # progress_percentage = (cluster_index + 1) * 100.0 / num_clusters + # print(f"Processing: {progress_percentage:.2f}% done") + cluster_index += 1 + # Write all lines to the file at once + # with open(out_clustering, 'a+') as file: + # file.write('\n'.join(lines_to_write) + '\n') + else: + # cluster_lists = communities.getSubsets() + # for cluster_list in cluster_lists: + # lines_to_write.append("\t".join(str(x) for x in cluster_list)) + nk.community.writeCommunitiesNestedFormat( + communities, out_clustering + ) + else: + for cluster_list in clusters: + lines_to_write.append("\t".join(str(x) for x in cluster_list)) - if not cluster_flag: - use_original_networkit = True - if use_original_networkit: - communities.compact() # Change subset IDs to be consecutive, starting at 0. - num_clusters = communities.numberOfSubsets() - # cluster_index = 0 - for cluster_index in range(num_clusters): - cluster_list = communities.getMembers(cluster_index) - lines_to_write.append("\t".join(str(x) for x in cluster_list)) - # if cluster_index % 500 == 0: - # progress_percentage = (cluster_index + 1) * 100.0 / num_clusters - # print(f"Processing: {progress_percentage:.2f}% done") - cluster_index += 1 # Write all lines to the file at once - # with open(out_clustering, 'a+') as file: - # file.write('\n'.join(lines_to_write) + '\n') - else: - # cluster_lists = communities.getSubsets() - # for cluster_list in cluster_lists: - # lines_to_write.append("\t".join(str(x) for x in cluster_list)) - nk.community.writeCommunitiesNestedFormat(communities, out_clustering) - else: - for cluster_list in clusters: - lines_to_write.append("\t".join(str(x) for x in cluster_list)) + with open(out_clustering, "a+") as file: + file.write("\n".join(lines_to_write) + "\n") + end_time = time.time() + print("Wrote result in %f \n" % (end_time - start_time)) - # Write all lines to the file at once - with open(out_clustering, 'a+') as file: - file.write('\n'.join(lines_to_write) + '\n') - end_time = time.time() - print("Wrote result in %f \n" % (end_time - start_time)) - - print("postprocessing..." + out_filename) - try: - with open(out_filename,'r') as f: - run_info = f.readlines() - for elem in run_info[1:]: - if elem.startswith('Cluster Time:'): - runtime_dict['Cluster Time'] = elem.split(' ')[-1].strip() - except Exception as e: - print(f"An error occurred: {e}") \ No newline at end of file + print("postprocessing..." 
+ out_filename) + try: + with open(out_filename, "r") as f: + run_info = f.readlines() + for elem in run_info[1:]: + if elem.startswith("Cluster Time:"): + runtime_dict["Cluster Time"] = elem.split(" ")[-1].strip() + except Exception as e: + print(f"An error occurred: {e}") diff --git a/clusterers/stats/stats_utils.h b/clusterers/stats/stats_utils.h index 9cd7d83..5f80aa0 100644 --- a/clusterers/stats/stats_utils.h +++ b/clusterers/stats/stats_utils.h @@ -97,12 +97,11 @@ inline gbbs::symmetric_graph get_subgraph(const Gbb auto subgraph_edges = parlay::pack(edges, flags); + auto num = n; if(keep_ids){ - return gbbs::sym_graph_from_edges(subgraph_edges, graph_.Graph()->n); - }else{ - return gbbs::sym_graph_from_edges(subgraph_edges, n); + num = graph_.Graph()->n; } - + return gbbs::symmetric_graph::from_edges(subgraph_edges, num); } // return the number of edges in a subgraph that has the vertices in V diff --git a/runner_utils.py b/runner_utils.py index 9ffc1f5..f4771b0 100644 --- a/runner_utils.py +++ b/runner_utils.py @@ -1,202 +1,317 @@ -import os import sys import signal -import time import subprocess -import re import itertools +from dataclasses import dataclass +from typing import List, Optional -def signal_handler(signal,frame): - print("bye\n") - sys.exit(0) -signal.signal(signal.SIGINT,signal_handler) -def shellGetOutput(str1) : - process = subprocess.Popen(str1,shell=True,stdout=subprocess.PIPE, - stderr=subprocess.PIPE, universal_newlines=True) - output, err = process.communicate() - - if (len(err) > 0): - print(str1+"\n"+output+err) - return output +def signal_handler(signal, frame): + print("bye\n") + sys.exit(0) + + +signal.signal(signal.SIGINT, signal_handler) + + +@dataclass +class SystemConfig: + """Configuration for system tools""" + + gplusplus_ver: str = "" + python_ver: str = "" + + +@dataclass +class ClusterConfig: + """Configuration for clustering runs""" + + input_directory: str = "" + output_directory: str = "" + csv_output_directory: str = "" + clusterers: List[str] = None + graphs: List[str] = None + num_threads: List[str] = None + clusterer_configs: List[List[str]] = None + clusterer_config_names: List[str] = None + num_rounds: int = 1 + timeout: str = "" + gbbs_format: str = "false" + weighted: str = "false" + tigergraph_edges: Optional[str] = None + tigergraph_nodes: Optional[str] = None + postprocess_only: str = "false" + write_clustering: str = "true" + + def __post_init__(self): + if self.clusterers is None: + self.clusterers = [] + if self.graphs is None: + self.graphs = [] + if self.num_threads is None: + self.num_threads = ["ALL"] + if self.clusterer_configs is None: + self.clusterer_configs = [] + if self.clusterer_config_names is None: + self.clusterer_config_names = [] + + +@dataclass +class StatsConfig: + """Configuration for statistics computation""" + + communities: List[str] = None + stats_config: str = "" + deterministic: str = "false" + + def __post_init__(self): + if self.communities is None: + self.communities = [] + + +@dataclass +class GraphConfig: + """Configuration for graph plotting""" + + x_axis: List[str] = None + x_axis_index: List[int] = None + x_axis_modifier: List[str] = None + y_axis: List[str] = None + y_axis_index: List[int] = None + y_axis_modifier: List[str] = None + legend: List[str] = None + output_graph_filename: List[str] = None + + def __post_init__(self): + for attr in [ + "x_axis", + "x_axis_index", + "x_axis_modifier", + "y_axis", + "y_axis_index", + "y_axis_modifier", + "legend", + "output_graph_filename", + ]: + if 
getattr(self, attr) is None: + setattr(self, attr, []) + + +def shellGetOutput(str1): + process = subprocess.Popen( + str1, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + output, err = process.communicate() + + if len(err) > 0: + print(str1 + "\n" + output + err) + return output + def appendToFile(out, filename): - with open(filename, "a+") as out_file: - out_file.writelines(out) + with open(filename, "a+") as out_file: + out_file.writelines(out) + def makeConfigCombos(current_configs): - config_combos = itertools.product(*current_configs) - config_combos_formatted = [] - for config in config_combos: - config_combos_formatted.append(",".join(config)) - return config_combos_formatted + config_combos = itertools.product(*current_configs) + config_combos_formatted = [] + for config in config_combos: + config_combos_formatted.append(",".join(config)) + return config_combos_formatted + def makeConfigCombosModularity(current_configs): - config_combos = itertools.product(*current_configs) - config_combos_formatted = [] - for config in config_combos: - config_txt = "" - other_configs = [] - for config_item in config: - if (config_item.startswith("resolution")): - config_txt += config_item - else: - other_configs.append(config_item) - config_txt += ", correlation_config: {" - config_txt += ",".join(other_configs) - config_txt += "}" - config_combos_formatted.append(config_txt) - # for i in config_combos_formatted: - # print(i) - # exit(1) - return config_combos_formatted - -def readSystemConfig(filename): - global gplusplus_ver, python_ver - with open(filename, "r") as in_file: - for line in in_file: - line = line.strip() - split = [x.strip() for x in line.split(':')] - if split: - if split[0].startswith("g++"): - gplusplus_ver = split[1] - elif split[0].startswith("Python"): - python_ver = split[1] - -def readConfig(filename): - global input_directory, output_directory, csv_output_directory, clusterers, graphs, num_threads - global clusterer_configs, num_rounds, timeout, clusterer_config_names - global gbbs_format - global weighted - global tigergraph_edges, tigergraph_nodes - global postprocess_only, write_clustering - num_threads = num_rounds = timeout = gbbs_format = weighted = tigergraph_edges = tigergraph_nodes = None - postprocess_only = "false" - write_clustering = "true" - clusterers = [] - with open(filename, "r") as in_file: - for line in in_file: - line = line.strip() - split = [x.strip() for x in line.split(':')] - if split: - if split[0].startswith("Input directory"): - input_directory = split[1] - elif split[0].startswith("Output directory"): - output_directory = split[1] - elif split[0].startswith("CSV output directory"): - csv_output_directory = split[1] - elif split[0].startswith("Clusterers"): - clusterers = [x.strip() for x in split[1].split(';')] - clusterer_configs = len(clusterers)*[None] - clusterer_config_names = len(clusterers)*[None] - elif split[0].startswith("Graphs"): - graphs = [x.strip() for x in split[1].split(';')] - elif split[0].startswith("Number of threads") and len(split) > 1: - num_threads = [x.strip() for x in split[1].split(';')] - elif split[0].startswith("Number of rounds") and len(split) > 1: - num_rounds = 1 if split[1] == "" else int(split[1]) - elif split[0].startswith("Timeout") and len(split) > 1: - timeout = split[1] - elif split[0].startswith("GBBS format") and len(split) > 1: - gbbs_format = split[1] - elif split[0].startswith("TigerGraph files") and len(split) > 1: - tigergraph_files = 
 def makeConfigCombosModularity(current_configs):
-  config_combos = itertools.product(*current_configs)
-  config_combos_formatted = []
-  for config in config_combos:
-    config_txt = ""
-    other_configs = []
-    for config_item in config:
-      if (config_item.startswith("resolution")):
-        config_txt += config_item
-      else:
-        other_configs.append(config_item)
-    config_txt += ", correlation_config: {"
-    config_txt += ",".join(other_configs)
-    config_txt += "}"
-    config_combos_formatted.append(config_txt)
-  # for i in config_combos_formatted:
-  #   print(i)
-  # exit(1)
-  return config_combos_formatted
-
-def readSystemConfig(filename):
-  global gplusplus_ver, python_ver
-  with open(filename, "r") as in_file:
-    for line in in_file:
-      line = line.strip()
-      split = [x.strip() for x in line.split(':')]
-      if split:
-        if split[0].startswith("g++"):
-          gplusplus_ver = split[1]
-        elif split[0].startswith("Python"):
-          python_ver = split[1]
-
-def readConfig(filename):
-  global input_directory, output_directory, csv_output_directory, clusterers, graphs, num_threads
-  global clusterer_configs, num_rounds, timeout, clusterer_config_names
-  global gbbs_format
-  global weighted
-  global tigergraph_edges, tigergraph_nodes
-  global postprocess_only, write_clustering
-  num_threads = num_rounds = timeout = gbbs_format = weighted = tigergraph_edges = tigergraph_nodes = None
-  postprocess_only = "false"
-  write_clustering = "true"
-  clusterers = []
-  with open(filename, "r") as in_file:
-    for line in in_file:
-      line = line.strip()
-      split = [x.strip() for x in line.split(':')]
-      if split:
-        if split[0].startswith("Input directory"):
-          input_directory = split[1]
-        elif split[0].startswith("Output directory"):
-          output_directory = split[1]
-        elif split[0].startswith("CSV output directory"):
-          csv_output_directory = split[1]
-        elif split[0].startswith("Clusterers"):
-          clusterers = [x.strip() for x in split[1].split(';')]
-          clusterer_configs = len(clusterers)*[None]
-          clusterer_config_names = len(clusterers)*[None]
-        elif split[0].startswith("Graphs"):
-          graphs = [x.strip() for x in split[1].split(';')]
-        elif split[0].startswith("Number of threads") and len(split) > 1:
-          num_threads = [x.strip() for x in split[1].split(';')]
-        elif split[0].startswith("Number of rounds") and len(split) > 1:
-          num_rounds = 1 if split[1] == "" else int(split[1])
-        elif split[0].startswith("Timeout") and len(split) > 1:
-          timeout = split[1]
-        elif split[0].startswith("GBBS format") and len(split) > 1:
-          gbbs_format = split[1]
-        elif split[0].startswith("TigerGraph files") and len(split) > 1:
-          tigergraph_files = [x.strip() for x in split[1].split(';')]
-          tigergraph_edges = tigergraph_files[0]
-          tigergraph_nodes = tigergraph_files[1]
-        elif split[0].startswith("TigerGraph nodes") and len(split) > 1:
-          tigergraph_nodes = [x.strip() for x in split[1].split(';')]
-        elif split[0].startswith("TigerGraph edges") and len(split) > 1:
-          tigergraph_edges = [x.strip() for x in split[1].split(';')]
-        elif split[0].startswith("Wighted") and len(split) > 1:
-          weighted = split[1]
-        elif split[0].startswith("Postprocess only"):
-          postprocess_only = split[1]
-        elif split[0].startswith("Write clustering"):
-          write_clustering = split[1]
-        else:
-          for index, clusterer_name in enumerate(clusterers):
-            if split[0] == clusterer_name:
-              clusterer_config_names[index] = in_file.readline().strip()
-              current_configs = []
-              next_line = in_file.readline().strip()
-              while next_line != "":
-                arg_name = next_line.split(':', 1)
-                arg_name[0] = arg_name[0].strip()
-                args = [x.strip() for x in arg_name[1].split(';')]
-                current_configs.append([arg_name[0] + ": " + x for x in args])
-                try:
-                  next_line = in_file.readline().strip()
-                except StopIteration as err:
-                  break
-              if (clusterer_name == "ParallelModularityClusterer"):
-                clusterer_configs[index] = makeConfigCombosModularity(current_configs)
-              else:
-                clusterer_configs[index] = makeConfigCombos(current_configs)
-              break
-  num_threads = ["ALL"] if num_threads is None or not num_threads else num_threads
-  timeout = "" if (timeout is None or timeout == "" or timeout == "NONE") else "timeout " + timeout
-  num_rounds = 1 if (num_rounds is None) else num_rounds
-  gbbs_format = "false" if (gbbs_format is None or gbbs_format == "") else gbbs_format
-  weighted = "false" if (weighted is None or weighted == "") else weighted
-
-
-def readStatsConfig(filename):
-  global communities, stats_config, deterministic
-  communities = []
-  stats_config_list = []
-  stats_config = ""
-  deterministic = "false"
-  with open(filename, "r") as in_file:
-    for line in in_file:
-      line = line.strip()
-      split = [x.strip() for x in line.split(':')]
-      if split:
-        if split[0].startswith("Input communities") and len(split) > 1:
-          communities = [x.strip() for x in split[1].split(';')]
-        elif split[0].startswith("Deterministic") and len(split) > 1:
-          deterministic = split[1]
-        elif split[0].startswith("statistics_config"):
-          next_line = in_file.readline().strip()
-          while next_line != "":
-            stats_config_list.append(next_line)
-            try:
-              next_line = in_file.readline().strip()
-            except StopIteration as err:
-              break
-          stats_config = ",".join(stats_config_list)
-
-def readGraphConfig(filename):
-  global x_axis, x_axis_index, x_axis_modifier
-  global y_axis, y_axis_index, y_axis_modifier
-  global legend, output_graph_filename
-  x_axis = []
-  x_axis_index = []
-  x_axis_modifier = []
-  y_axis = []
-  y_axis_index = []
-  y_axis_modifier = []
-  legend = []
-  output_graph_filename = []
-  with open(filename, "r") as in_file:
-    for line in in_file:
-      line = line.strip()
-      split = [x.strip() for x in line.split(':')]
-      if split:
-        if split[0].startswith("x axis"):
-          split = [x.strip() for x in split[1].split(' ')]
-          x_axis.append(split[0])
-          if len(split) > 1:
-            x_axis_modifier.append(split[1])
-          else:
-            x_axis_modifier.append("")
-        elif split[0].startswith("y axis"):
-          split = [x.strip() for x in split[1].split(' ')]
-          y_axis.append(split[0])
-          if len(split) > 1:
-            y_axis_modifier.append(split[1])
-          else:
-            y_axis_modifier.append("")
-        elif split[0].startswith("Legend"):
-          legend.append(split[1])
-        elif split[0].startswith("Graph filename"):
filename"): - output_graph_filename.append(split[1]) \ No newline at end of file + config_combos = itertools.product(*current_configs) + config_combos_formatted = [] + for config in config_combos: + config_txt = "" + other_configs = [] + for config_item in config: + if config_item.startswith("resolution"): + config_txt += config_item + else: + other_configs.append(config_item) + config_txt += ", correlation_config: {" + config_txt += ",".join(other_configs) + config_txt += "}" + config_combos_formatted.append(config_txt) + # for i in config_combos_formatted: + # print(i) + # exit(1) + return config_combos_formatted + + +def readSystemConfig(filename: str) -> SystemConfig: + """Read system configuration from file""" + config = SystemConfig() + with open(filename, "r") as in_file: + for line in in_file: + line = line.strip() + split = [x.strip() for x in line.split(":")] + if split: + if split[0].startswith("g++"): + config.gplusplus_ver = split[1] + elif split[0].startswith("Python"): + config.python_ver = split[1] + return config + + +def readConfig(filename: str) -> ClusterConfig: + """Read cluster configuration from file""" + config = ClusterConfig() + + with open(filename, "r") as in_file: + for line in in_file: + line = line.strip() + split = [x.strip() for x in line.split(":")] + if split: + if split[0].startswith("Input directory"): + config.input_directory = split[1] + elif split[0].startswith("Output directory"): + config.output_directory = split[1] + elif split[0].startswith("CSV Output directory"): + config.csv_output_directory = split[1] + elif split[0].startswith("Clusterers"): + config.clusterers = [x.strip() for x in split[1].split(";")] + config.clusterer_configs = len(config.clusterers) * [None] + config.clusterer_config_names = len(config.clusterers) * [None] + elif split[0].startswith("Graphs"): + config.graphs = [x.strip() for x in split[1].split(";")] + elif split[0].startswith("Number of threads") and len(split) > 1: + config.num_threads = [x.strip() for x in split[1].split(";")] + elif split[0].startswith("Number of rounds") and len(split) > 1: + config.num_rounds = 1 if split[1] == "" else int(split[1]) + elif split[0].startswith("Timeout") and len(split) > 1: + config.timeout = split[1] + elif split[0].startswith("GBBS format") and len(split) > 1: + config.gbbs_format = split[1] + elif split[0].startswith("TigerGraph files") and len(split) > 1: + tigergraph_files = [x.strip() for x in split[1].split(";")] + config.tigergraph_edges = tigergraph_files[0] + config.tigergraph_nodes = tigergraph_files[1] + elif split[0].startswith("TigerGraph nodes") and len(split) > 1: + config.tigergraph_nodes = [x.strip() for x in split[1].split(";")] + elif split[0].startswith("TigerGraph edges") and len(split) > 1: + config.tigergraph_edges = [x.strip() for x in split[1].split(";")] + elif split[0].startswith("Wighted") and len(split) > 1: + config.weighted = split[1] + elif split[0].startswith("Postprocess only"): + config.postprocess_only = split[1] + elif split[0].startswith("Write clustering"): + config.write_clustering = split[1] + else: + for index, clusterer_name in enumerate(config.clusterers): + if split[0] == clusterer_name: + config.clusterer_config_names[index] = ( + in_file.readline().strip() + ) + current_configs = [] + next_line = in_file.readline().strip() + while next_line != "": + arg_name = next_line.split(":", 1) + arg_name[0] = arg_name[0].strip() + args = [x.strip() for x in arg_name[1].split(";")] + current_configs.append( + [arg_name[0] + ": " + x for x in 
+
+
+def readSystemConfig(filename: str) -> SystemConfig:
+    """Read system configuration from file"""
+    config = SystemConfig()
+    with open(filename, "r") as in_file:
+        for line in in_file:
+            line = line.strip()
+            split = [x.strip() for x in line.split(":")]
+            if split:
+                if split[0].startswith("g++"):
+                    config.gplusplus_ver = split[1]
+                elif split[0].startswith("Python"):
+                    config.python_ver = split[1]
+    return config
+
+
+def readConfig(filename: str) -> ClusterConfig:
+    """Read cluster configuration from file"""
+    config = ClusterConfig()
+
+    with open(filename, "r") as in_file:
+        for line in in_file:
+            line = line.strip()
+            split = [x.strip() for x in line.split(":")]
+            if split:
+                if split[0].startswith("Input directory"):
+                    config.input_directory = split[1]
+                elif split[0].startswith("Output directory"):
+                    config.output_directory = split[1]
+                elif split[0].startswith("CSV output directory"):
+                    # Lowercase "output" matches the key used by existing
+                    # config files (the refactor had changed the case).
+                    config.csv_output_directory = split[1]
+                elif split[0].startswith("Clusterers"):
+                    config.clusterers = [x.strip() for x in split[1].split(";")]
+                    config.clusterer_configs = len(config.clusterers) * [None]
+                    config.clusterer_config_names = len(config.clusterers) * [None]
+                elif split[0].startswith("Graphs"):
+                    config.graphs = [x.strip() for x in split[1].split(";")]
+                elif split[0].startswith("Number of threads") and len(split) > 1:
+                    config.num_threads = [x.strip() for x in split[1].split(";")]
+                elif split[0].startswith("Number of rounds") and len(split) > 1:
+                    config.num_rounds = 1 if split[1] == "" else int(split[1])
+                elif split[0].startswith("Timeout") and len(split) > 1:
+                    config.timeout = split[1]
+                elif split[0].startswith("GBBS format") and len(split) > 1:
+                    config.gbbs_format = split[1]
+                elif split[0].startswith("TigerGraph files") and len(split) > 1:
+                    tigergraph_files = [x.strip() for x in split[1].split(";")]
+                    config.tigergraph_edges = tigergraph_files[0]
+                    config.tigergraph_nodes = tigergraph_files[1]
+                elif split[0].startswith("TigerGraph nodes") and len(split) > 1:
+                    config.tigergraph_nodes = [x.strip() for x in split[1].split(";")]
+                elif split[0].startswith("TigerGraph edges") and len(split) > 1:
+                    config.tigergraph_edges = [x.strip() for x in split[1].split(";")]
+                elif split[0].startswith("Wighted") and len(split) > 1:
+                    # "Wighted" (sic) matches the key spelling already used in
+                    # the existing config files.
+                    config.weighted = split[1]
+                elif split[0].startswith("Postprocess only"):
+                    config.postprocess_only = split[1]
+                elif split[0].startswith("Write clustering"):
+                    config.write_clustering = split[1]
+                else:
+                    for index, clusterer_name in enumerate(config.clusterers):
+                        if split[0] == clusterer_name:
+                            config.clusterer_config_names[index] = (
+                                in_file.readline().strip()
+                            )
+                            current_configs = []
+                            next_line = in_file.readline().strip()
+                            while next_line != "":
+                                arg_name = next_line.split(":", 1)
+                                arg_name[0] = arg_name[0].strip()
+                                args = [x.strip() for x in arg_name[1].split(";")]
+                                current_configs.append(
+                                    [arg_name[0] + ": " + x for x in args]
+                                )
+                                try:
+                                    next_line = in_file.readline().strip()
+                                except StopIteration as err:
+                                    break
+                            if clusterer_name == "ParallelModularityClusterer":
+                                config.clusterer_configs[index] = (
+                                    makeConfigCombosModularity(current_configs)
+                                )
+                            else:
+                                config.clusterer_configs[index] = makeConfigCombos(
+                                    current_configs
+                                )
+                            break
+
+    # Set defaults
+    config.num_threads = (
+        ["ALL"]
+        if config.num_threads is None or not config.num_threads
+        else config.num_threads
+    )
+    config.timeout = (
+        ""
+        if (config.timeout is None or config.timeout == "" or config.timeout == "NONE")
+        else "timeout " + config.timeout
+    )
+    config.num_rounds = 1 if (config.num_rounds is None) else config.num_rounds
+    config.gbbs_format = (
+        "false"
+        if (config.gbbs_format is None or config.gbbs_format == "")
+        else config.gbbs_format
+    )
+    config.weighted = (
+        "false"
+        if (config.weighted is None or config.weighted == "")
+        else config.weighted
+    )
+
+    return config
+
+
+def readStatsConfig(filename: str) -> StatsConfig:
+    """Read stats configuration from file"""
+    config = StatsConfig()
+    stats_config_list = []
+
+    with open(filename, "r") as in_file:
+        for line in in_file:
+            line = line.strip()
+            split = [x.strip() for x in line.split(":")]
+            if split:
+                if split[0].startswith("Input communities") and len(split) > 1:
+                    config.communities = [x.strip() for x in split[1].split(";")]
+                elif split[0].startswith("Deterministic") and len(split) > 1:
+                    config.deterministic = split[1]
+                elif split[0].startswith("statistics_config"):
+                    next_line = in_file.readline().strip()
+                    while next_line != "":
+                        stats_config_list.append(next_line)
+                        try:
+                            next_line = in_file.readline().strip()
+                        except StopIteration as err:
+                            break
+                    config.stats_config = ",".join(stats_config_list)
+
+    return config
+
+
+def readGraphConfig(filename: str) -> GraphConfig:
+    """Read graph configuration from file"""
+    config = GraphConfig()
+
+    with open(filename, "r") as in_file:
+        for line in in_file:
+            line = line.strip()
+            split = [x.strip() for x in line.split(":")]
+            if split:
+                if split[0].startswith("x axis"):
+                    split = [x.strip() for x in split[1].split(" ")]
+                    config.x_axis.append(split[0])
+                    if len(split) > 1:
+                        config.x_axis_modifier.append(split[1])
+                    else:
+                        config.x_axis_modifier.append("")
+                elif split[0].startswith("y axis"):
+                    split = [x.strip() for x in split[1].split(" ")]
+                    config.y_axis.append(split[0])
+                    if len(split) > 1:
+                        config.y_axis_modifier.append(split[1])
+                    else:
+                        config.y_axis_modifier.append("")
+                elif split[0].startswith("Legend"):
+                    config.legend.append(split[1])
+                elif split[0].startswith("Graph filename"):
+                    config.output_graph_filename.append(split[1])
+
+    return config
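Because the readers now return config objects instead of setting module globals, a caller gets an explicit `ClusterConfig` back. A sketch of the expected file shape and its parse; the keys mirror the `startswith()` matchers above, but the concrete values and clusterer name are made up:

```python
import runner_utils

# Hypothetical minimal cluster config file.
with open("/tmp/cluster.config", "w") as f:
    f.write(
        "Input directory: /data/graphs/\n"
        "Output directory: /data/out/\n"
        "Clusterers: ParallelAffinityClusterer\n"
        "Graphs: com-dblp.edges\n"
        "Number of threads: 8\n"
    )

config = runner_utils.readConfig("/tmp/cluster.config")
print(config.input_directory)  # /data/graphs/
print(config.num_threads)      # ['8']
print(config.timeout)          # '' (no Timeout line given)
```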
diff --git a/stats.py b/stats.py
index a173614..4f7087f 100644
--- a/stats.py
+++ b/stats.py
@@ -1,144 +1,231 @@
 import os
-import sys
-import signal
-import time
-import subprocess
-import re
-import itertools
 import runner_utils
 import json
 import pandas as pd
+import argparse
 from stats_precision_recall_pair import compute_precision_recall_pair

+
 def getRunTime(clusterer, out_prefix):
-  cluster_time = -1
-  out_filename = out_prefix + ".out"
-
-  if clusterer.startswith("TigerGraph"):
-    with open(out_filename,'r') as f:
-      run_info = f.readlines()
-      for elem in run_info[1:]:
-        if elem.startswith('Total Time:'):
-          cluster_time = elem.split(' ')[-1].strip()
-  elif clusterer.startswith("Snap"):
-    with open(out_filename,'r') as f:
-      run_info = f.readlines()
-      for elem in run_info[1:]:
-        if elem.startswith('Wealy Connected Component Time:') or elem.startswith('KCore Time:') or elem.startswith('Cluster Time:'):
-          cluster_time = elem.split(' ')[-1].strip()
-  elif clusterer.startswith("Neo4j"):
-    with open(out_filename,'r') as f:
-      run_info = f.readlines()
-      for elem in run_info[1:]:
-        if elem.startswith("Time:"):
-          cluster_time = elem.split(' ')[-1].strip()
-  else: # our cluseterer and Networkit and Tectonic
-    with open(out_filename,'r') as f:
-      run_info = f.readlines()
-      for elem in run_info[1:]:
-        if elem.startswith('Cluster Time:'):
-          cluster_time = elem.split(' ')[-1].strip()
-
-  return cluster_time
-
-def runStats(out_prefix, graph, graph_idx, stats_dict):
-  out_statistics = out_prefix + ".stats"
-  out_statistics_pair = out_prefix + ".pair.stats"
-  in_clustering = out_prefix + ".cluster"
-  if not os.path.exists(in_clustering) or not os.path.getsize(in_clustering) > 0:
-    # Either an error or a timeout happened
-    runner_utils.appendToFile("ERROR", out_statistics)
-    return
-  use_input_graph = runner_utils.input_directory + graph
-  input_communities = runner_utils.input_directory + runner_utils.communities[graph_idx]
-  if "precision_recall_pair_thresholds" in runner_utils.stats_config:
-    compute_precision_recall_pair(in_clustering, input_communities, out_statistics_pair, runner_utils.stats_config, stats_dict)
-    return
-  use_input_communities = "" if not runner_utils.communities else "--input_communities=" + input_communities
-  ss = ("bazel run //clusterers:stats-in-memory_main -- "
-        "--input_graph=" + use_input_graph + " "
-        "--is_gbbs_format=" + runner_utils.gbbs_format + " "
-        "--float_weighted=" + runner_utils.weighted + " "
-        "--input_clustering=" + in_clustering + " "
-        "--output_statistics=" + out_statistics + " " + use_input_communities + " "
-        "--statistics_config='" + runner_utils.stats_config + "'")
-  if(runner_utils.postprocess_only == "false"):
-    print(ss)
-    out = runner_utils.shellGetOutput(ss)
-
-  out_statistics_file = open(out_statistics, "r")
-  out_statistics_string = out_statistics_file.read()
-  out_statistics_file.close()
-  parse_out_statistics = json.loads(out_statistics_string)
-  for k in parse_out_statistics:
-    v = parse_out_statistics[k]
-    if type(v) != dict:
-      stats_dict[k] = v
-    else:
-      for elem2 in v:
-        stats_dict[k+'_'+elem2] = v[elem2]
-
-
-
-def runAll(config_filename, stats_config_filename):
-  runner_utils.readConfig(config_filename)
-  runner_utils.readStatsConfig(stats_config_filename)
-  stats = []
-  for clusterer_idx, clusterer in enumerate(runner_utils.clusterers):
-    if clusterer == "SKIP":
-      continue
-    for graph_idx, graph in enumerate(runner_utils.graphs):
-      if graph == "SKIP":
-        continue
-      if clusterer.startswith("Snap"):
-        for i in range(runner_utils.num_rounds):
-          if runner_utils.deterministic == "true" and i != 0:
-            continue
-          out_prefix = runner_utils.output_directory + clusterer + "_" + str(graph_idx) + "_" + str(i)
-          stats_dict = {}
-          stats_dict['Clusterer Name'] = clusterer
-          stats_dict["Input Graph"] = graph
-          stats_dict["Threads"] = 1
-          stats_dict["Config"] = None
-          stats_dict["Round"] = i
-          stats_dict["Cluster Time"] = getRunTime(clusterer, out_prefix)
-          runStats(out_prefix, graph, graph_idx, stats_dict)
-          stats_dict["Ground Truth"] = runner_utils.communities[graph_idx]
-          stats.append(stats_dict)
-        continue
-      for thread_idx, thread in enumerate(runner_utils.num_threads):
-        if runner_utils.deterministic == "true" and thread != runner_utils.num_threads[0]:
+    cluster_time = -1
+    out_filename = out_prefix + ".out"
+
+    if clusterer.startswith("TigerGraph"):
+        with open(out_filename, "r") as f:
+            run_info = f.readlines()
+            for elem in run_info[1:]:
+                if elem.startswith("Total Time:"):
+                    cluster_time = elem.split(" ")[-1].strip()
+    elif clusterer.startswith("Snap"):
+        with open(out_filename, "r") as f:
+            run_info = f.readlines()
+            for elem in run_info[1:]:
+                # "Wealy" (sic) matches the label as it appears in the
+                # generated .out files.
+                if (
+                    elem.startswith("Wealy Connected Component Time:")
+                    or elem.startswith("KCore Time:")
+                    or elem.startswith("Cluster Time:")
+                ):
+                    cluster_time = elem.split(" ")[-1].strip()
+    elif clusterer.startswith("Neo4j"):
+        with open(out_filename, "r") as f:
+            run_info = f.readlines()
+            for elem in run_info[1:]:
+                if elem.startswith("Time:"):
+                    cluster_time = elem.split(" ")[-1].strip()
+    else:  # our clusterers, NetworKit, and Tectonic
+        with open(out_filename, "r") as f:
+            run_info = f.readlines()
+            for elem in run_info[1:]:
+                if elem.startswith("Cluster Time:"):
+                    cluster_time = elem.split(" ")[-1].strip()
+
+    return cluster_time
+
+
+def runStats(
+    out_prefix,
+    graph,
+    graph_idx,
+    stats_dict,
+    cluster_config: runner_utils.ClusterConfig,
+    stats_config: runner_utils.StatsConfig,
+):
+    out_statistics = out_prefix + ".stats"
+    out_statistics_pair = out_prefix + ".pair.stats"
+    in_clustering = out_prefix + ".cluster"
+    if not os.path.exists(in_clustering) or not os.path.getsize(in_clustering) > 0:
+        # Either an error or a timeout happened
+        runner_utils.appendToFile("ERROR", out_statistics)
+        return
+    use_input_graph = cluster_config.input_directory + graph
+    input_communities = (
+        cluster_config.input_directory + stats_config.communities[graph_idx]
+    )
+    if "precision_recall_pair_thresholds" in stats_config.stats_config:
+        compute_precision_recall_pair(
+            in_clustering,
+            input_communities,
+            out_statistics_pair,
+            stats_config.stats_config,
+            stats_dict,
+        )
+        return
+    use_input_communities = (
+        ""
+        if not stats_config.communities
+        else "--input_communities=" + input_communities
+    )
+    ss = (
+        "bazel run //clusterers:stats-in-memory_main -- "
+        "--input_graph=" + use_input_graph + " "
+        "--is_gbbs_format=" + cluster_config.gbbs_format + " "
+        "--float_weighted=" + cluster_config.weighted + " "
+        "--input_clustering=" + in_clustering + " "
+        "--output_statistics=" + out_statistics + " " + use_input_communities + " "
+        "--statistics_config='" + stats_config.stats_config + "'"
+    )
+    if cluster_config.postprocess_only == "false":
+        print(ss)
+        out = runner_utils.shellGetOutput(ss)
+
+    out_statistics_file = open(out_statistics, "r")
+    out_statistics_string = out_statistics_file.read()
+    out_statistics_file.close()
+    parse_out_statistics = json.loads(out_statistics_string)
+    for k in parse_out_statistics:
+        v = parse_out_statistics[k]
+        if type(v) != dict:
+            stats_dict[k] = v
+        else:
+            for elem2 in v:
+                stats_dict[k + "_" + elem2] = v[elem2]
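runStats flattens the JSON report emitted by `stats-in-memory_main` one level, so a nested object such as `modularity` becomes columns like `modularity_mean`. A self-contained sketch of that flattening (the payload keys are invented):

```python
import json

raw = '{"fScore_mean": 0.42, "modularity": {"mean": 0.61, "stdev": 0.02}}'

stats_dict = {}
for k, v in json.loads(raw).items():
    if not isinstance(v, dict):
        stats_dict[k] = v
    else:
        for inner in v:
            stats_dict[k + "_" + inner] = v[inner]

print(stats_dict)
# {'fScore_mean': 0.42, 'modularity_mean': 0.61, 'modularity_stdev': 0.02}
```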
+
+
+def runAll(
+    cluster_config: runner_utils.ClusterConfig, stats_config: runner_utils.StatsConfig
+):
+    stats = []
+    for clusterer_idx, clusterer in enumerate(cluster_config.clusterers):
+        if clusterer == "SKIP":
             continue
-      configs = runner_utils.clusterer_configs[clusterer_idx] if runner_utils.clusterer_configs[clusterer_idx] is not None else [""]
-      config_prefix = runner_utils.clusterer_config_names[clusterer_idx] + "{" if runner_utils.clusterer_configs[clusterer_idx] is not None else ""
-      config_postfix = "}" if runner_utils.clusterer_configs[clusterer_idx] is not None else ""
-      for config_idx, config in enumerate(configs):
-        for i in range(runner_utils.num_rounds):
-          if runner_utils.deterministic == "true" and i != 0:
-            continue
-          out_prefix = runner_utils.output_directory + clusterer + "_" + str(graph_idx) + "_" + thread + "_" + str(config_idx) + "_" + str(i)
-          try:
-            stats_dict = {}
-            stats_dict['Clusterer Name'] = clusterer
-            stats_dict["Input Graph"] = graph
-            stats_dict["Threads"] = thread
-            stats_dict["Config"] = config
-            stats_dict["Round"] = i
-            stats_dict["Cluster Time"] = getRunTime(clusterer, out_prefix)
-            runStats(out_prefix, graph, graph_idx, stats_dict)
-            stats_dict["Ground Truth"] = runner_utils.communities[graph_idx]
-            stats.append(stats_dict)
-          except FileNotFoundError:
-            print("Failed because file not found, ", out_prefix)
-  stats_dataframe = pd.DataFrame(stats)
-  if not os.path.exists(runner_utils.csv_output_directory):
-    os.makedirs(runner_utils.csv_output_directory)
-  stats_dataframe.to_csv(runner_utils.csv_output_directory + '/stats.csv', mode='a')
+        for graph_idx, graph in enumerate(cluster_config.graphs):
+            if graph == "SKIP":
+                continue
+            if clusterer.startswith("Snap"):
+                for i in range(cluster_config.num_rounds):
+                    if stats_config.deterministic == "true" and i != 0:
+                        continue
+                    out_prefix = (
+                        cluster_config.output_directory
+                        + clusterer
+                        + "_"
+                        + str(graph_idx)
+                        + "_"
+                        + str(i)
+                    )
+                    stats_dict = {}
+                    stats_dict["Clusterer Name"] = clusterer
+                    stats_dict["Input Graph"] = graph
+                    stats_dict["Threads"] = 1
+                    stats_dict["Config"] = None
+                    stats_dict["Round"] = i
+                    stats_dict["Cluster Time"] = getRunTime(clusterer, out_prefix)
+                    runStats(
+                        out_prefix,
+                        graph,
+                        graph_idx,
+                        stats_dict,
+                        cluster_config,
+                        stats_config,
+                    )
+                    stats_dict["Ground Truth"] = stats_config.communities[graph_idx]
+                    stats.append(stats_dict)
+                continue
+            for thread_idx, thread in enumerate(cluster_config.num_threads):
+                if (
+                    stats_config.deterministic == "true"
+                    and thread != cluster_config.num_threads[0]
+                ):
+                    continue
+                configs = (
+                    cluster_config.clusterer_configs[clusterer_idx]
+                    if cluster_config.clusterer_configs[clusterer_idx] is not None
+                    else [""]
+                )
+                config_prefix = (
+                    cluster_config.clusterer_config_names[clusterer_idx] + "{"
+                    if cluster_config.clusterer_configs[clusterer_idx] is not None
+                    else ""
+                )
+                config_postfix = (
+                    "}"
+                    if cluster_config.clusterer_configs[clusterer_idx] is not None
+                    else ""
+                )
+                for config_idx, config in enumerate(configs):
+                    for i in range(cluster_config.num_rounds):
+                        if stats_config.deterministic == "true" and i != 0:
+                            continue
+                        out_prefix = (
+                            cluster_config.output_directory
+                            + clusterer
+                            + "_"
+                            + str(graph_idx)
+                            + "_"
+                            + thread
+                            + "_"
+                            + str(config_idx)
+                            + "_"
+                            + str(i)
+                        )
+                        try:
+                            stats_dict = {}
+                            stats_dict["Clusterer Name"] = clusterer
+                            stats_dict["Input Graph"] = graph
+                            stats_dict["Threads"] = thread
+                            stats_dict["Config"] = config
+                            stats_dict["Round"] = i
+                            stats_dict["Cluster Time"] = getRunTime(
+                                clusterer, out_prefix
+                            )
+                            runStats(
+                                out_prefix,
+                                graph,
+                                graph_idx,
+                                stats_dict,
+                                cluster_config,
+                                stats_config,
+                            )
+                            stats_dict["Ground Truth"] = stats_config.communities[
+                                graph_idx
+                            ]
+                            stats.append(stats_dict)
+                        except FileNotFoundError:
+                            print("Failed because file not found, ", out_prefix)
+    stats_dataframe = pd.DataFrame(stats)
+    if not os.path.exists(cluster_config.csv_output_directory):
+        os.makedirs(cluster_config.csv_output_directory)
+    stats_dataframe.to_csv(cluster_config.csv_output_directory + "/stats.csv", mode="a")
+
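One caveat with the CSV write above: `to_csv(..., mode="a")` emits the header row on every run, so repeated invocations interleave headers into `stats.csv`. If that matters downstream, a guard along these lines (a suggestion, not part of this patch) keeps a single header:

```python
import os

import pandas as pd


def append_stats(stats_dataframe: pd.DataFrame, csv_path: str) -> None:
    # Only write the header when the file is being created.
    stats_dataframe.to_csv(
        csv_path, mode="a", header=not os.path.exists(csv_path), index=False
    )
```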

 def main():
-  args = sys.argv[1:]
-  runAll(args[0], args[1])
+    parser = argparse.ArgumentParser(
+        description="Compute statistics for clustering results"
+    )
+    parser.add_argument("config_file", help="Path to the cluster configuration file")
+    parser.add_argument(
+        "stats_config_file", help="Path to the stats configuration file"
+    )
+
+    args = parser.parse_args()
+
+    # Read configurations
+    cluster_config = runner_utils.readConfig(args.config_file)
+    stats_config = runner_utils.readStatsConfig(args.stats_config_file)
+
+    runAll(cluster_config, stats_config)
+

 if __name__ == "__main__":
-  main()
\ No newline at end of file
+    main()
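With argparse in place the CLI is self-documenting (`python3 stats.py --help`). The same flow can also be driven programmatically, which is occasionally handy in notebooks; the paths below are illustrative:

```python
import runner_utils
import stats

# Load the two config files exactly as main() does (paths are examples).
cluster_config = runner_utils.readConfig("configs/cluster.config")
stats_config = runner_utils.readStatsConfig("configs/stats.config")

stats.runAll(cluster_config, stats_config)
```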