diff --git a/algorithms/fsm/base.py b/algorithms/fsm/base.py
index aaeaa0f..609685f 100644
--- a/algorithms/fsm/base.py
+++ b/algorithms/fsm/base.py
@@ -4,6 +4,8 @@
 from graph.simple_graph import SimpleGraph
 
+from subgraph.pattern import canonical_label
+
 from algorithms.exploration.util import (
     new_subgraphs_func, all_subgraphs_func)
@@ -29,10 +31,13 @@ def add_edge(self, edge):
     @abstractmethod
-    def add_subgraph(self, subgraph):
+    def remove_edge(self, edge):
         pass
 
-    @abstractmethod
+    def add_subgraph(self, subgraph):
+        self.patterns[canonical_label(subgraph)] += 1
+
+
     def remove_subgraph(self, subgraph):
-        pass
+        self.patterns[canonical_label(subgraph)] -= 1
diff --git a/algorithms/fsm/dynamic/exact_counting.py b/algorithms/fsm/dynamic/exact_counting.py
new file mode 100644
index 0000000..2cf10e6
--- /dev/null
+++ b/algorithms/fsm/dynamic/exact_counting.py
@@ -0,0 +1,55 @@
+from datetime import datetime, timedelta
+
+from ..incremental.exact_counting import IncrementalExactCountingAlgorithm
+
+from subgraph.util import make_subgraph
+from subgraph.pattern import canonical_label
+
+
+class DynamicExactCountingAlgorithm(IncrementalExactCountingAlgorithm):
+
+
+    def __init__(self, k=3, **kwargs):
+        super().__init__(k=k, **kwargs)
+
+
+    def remove_edge(self, edge):
+        if edge not in self.graph:
+            return False
+
+        e_del_start = datetime.now()
+
+        self.graph.remove_edge(edge)
+
+        u = edge.get_u()
+        v = edge.get_v()
+
+        removals, replacements = self.get_all_subgraphs(u, v)
+
+        for nodes in removals:
+            # this subgraph is disconnected without the removed edge:
+            # rebuild it with the removed edge included and remove it
+            edges = self.graph.get_induced_edges(nodes)
+            subgraph = make_subgraph(nodes, edges + [edge])
+            self.remove_subgraph(subgraph)
+
+        for nodes in replacements:
+            # this subgraph stays connected: remove the version that
+            # included the removed edge, then add the version without it
+            edges = self.graph.get_induced_edges(nodes)
+
+            existing_subgraph = make_subgraph(nodes, edges + [edge])
+            self.remove_subgraph(existing_subgraph)
+
+            updated_subgraph = make_subgraph(nodes, edges)
+            self.add_subgraph(updated_subgraph)
+
+        e_del_end = datetime.now()
+
+        ms = timedelta(microseconds=1)
+        self.metrics['edge_op'].append('del')
+        self.metrics['edge_op_ms'].append((e_del_end - e_del_start) / ms)
+        self.metrics['num_processed_subgraphs'].append(len(removals))
+
+        return True
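# A minimal sketch of the pattern bookkeeping that base.py now centralizes:
# add_subgraph/remove_subgraph keep a Counter keyed by canonical label, so
# isomorphic subgraphs collapse into one frequency. The toy canonical_label
# below (a sorted degree sequence) is only a stand-in for the real labelling
# in subgraph.pattern, shown to make the bookkeeping runnable on its own.
from collections import Counter

def canonical_label(subgraph):
    # toy label: sorted degree sequence of the subgraph's nodes
    degrees = Counter()
    for u, v in subgraph:
        degrees[u] += 1
        degrees[v] += 1
    return tuple(sorted(degrees.values()))

patterns = Counter()
wedge = [(1, 2), (2, 3)]             # path on three nodes
triangle = [(1, 2), (2, 3), (1, 3)]

patterns[canonical_label(wedge)] += 1     # add_subgraph
patterns[canonical_label(triangle)] += 1
patterns[canonical_label(wedge)] -= 1     # remove_subgraph
assert patterns[canonical_label(triangle)] == 1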
diff --git a/algorithms/fsm/dynamic/naive_reservoir.py b/algorithms/fsm/dynamic/naive_reservoir.py
new file mode 100644
index 0000000..e1f20dd
--- /dev/null
+++ b/algorithms/fsm/dynamic/naive_reservoir.py
@@ -0,0 +1,145 @@
+import random
+
+from datetime import datetime, timedelta
+
+from ..reservoir import ReservoirAlgorithm
+
+from subgraph.util import make_subgraph, make_subgraph_edge
+from subgraph.pattern import canonical_label
+
+
+class DynamicNaiveReservoirAlgorithm(ReservoirAlgorithm):
+
+    def __init__(self, k=3, M=1000):
+        super().__init__(k=k, M=M)
+
+        # the count of uncompensated deletions, where
+        self.c1 = 0  # i) the deleted element was in the sample, and
+        self.c2 = 0  # ii) the deleted element was not in the sample
+
+
+    def add_edge(self, edge):
+        if edge in self.graph:
+            return False
+
+        e_add_start = datetime.now()
+
+        u = edge.get_u()
+        v = edge.get_v()
+
+        # update all existing subgraphs containing u and v in the reservoir
+        for old_subg in self.reservoir.get_common_subgraphs(u, v):
+            new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,))
+            self.process_existing_subgraph(old_subg, new_subg)
+
+        # find new subgraph candidates for the reservoir
+        additions = self.get_new_subgraphs(u, v)
+
+        # perform reservoir sampling for each new subgraph candidate
+        I = 0
+        for nodes in additions:
+            self.N += 1
+            edges = self.graph.get_induced_edges(nodes)
+            subgraph = make_subgraph(nodes, edges + [edge])
+            I += int(self.process_new_subgraph(subgraph))
+
+        self.graph.add_edge(edge)
+
+        e_add_end = datetime.now()
+
+        ms = timedelta(microseconds=1)
+        self.metrics['edge_op'].append('add')
+        self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms)
+        self.metrics['num_candidate_subgraphs'].append(len(additions))
+        self.metrics['num_processed_subgraphs'].append(I)
+        self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full()))
+
+        return True
+
+
+    def remove_edge(self, edge):
+        if edge not in self.graph:
+            return False
+
+        e_del_start = datetime.now()
+
+        self.graph.remove_edge(edge)
+
+        u = edge.get_u()
+        v = edge.get_v()
+
+        # find all nodesets representing subgraphs that will
+        # no longer be connected after the removal of this edge
+        removals = self.get_new_subgraphs(u, v)
+        D = len(removals)
+
+        # we start compensating for subgraph deletions with counters c1 and c2
+        # once the reservoir has filled up for the first time
+        compensate_for_removals = self.reservoir.is_full() or (self.c1 + self.c2) > 0
+        removals_from_sample = 0
+
+        # find all subgraphs in the reservoir containing nodes u and v
+        for old_subg in self.reservoir.get_common_subgraphs(u, v):
+
+            if frozenset(old_subg.nodes) in removals:
+                # subgraph is no longer connected after edge removal, remove it
+                removals_from_sample += int(self.process_old_subgraph(old_subg))
+            else:
+                # subgraph stays connected after edge removal, replace it
+                old_edges = old_subg.edges
+                edges = [e for e in old_edges if e != make_subgraph_edge(edge)]
+                new_subg = make_subgraph(old_subg.nodes, edges)
+                self.process_existing_subgraph(old_subg, new_subg)
+
+        if compensate_for_removals:
+            # update the counts of uncompensated deletions
+            self.c1 += removals_from_sample
+            self.c2 += D - removals_from_sample
+
+        self.N -= D
+
+        e_del_end = datetime.now()
+
+        ms = timedelta(microseconds=1)
+        self.metrics['edge_op'].append('del')
+        self.metrics['edge_op_ms'].append((e_del_end - e_del_start) / ms)
+        self.metrics['num_candidate_subgraphs'].append(D)
+        self.metrics['num_processed_subgraphs'].append(removals_from_sample)
+        self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full()))
+
+        return True
+
+
+    def process_new_subgraph(self, subgraph):
+        success = False
+        do_sampling = False
+
+        if self.c1 + self.c2 == 0:
+            # there are no uncompensated deletions,
+            # so we can do normal reservoir sampling
+            do_sampling = True
+        else:
+            # there are uncompensated deletions: pair this subgraph with one,
+            # adding it to the reservoir with probability c1 / (c1 + c2)
+            if random.random() < self.c1 / float(self.c1 + self.c2):
+                self.c1 -= 1
+                do_sampling = True
+            else:
+                self.c2 -= 1
+
+        if do_sampling:
+            success, old_subgraph = self.reservoir.add(subgraph, N=self.N)
+            if success: self.add_subgraph(subgraph)
+            if old_subgraph: self.remove_subgraph(old_subgraph)
+
+        return success
+
+
+    def process_old_subgraph(self, subgraph):
+        return self.reservoir.remove(subgraph)
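# The c1/c2 bookkeeping above follows the random pairing idea: each deletion
# is later "paired" with an arriving candidate instead of being resampled.
# A self-contained sketch of just the decision rule in process_new_subgraph
# (reservoir details elided; pair_or_sample is a hypothetical helper):
import random

def pair_or_sample(c1, c2):
    """Return (do_sampling, c1, c2) for one arriving candidate."""
    if c1 + c2 == 0:
        return True, c1, c2        # no uncompensated deletions: normal sampling
    if random.random() < c1 / float(c1 + c2):
        return True, c1 - 1, c2    # compensate an in-sample deletion
    return False, c1, c2 - 1       # compensate an out-of-sample deletion

c1, c2 = 2, 3                      # five uncompensated deletions
for _ in range(5):
    included, c1, c2 = pair_or_sample(c1, c2)
print(c1, c2)                      # both counters reach 0 after 5 arrivals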
diff --git a/algorithms/fsm/dynamic/optimized_reservoir.py b/algorithms/fsm/dynamic/optimized_reservoir.py
new file mode 100644
index 0000000..9fc0d12
--- /dev/null
+++ b/algorithms/fsm/dynamic/optimized_reservoir.py
@@ -0,0 +1,190 @@
+import random
+
+from datetime import datetime, timedelta
+
+from ..reservoir import ReservoirAlgorithm
+
+from graph.simple_graph import SimpleGraph
+
+from subgraph.util import make_subgraph, make_subgraph_edge
+from subgraph.pattern import canonical_label
+
+from sampling import skip_rp
+from sampling.skip_rs import SkipRS
+
+
+class DynamicOptimizedReservoirAlgorithm(ReservoirAlgorithm):
+
+
+    def __init__(self, k=3, M=1000):
+        super().__init__(k=k, M=M)
+
+        self.skip_rs = SkipRS(M)
+
+        # number of overflowing subgraphs skipped
+        self.skip_sum_rs = 0
+        self.skip_sum_rp = 0
+
+        # the count of uncompensated deletions, where
+        self.c1 = 0  # i) the deleted element was in the sample, and
+        self.c2 = 0  # ii) the deleted element was not in the sample
+
+
+    @property
+    def d(self):
+        # total number of uncompensated deletions
+        return self.c1 + self.c2
+
+
+    def add_edge(self, edge):
+        if edge in self.graph:
+            return False
+
+        e_add_start = datetime.now()
+
+        u = edge.get_u()
+        v = edge.get_v()
+
+        # update all existing subgraphs containing u and v in the reservoir
+        for old_subg in self.reservoir.get_common_subgraphs(u, v):
+            new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,))
+            self.process_existing_subgraph(old_subg, new_subg)
+
+        # find new subgraph candidates for the reservoir
+        subgraph_candidates = self.get_new_subgraphs(u, v)
+
+        W = len(subgraph_candidates)
+        I = 0  # number of subgraph candidates to include in sample
+
+        if not self.reservoir.is_full():
+            # the reservoir is not full, which means we either
+            # haven't encountered enough subgraphs to fill it, or
+            # there are uncompensated deletions from the reservoir
+
+            # in the former case, we simply add subgraphs into
+            # the reservoir until it is full
+            if self.d == 0:
+                I = min(W, self.M - len(self.reservoir))
+                self.s = I
+                self.N += I
+
+        # RANDOM PAIRING STEP
+
+        sum_rp = 0
+
+        while (self.d > 0) and (sum_rp < W):
+            num_picked_subgraphs = 0
+            # skip Z_rp candidates, pairing them with out-of-sample deletions
+            Z_rp = skip_rp.skip_records(self.c1, self.d)
+
+            if sum_rp + Z_rp < W:
+                # the next candidate pairs with an in-sample deletion
+                num_picked_subgraphs = int(self.c1 > 0)
+            else:
+                Z_rp = W - sum_rp
+
+            I += num_picked_subgraphs
+            self.c1 -= num_picked_subgraphs
+            self.c2 -= Z_rp
+
+            sum_rp += Z_rp + num_picked_subgraphs
+
+        W -= sum_rp
+
+        # RANDOM SAMPLING STEP
+
+        # determine the number of candidates I to include in the sample
+        while self.s < W:
+            I += 1
+            Z_rs = self.skip_rs.apply(self.N)
+            self.N += Z_rs + 1
+            self.s += Z_rs + 1
+
+        # sample I subgraphs from among the candidates
+        if I < len(subgraph_candidates):
+            additions = random.sample(subgraph_candidates, I)
+        else:
+            additions = subgraph_candidates
+
+        # add all sampled subgraphs
+        for nodes in additions:
+            edges = self.graph.get_induced_edges(nodes)
+            subgraph = make_subgraph(nodes, edges + [edge])
+            self.process_new_subgraph(subgraph)
+
+        self.graph.add_edge(edge)
+        self.s -= W
+
+        e_add_end = datetime.now()
+
+        ms = timedelta(microseconds=1)
+        self.metrics['edge_op'].append('add')
+        self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms)
+        self.metrics['num_candidate_subgraphs'].append(len(subgraph_candidates))
+        self.metrics['num_processed_subgraphs'].append(I)
+        self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full()))
+
+        return True
+
+
+    def remove_edge(self, edge):
+        if edge not in self.graph:
+            return False
+
+        e_del_start = datetime.now()
+
+        self.graph.remove_edge(edge)
+
+        u = edge.get_u()
+        v = edge.get_v()
+
+        # find all nodesets representing subgraphs that will
+        # no longer be connected after the removal of this edge
+        removals = self.get_new_subgraphs(u, v)
+        D = len(removals)
+
+        # we start compensating for subgraph deletions with counters c1 and c2
+        # once the reservoir has filled up for the first time
+        compensate_for_removals = self.reservoir.is_full() or self.d > 0
+        removals_from_sample = 0
+
+        # find all subgraphs in the reservoir containing nodes u and v
+        for old_subg in self.reservoir.get_common_subgraphs(u, v):
+
+            if frozenset(old_subg.nodes) in removals:
+                # subgraph is no longer connected after edge removal, remove it
+                removals_from_sample += int(self.process_old_subgraph(old_subg))
+            else:
+                # subgraph stays connected after edge removal, replace it
+                old_edges = old_subg.edges
+                edges = [e for e in old_edges if e != make_subgraph_edge(edge)]
+                new_subg = make_subgraph(old_subg.nodes, edges)
+                self.process_existing_subgraph(old_subg, new_subg)
+
+        if compensate_for_removals:
+            # update the counts of uncompensated deletions
+            self.c1 += removals_from_sample
+            self.c2 += D - removals_from_sample
+
+        self.N -= D
+
+        e_del_end = datetime.now()
+
+        ms = timedelta(microseconds=1)
+        self.metrics['edge_op'].append('del')
+        self.metrics['edge_op_ms'].append((e_del_end - e_del_start) / ms)
+        self.metrics['num_candidate_subgraphs'].append(D)
+        self.metrics['num_processed_subgraphs'].append(removals_from_sample)
+        self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full()))
+
+        return True
+
+
+    def process_new_subgraph(self, subgraph):
+        success, old_subgraph = self.reservoir.add(subgraph)
+
+        if success: self.add_subgraph(subgraph)
+        if old_subgraph: self.remove_subgraph(old_subgraph)
+
+        return success
+
+
+    def process_old_subgraph(self, subgraph):
+        return self.reservoir.remove(subgraph)
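# The optimized variant replaces per-candidate coin flips with skip counts:
# instead of testing every arriving subgraph, it draws how many candidates
# to skip before the next reservoir insertion (the role played by SkipRS
# above). A sketch of that equivalence using the classic Algorithm-R
# acceptance rule; this inversion loop is illustrative, not the production
# SkipRS code:
import random

def skip_count(M, N):
    """Number of candidates to skip before the next acceptance, for a
    full reservoir of size M after N items have been seen."""
    skipped = 0
    # item N + skipped + 1 would be accepted with probability M / (N + skipped + 1)
    while random.random() >= M / float(N + skipped + 1):
        skipped += 1
    return skipped

M, N = 1000, 5000
Z = skip_count(M, N)
print("skip", Z, "candidates, include candidate", Z + 1)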
diff --git a/algorithms/fsm/incremental/exact_counting.py b/algorithms/fsm/incremental/exact_counting.py
index a26abcb..d3b1fb9 100644
--- a/algorithms/fsm/incremental/exact_counting.py
+++ b/algorithms/fsm/incremental/exact_counting.py
@@ -48,15 +48,12 @@ def add_edge(self, edge):
         e_add_end = datetime.now()
 
         ms = timedelta(microseconds=1)
-        self.metrics['edge_add_ms'].append((e_add_end - e_add_start) / ms)
-        self.metrics['new_subgraph_count'].append(len(additions))
+        self.metrics['edge_op'].append('add')
+        self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms)
+        self.metrics['num_processed_subgraphs'].append(len(additions))
 
         return True
 
 
-    def add_subgraph(self, subgraph):
-        self.patterns.update([canonical_label(subgraph)])
-
-
-    def remove_subgraph(self, subgraph):
-        self.patterns.subtract([canonical_label(subgraph)])
+    def remove_edge(self, edge):
+        raise NotImplementedError()
diff --git a/algorithms/fsm/incremental/naive_reservoir.py b/algorithms/fsm/incremental/naive_reservoir.py
index 786c11f..f7390d3 100644
--- a/algorithms/fsm/incremental/naive_reservoir.py
+++ b/algorithms/fsm/incremental/naive_reservoir.py
@@ -24,16 +24,11 @@ def add_edge(self, edge):
         v = edge.get_v()
 
         # replace update all existing subgraphs with u and v in the reservoir
-        s_rep_start = datetime.now()
-
         for old_subg in self.reservoir.get_common_subgraphs(u, v):
             new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,))
             self.process_existing_subgraph(old_subg, new_subg)
 
-        s_rep_end = datetime.now()
-
         # find new subgraph candidates for the reservoir
-        s_add_start = datetime.now()
         additions = self.get_new_subgraphs(u, v)
 
         # perform reservoir sampling for each new subgraph candidate
@@ -43,18 +38,16 @@ def add_edge(self, edge):
             edges = self.graph.get_induced_edges(nodes)
             subgraph = make_subgraph(nodes, edges+[edge])
             I += int(self.process_new_subgraph(subgraph))
-        s_add_end = datetime.now()
 
         self.graph.add_edge(edge)
 
         e_add_end = datetime.now()
 
         ms = timedelta(microseconds=1)
-        self.metrics['edge_add_ms'].append((e_add_end - e_add_start) / ms)
-        self.metrics['subgraph_add_ms'].append((s_add_end - s_add_start) / ms)
-        self.metrics['subgraph_replace_ms'].append((s_rep_end - s_rep_start) / ms)
-        self.metrics['new_subgraph_count'].append(len(additions))
-        self.metrics['included_subgraph_count'].append(I)
+        self.metrics['edge_op'].append('add')
+        self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms)
+        self.metrics['num_candidate_subgraphs'].append(len(additions))
+        self.metrics['num_processed_subgraphs'].append(I)
         self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full()))
 
         return True
@@ -69,15 +62,9 @@ def process_new_subgraph(self, subgraph):
         return success
 
 
-    def process_existing_subgraph(self, old_subgraph, new_subgraph):
-        self.reservoir.replace(old_subgraph, new_subgraph)
-        self.remove_subgraph(old_subgraph)
-        self.add_subgraph(new_subgraph)
-
-
-    def add_subgraph(self, subgraph):
-        self.patterns[canonical_label(subgraph)] += 1
+    def process_old_subgraph(self, subgraph):
+        raise NotImplementedError()
 
 
-    def remove_subgraph(self, subgraph):
-        self.patterns[canonical_label(subgraph)] -= 1
+    def remove_edge(self, edge):
+        raise NotImplementedError()
diff --git a/algorithms/fsm/incremental/optimized_reservoir.py b/algorithms/fsm/incremental/optimized_reservoir.py
index fc4214f..21d7df9 100644
--- a/algorithms/fsm/incremental/optimized_reservoir.py
+++ b/algorithms/fsm/incremental/optimized_reservoir.py
@@ -30,14 +30,11 @@ def add_edge(self, edge):
         v = edge.get_v()
 
         # replace update all existing subgraphs with u and v in the reservoir
-        s_rep_start = datetime.now()
         for old_subg in self.reservoir.get_common_subgraphs(u, v):
             new_subg = make_subgraph(old_subg.nodes, old_subg.edges + (edge,))
             self.process_existing_subgraph(old_subg, new_subg)
-        s_rep_end = datetime.now()
 
         # find new subgraph candidates for the reservoir
-        s_add_start = datetime.now()
         subgraph_candidates = self.get_new_subgraphs(u, v)
 
         W = len(subgraph_candidates)
@@ -69,19 +66,16 @@ def add_edge(self, edge):
             subgraph = make_subgraph(nodes, edges+[edge])
             self.process_new_subgraph(subgraph)
 
-        s_add_end = datetime.now()
-
         self.graph.add_edge(edge)
         self.s -= W
 
         e_add_end = datetime.now()
 
         ms = timedelta(microseconds=1)
-        self.metrics['edge_add_ms'].append((e_add_end - e_add_start) / ms)
-        self.metrics['subgraph_add_ms'].append((s_add_end - s_add_start) / ms)
-        self.metrics['subgraph_replace_ms'].append((s_rep_end - s_rep_start) / ms)
-        self.metrics['new_subgraph_count'].append(W)
-        self.metrics['included_subgraph_count'].append(I)
+        self.metrics['edge_op'].append('add')
+        self.metrics['edge_op_ms'].append((e_add_end - e_add_start) / ms)
+        self.metrics['num_candidate_subgraphs'].append(W)
+        self.metrics['num_processed_subgraphs'].append(I)
         self.metrics['reservoir_full_bool'].append(int(self.reservoir.is_full()))
         self.metrics['skiprs_treshold_bool'].append(int(self.skip_rs.is_threshold_reached(self.N)))
@@ -95,17 +89,11 @@ def process_new_subgraph(self, subgraph):
         if old_subgraph: self.remove_subgraph(old_subgraph)
 
         return success
-
-
-    def process_existing_subgraph(self, old_subgraph, new_subgraph):
-        self.reservoir.replace(old_subgraph, new_subgraph)
-        self.remove_subgraph(old_subgraph)
-        self.add_subgraph(new_subgraph)
-
-
-    def add_subgraph(self, subgraph):
-        self.patterns[canonical_label(subgraph)] += 1
-
-
-    def remove_subgraph(self, subgraph):
-        self.patterns[canonical_label(subgraph)] -= 1
+
+
+    def process_old_subgraph(self, subgraph):
+        raise NotImplementedError()
+
+
+    def remove_edge(self, edge):
+        raise NotImplementedError()
diff --git a/algorithms/fsm/reservoir.py b/algorithms/fsm/reservoir.py
index 86eb7fa..cc316de 100644
--- a/algorithms/fsm/reservoir.py
+++ b/algorithms/fsm/reservoir.py
@@ -9,7 +9,7 @@ class ReservoirAlgorithm(BaseAlgorithm, metaclass=ABCMeta):
     def __init__(self, M=None, **kwargs):
         self.M = M  # reservoir size
         self.N = 0  # number of subgraphs encountered
-        self.reservoir = SubgraphReservoir(size=M)
+        self.reservoir = SubgraphReservoir(M)
 
         super().__init__(M=M, **kwargs)
 
@@ -17,3 +17,14 @@ def __init__(self, M=None, **kwargs):
     @abstractmethod
     def process_new_subgraph(self, subgraph):
         pass
+
+
+    @abstractmethod
+    def process_old_subgraph(self, subgraph):
+        pass
+
+
+    def process_existing_subgraph(self, old_subgraph, new_subgraph):
+        self.reservoir.replace(old_subgraph, new_subgraph)
+        self.remove_subgraph(old_subgraph)
+        self.add_subgraph(new_subgraph)
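# continuous_accuracy.py below snapshots pattern counts with a unary plus
# (`+simulators[...].patterns`). For a collections.Counter, unary plus
# returns a new Counter keeping only the positive counts, which discards
# the zero and negative entries left behind by remove_subgraph:
from collections import Counter

patterns = Counter({'wedge': 3, 'triangle': 0, 'star': -1})
snapshot = +patterns
print(snapshot)               # Counter({'wedge': 3})
print(snapshot is patterns)   # False: an independent copy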
diff --git a/continuous_accuracy.py b/continuous_accuracy.py
new file mode 100644
index 0000000..f5ddb79
--- /dev/null
+++ b/continuous_accuracy.py
@@ -0,0 +1,297 @@
+import os
+import csv
+import time
+import uuid
+
+import numpy as np
+import matplotlib.pyplot as plt
+
+from datetime import datetime, timedelta
+from collections import deque, defaultdict
+from argparse import ArgumentParser, FileType
+
+from graph.util import make_edge
+
+from util.accuracy import (
+    pattern_frequencies,
+    threshold_frequencies,
+    precision, recall, avg_relative_error
+)
+
+from algorithms.fsm.incremental.exact_counting import IncrementalExactCountingAlgorithm
+from algorithms.fsm.incremental.naive_reservoir import IncrementalNaiveReservoirAlgorithm
+from algorithms.fsm.incremental.optimized_reservoir import IncerementalOptimizedReservoirAlgorithm
+
+from algorithms.fsm.dynamic.exact_counting import DynamicExactCountingAlgorithm
+from algorithms.fsm.dynamic.naive_reservoir import DynamicNaiveReservoirAlgorithm
+from algorithms.fsm.dynamic.optimized_reservoir import DynamicOptimizedReservoirAlgorithm
+
+ACCURACY_SAMPLE_INTERVAL = 10
+FREQUENCY_THRESHOLD = 0.005
+
+ALGORITHMS = {
+
+    'incremental': {
+        'exact': IncrementalExactCountingAlgorithm,
+        'naive': IncrementalNaiveReservoirAlgorithm,
+        'optimal': IncerementalOptimizedReservoirAlgorithm
+    },
+
+    'dynamic': {
+        'exact': DynamicExactCountingAlgorithm,
+        'naive': DynamicNaiveReservoirAlgorithm,
+        'optimal': DynamicOptimizedReservoirAlgorithm
+    }
+
+}
+
+
+def run_simulation(simulators, edges, T_k, window_size=0):
+    ms = timedelta(microseconds=1)
+
+    np.random.shuffle(edges)
+
+    accuracy_metrics = defaultdict(lambda: defaultdict(list))
+    performance_metrics = defaultdict(list)
+
+    if window_size > 0:
+        sliding_window = deque()
+
+    start_time = time.time()
+
+    for idx, edge_to_add in enumerate(edges):
+
+        edge_to_remove = None
+
+        if window_size > 0:
+            sliding_window.append(edge_to_add)
+
+            if len(sliding_window) > window_size:
+                edge_to_remove = sliding_window.popleft()
+
+        for algorithm in ['exact', 'naive', 'optimal']:
+
+            edge_op_start = datetime.now()
+
+            simulators[algorithm].add_edge(edge_to_add)
+
+            if edge_to_remove is not None:
+                simulators[algorithm].remove_edge(edge_to_remove)
+
+            edge_op_end = datetime.now()
+
+            performance_metrics[algorithm].append((edge_op_end - edge_op_start) / ms)
+
+        if idx % ACCURACY_SAMPLE_INTERVAL == 0:
+            # unary + copies a Counter, dropping zero and negative counts
+            exact_patterns = +simulators['exact'].patterns
+            exact_freqs = pattern_frequencies(exact_patterns)
+            exact_t_freqs = threshold_frequencies(exact_freqs, FREQUENCY_THRESHOLD)
+
+            for algo_type in ['naive', 'optimal']:
+
+                algo_patterns = +simulators[algo_type].patterns
+                algo_freqs = pattern_frequencies(algo_patterns)
+                algo_t_freqs = threshold_frequencies(algo_freqs, FREQUENCY_THRESHOLD)
+
+                prec = precision(exact_t_freqs, algo_t_freqs)
+                rec = recall(exact_t_freqs, algo_t_freqs)
+                are = avg_relative_error(exact_t_freqs, algo_t_freqs, T_k)
+
+                accuracy_metrics[algo_type]['precision'].append(prec)
+                accuracy_metrics[algo_type]['recall'].append(rec)
+                accuracy_metrics[algo_type]['avg_relative_error'].append(are)
+
+    end_time = time.time()
+
+    return end_time - start_time, accuracy_metrics, performance_metrics
+
+
+def main():
+    parser = ArgumentParser(description="Run a FSM continuous accuracy experiment.")
+
+    parser.add_argument("k",
+        type=int,
+        help="size of subgraphs (k-nodes) being mined")
+
+    parser.add_argument('stream_setting',
+        choices=['incremental', 'dynamic'],
+        help="choose between incremental or fully dynamic stream setting")
+
+    parser.add_argument('edge_file',
+        type=FileType('r'),
+        help="path to the input graph edge file")
+
+    parser.add_argument('output_dir',
+        help="path to the directory for output files")
+
+    parser.add_argument('reservoir_size',
+        type=int,
+        help="reservoir size required for naive and optimal algorithms")
+
+    parser.add_argument('T_k',
+        type=int,
+        help="number of possible k-node subgraph patterns")
+
+    parser.add_argument('-t', '--times',
+        type=int,
+        default=10,
+        help="number of times the simulation is run in this instance")
+
+    parser.add_argument('-w', '--windowsize',
+        dest="window_size",
+        type=int,
+        help="size of the sliding window (requires dynamic stream setting)")
+
+    args = vars(parser.parse_args())
+
+    k = args['k']
+    stream = args['stream_setting']
+    output_dir = args['output_dir']
+    M = args['reservoir_size']
+    T_k = args['T_k']
+    times = args['times']
+    window_size = args['window_size'] or 0
+
+    in_file = args['edge_file']
+
+    print("Running the Continuous Accuracy Experiment", "\n")
+
+    print("PARAMETERS")
+    print("stream setting:", stream)
+    print("k:             ", k)
+    print("M:             ", M)
+    print("T_k:           ", T_k)
+    print("times:         ", times)
+    print("window size:   ", window_size)
+    print("input graph:   ", in_file.name)
+    print("output dir:    ", output_dir, "\n")
+
+    ExactAlgorithm = ALGORITHMS[stream]['exact']
+    NaiveAlgorithm = ALGORITHMS[stream]['naive']
+    OptimizedAlgorithm = ALGORITHMS[stream]['optimal']
+
+    if window_size and stream != 'dynamic':
+        msg = "a sliding window requires the dynamic stream setting"
+        raise ValueError(msg)
+
+    # read the input graph from the edge file
+    with in_file as edge_file:
+        edge_reader = csv.reader(edge_file, delimiter=' ')
+        edges = [make_edge(*tuple(int(x) for x in row)) for row in edge_reader]
+
+    # run simulations and collect the duration and metrics from each run
+    durations = []
+    run_accuracy_metrics = []
+    run_performance_metrics = []
+
+    print("SIMULATIONS", "\n")
+
+    for i in range(times):
+        print("Running simulation", i + 1, "...")
+
+        simulators = {
+            'exact': ExactAlgorithm(k=k),
+            'naive': NaiveAlgorithm(k=k, M=M),
+            'optimal': OptimizedAlgorithm(k=k, M=M)
+        }
+
+        duration, acc_metrics, perf_metrics = run_simulation(simulators, edges, T_k, window_size)
+
+        print("Done, run took", duration, "seconds.", "\n")
+
+        durations.append(duration)
+        run_accuracy_metrics.append(acc_metrics)
+        run_performance_metrics.append(perf_metrics)
+
+    avg_duration = np.mean(durations)
+
+    print("Average duration of a run was", avg_duration, "seconds.", "\n")
+
+    ec_edge_op_durations = [x['exact'] for x in run_performance_metrics]
+    nrs_edge_op_durations = [x['naive'] for x in run_performance_metrics]
+    ors_edge_op_durations = [x['optimal'] for x in run_performance_metrics]
+
+    print("Average edge operation durations")
+    print("EC:  %d ms" % (np.mean(ec_edge_op_durations)))
+    print("NRS: %d ms" % (np.mean(nrs_edge_op_durations)))
+    print("ORS: %d ms" % (np.mean(ors_edge_op_durations)))
+
+    identifier = uuid.uuid4()  # unique identifier for files
+
+    print("Plots of performance")
+
+    perf_plot_path = os.path.join(output_dir, "%s_performance_plot.pdf" % (identifier))
+
+    x_values = np.arange(len(ec_edge_op_durations[0]))
+
+    ec_avg_edge_op_durations = np.mean(ec_edge_op_durations, axis=0)
+    nrs_avg_edge_op_durations = np.mean(nrs_edge_op_durations, axis=0)
+    ors_avg_edge_op_durations = np.mean(ors_edge_op_durations, axis=0)
+
+    fig = plt.figure()
+    ax = fig.add_subplot(111)
+    ax.plot(x_values, ec_avg_edge_op_durations, label="EC")
+    ax.plot(x_values, nrs_avg_edge_op_durations, label="NRS")
+    ax.plot(x_values, ors_avg_edge_op_durations, label="ORS")
+    ax.legend()
+    plt.title("edge op durations, k=" + str(k))
+    plt.savefig(perf_plot_path)
+
+    print("Saved figure to", perf_plot_path)
+
+    print("Plots of accuracy", "\n")
+
+    for algo_type in ['naive', 'optimal']:
+        algo_acc_metrics = [d[algo_type] for d in run_accuracy_metrics]
+
+        avg_precision = np.mean([c['precision'] for c in algo_acc_metrics], axis=0)
+        avg_recall = np.mean([c['recall'] for c in algo_acc_metrics], axis=0)
+        avg_are = np.mean([c['avg_relative_error'] for c in algo_acc_metrics], axis=0)
+
+        # accuracy is sampled at idx 0, then every ACCURACY_SAMPLE_INTERVAL edges
+        x_values = [ACCURACY_SAMPLE_INTERVAL * i for i in range(len(avg_precision))]
+
+        fig = plt.figure()
+        ax = fig.add_subplot(311)
+        ax.plot(x_values, avg_precision)
+        plt.title("precision")
+
+        ax = fig.add_subplot(312)
+        ax.plot(x_values, avg_recall)
+        plt.title("recall")
+
+        ax = fig.add_subplot(313)
+        ax.plot(x_values, avg_are)
+        plt.title("average relative error")
+
+        fig.suptitle('%s %s algorithm accuracy' % (algo_type, stream), fontsize=16)
+
+        plt.tight_layout()
+
+        acc_plot_path = os.path.join(output_dir, "%s_%s_vs_ec_accuracy_plot.pdf" % (identifier, algo_type))
+
+        plt.savefig(acc_plot_path)
+
+        print("Saved figure to", acc_plot_path)
+
+
+if __name__ == '__main__':
+    main()
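# sampling/skip_rp.py below exposes skip_records(n, N): how many records to
# skip before selecting the next one when drawing n of N records
# sequentially (Vitter's method). A usage sketch for picking n sorted
# indices out of N records; sequential_sample is a hypothetical helper,
# only the skip_records signature comes from the patch:
from sampling import skip_rp

def sequential_sample(n, N):
    selected, position = [], 0
    while n > 0:
        S = skip_rp.skip_records(n, N)
        position += S          # skip S records ...
        selected.append(position)
        position += 1          # ... then take the next one
        N -= S + 1             # records remaining in the pool
        n -= 1                 # selections still needed
    return selected

print(sequential_sample(5, 100))   # 5 sorted indices in [0, 100)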
diff --git a/sampling/skip_rp.py b/sampling/skip_rp.py
new file mode 100644
index 0000000..1bc2906
--- /dev/null
+++ b/sampling/skip_rp.py
@@ -0,0 +1,109 @@
+import numpy as np
+
+# The alpha value used in Vitter's "An efficient algorithm for sequential
+# random sampling" was 1/13
+ALPHA_INV = 13
+
+
+def draw_V_prime(coefficient):
+    return np.exp(np.log(np.random.rand()) * coefficient)
+
+
+def skip_records(n, N):
+    """
+    Returns the number of records S to skip before selecting the next record.
+
+    :param n: The number of records to be selected out of the pool.
+    :param N: The total number of records to choose from.
+    :type n: int
+    :type N: int
+    :returns: S, the number of records to skip
+    :rtype: int
+    """
+    threshold = n * ALPHA_INV
+
+    S = N
+
+    if n > 0:
+        V_prime = draw_V_prime(1 / float(n))
+
+        if n > 1:
+            if threshold < N:
+                # n is small relative to N: use Vitter's Algorithm D
+                S = algorithm_D(n, N, V_prime)
+            else:
+                # n is large relative to N: use Vitter's Algorithm A
+                S = algorithm_A(n, N)
+        else:
+            # the n == 1 special case
+            S = int(N * V_prime)
+
+    return S
+
+
+def algorithm_A(n, N):
+    # sequential search: grow the skip S until a record is selected
+    top = N - n
+    N_real = float(N)
+
+    V = np.random.rand()
+    S = 0
+
+    quot = (N - n) / N_real
+
+    while quot > V:
+        S += 1
+        top -= 1
+        N_real -= 1
+
+        quot *= top / N_real
+
+    return S
+
+
+def algorithm_D(n, N, V_prime):
+    # rejection sampling of the skip distance, constant expected time
+    n_inv = 1 / float(n)
+    n_min1_inv = 1 / float(n - 1)
+
+    qu1 = N - n + 1
+
+    while True:
+        while True:
+            X = N * (1 - V_prime)
+            S = int(X)
+
+            if S < qu1:
+                break
+
+            V_prime = draw_V_prime(n_inv)
+
+        U = np.random.rand()
+
+        y1 = np.exp(np.log(U * N / float(qu1)) * n_min1_inv)
+
+        V_prime = y1 * (1 - X / float(N)) * (qu1 / float(qu1 - S))
+
+        if V_prime <= 1:
+            # accept S based on the fast test
+            break
+
+        y2 = 1
+        top = N - 1
+
+        if n - 1 > S:
+            bottom = float(N - n)
+            limit = N - S
+        else:
+            bottom = float(N - S - 1)
+            limit = qu1
+
+        for t in range(N - 1, limit - 1, -1):
+            y2 *= top / bottom
+            top -= 1
+            bottom -= 1
+
+        if N / float(N - X) >= y1 * np.exp(np.log(y2) * n_min1_inv):
+            # accept S based on the exact test; V_prime is redrawn
+            # on the next call to skip_records
+            break
+
+        V_prime = draw_V_prime(n_inv)
+
+    return S
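# subgraph_reservoir.py below keeps deleted slots in a vacant_indices free
# list, so removals are O(1) and the remaining slots keep their indices,
# which the vertex_subgraphs index relies on. A stripped-down sketch of
# that storage scheme (SlotStore is illustrative, not the patch's class):
class SlotStore:
    def __init__(self):
        self.slots = []      # may contain None holes
        self.vacant = []     # indices of holes, reused first
        self.size = 0

    def add(self, item):
        idx = self.vacant.pop() if self.vacant else len(self.slots)
        if idx == len(self.slots):
            self.slots.append(item)
        else:
            self.slots[idx] = item
        self.size += 1
        return idx           # stable handle for secondary indices

    def remove(self, idx):
        self.slots[idx] = None   # leave a hole; other indices are unchanged
        self.vacant.append(idx)
        self.size -= 1

store = SlotStore()
a = store.add('s1'); b = store.add('s2')
store.remove(a)
assert store.add('s3') == a      # the hole is reused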
diff --git a/sampling/subgraph_reservoir.py b/sampling/subgraph_reservoir.py
index 995abb6..3f00103 100644
--- a/sampling/subgraph_reservoir.py
+++ b/sampling/subgraph_reservoir.py
@@ -6,16 +6,18 @@ class SubgraphReservoir:
     subgraphs = None
     vertex_subgraphs = None
 
-    def __init__(self, size):
+    def __init__(self, max_size):
         """
         Initialize a new subgraph reservoir.
 
-        :param size: The maximum size of the reservoir.
-        :type size: int
+        :param max_size: The maximum size of the reservoir.
+        :type max_size: int
         """
-        self.max_size = size
+        self.size = 0
+        self.max_size = max_size
         self.subgraphs = []
         self.subgraph_indices = {}
+        self.vacant_indices = []
         self.vertex_subgraphs = defaultdict(set)
 
@@ -24,12 +26,12 @@ def __contains__(self, subgraph):
 
 
     def __len__(self):
-        return len(self.subgraphs)
+        return self.size
 
 
     def is_full(self):
         """Checks if the reservoir has reached max_size."""
-        return len(self) >= self.max_size
+        return self.size >= self.max_size
 
 
     def add(self, subgraph, N=float('-inf')):
@@ -50,8 +52,16 @@ def add(self, subgraph, N=float('-inf')):
 
         else:
             # the reservoir is not full, so we add the new subgraph
+
+            # determine where in the list the subgraph is added:
+            # if there are vacant indices in the list, reuse one first,
+            # otherwise append the subgraph to the end
-            idx = len(self)
-            self.subgraphs.append(subgraph)
+            if len(self.vacant_indices) > 0:
+                idx = self.vacant_indices.pop()
+                self.subgraphs[idx] = subgraph
+            else:
+                idx = len(self)
+                self.subgraphs.append(subgraph)
 
             self.subgraph_indices[subgraph] = idx
 
@@ -60,6 +70,9 @@ def add(self, subgraph, N=float('-inf')):
 
         success = True
 
+        if success and old_subgraph is None:
+            self.size += 1
+
         return success, old_subgraph
 
 
@@ -86,6 +99,26 @@ def replace(self, old_subgraph, new_subgraph):
         self.vertex_subgraphs[v].remove(idx)
 
 
+    def remove(self, subgraph):
+        """Removes subgraph from the reservoir if present."""
+
+        if subgraph in self:
+            idx = self.subgraph_indices[subgraph]
+
+            self.subgraphs[idx] = None
+            del self.subgraph_indices[subgraph]
+
+            for u in subgraph.nodes:
+                self.vertex_subgraphs[u].remove(idx)
+
+            self.vacant_indices.append(idx)
+            self.size -= 1
+
+            return True
+        else:
+            return False
+
+
     def get_common_subgraphs(self, u, v):
         """Get all subgraphs from the reservoir that contain the edge (u, v)."""
         common_indices = self.vertex_subgraphs[u] & self.vertex_subgraphs[v]
diff --git a/accuracy.py b/util/accuracy.py
similarity index 100%
rename from accuracy.py
rename to util/accuracy.py
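# A hypothetical invocation of the experiment script added above (paths and
# parameter values are examples only): mine 3-node subgraphs over a fully
# dynamic stream with a sliding window of 2000 edges, a reservoir of
# M = 1000, T_k = 29 possible patterns, averaged over 5 runs:
#
#   python continuous_accuracy.py 3 dynamic data/edges.txt results/ 1000 29 -t 5 -w 2000
#
# The edge file is read as space-separated rows passed to make_edge; the
# script writes <uuid>_performance_plot.pdf and one accuracy plot per
# sampling algorithm into results/.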