diff --git a/Examples.py b/Examples.py index 82d86ce5484..40a007bbcc6 100644 --- a/Examples.py +++ b/Examples.py @@ -1140,15 +1140,12 @@ def passive_vpa_learning_arithmetics(): def passive_vpa_learning_on_all_benchmark_models(): from aalpy.learning_algs import run_PAPNI - from aalpy.utils.BenchmarkVpaModels import get_all_VPAs + from aalpy.utils.BenchmarkVpaModels import vpa_L1, vpa_L12, vpa_for_odd_parentheses from aalpy.utils import generate_input_output_data_from_vpa, convert_i_o_traces_for_RPNI - for gt in get_all_VPAs(): - + for gt in [vpa_L1(), vpa_L12(), vpa_for_odd_parentheses()]: vpa_alphabet = gt.input_alphabet - data = generate_input_output_data_from_vpa(gt, num_sequances=2000, min_seq_len=1, max_seq_len=16) - - data = convert_i_o_traces_for_RPNI(data) + data = generate_input_output_data_from_vpa(gt, num_sequances=2000, max_seq_len=16) papni = run_PAPNI(data, vpa_alphabet, algorithm='gsm', print_info=True) @@ -1160,3 +1157,147 @@ def passive_vpa_learning_on_all_benchmark_models(): assert False, 'Papni Learned Model not consistent with data.' print('PAPNI model conforms to data.') + + +def gsm_rpni(): + from aalpy import load_automaton_from_file + from aalpy.utils.Sampling import get_io_traces, sample_with_length_limits + from aalpy.learning_algs.general_passive.GeneralizedStateMerging import run_GSM + + automaton = load_automaton_from_file("DotModels/car_alarm.dot", "moore") + input_traces = sample_with_length_limits(automaton.get_input_alphabet(), 100, 20, 30) + traces = get_io_traces(automaton, input_traces) + + learned_model = run_GSM(traces, output_behavior="moore", transition_behavior="deterministic") + learned_model.visualize() + + +def gsm_edsm(): + from typing import Dict + from aalpy import load_automaton_from_file + from aalpy.utils.Sampling import get_io_traces, sample_with_length_limits + from aalpy.learning_algs.general_passive.GeneralizedStateMerging import run_GSM + from aalpy.learning_algs.general_passive.ScoreFunctionsGSM import ScoreCalculation + from aalpy.learning_algs.general_passive.Node import Node + + automaton = load_automaton_from_file("DotModels/car_alarm.dot", "moore") + input_traces = sample_with_length_limits(automaton.get_input_alphabet(), 100, 20, 30) + traces = get_io_traces(automaton, input_traces) + + def EDSM_score(part: Dict[Node, Node]): + nr_partitions = len(set(part.values())) + nr_merged = len(part) + return nr_merged - nr_partitions + + score = ScoreCalculation(score_function=EDSM_score) + learned_model = run_GSM(traces, output_behavior="moore", transition_behavior="deterministic", score_calc=score) + learned_model.visualize() + + +def gsm_likelihood_ratio(): + from typing import Dict + from scipy.stats import chi2 + from aalpy.learning_algs.general_passive.GeneralizedStateMerging import run_GSM + from aalpy.learning_algs.general_passive.ScoreFunctionsGSM import ScoreFunction, differential_info, ScoreCalculation + from aalpy.learning_algs.general_passive.Node import Node + from aalpy.utils.Sampling import get_io_traces, sample_with_length_limits + from aalpy import load_automaton_from_file + + automaton = load_automaton_from_file("DotModels/MDPs/faulty_car_alarm.dot", "mdp") + input_traces = sample_with_length_limits(automaton.get_input_alphabet(), 2000, 20, 30) + traces = get_io_traces(automaton, input_traces) + + def likelihood_ratio_score(alpha=0.05) -> ScoreFunction: + if not 0 < alpha <= 1: + raise ValueError(f"Confidence {alpha} not between 0 and 1") + + def score_fun(part: Dict[Node, Node]): + llh_diff, param_diff = differential_info(part) + if param_diff == 0: + # This should cover the corner case when the partition merges only states with no outgoing transitions. + return -1 # Let them be very bad merges. + score = 1 - chi2.cdf(2 * llh_diff, param_diff) + if score < alpha: + return False + return score + + return score_fun + + score = ScoreCalculation(score_function=likelihood_ratio_score()) + learned_model = run_GSM(traces, output_behavior="moore", transition_behavior="stochastic", score_calc=score) + learned_model.visualize() + + +def gsm_IOAlergia_EDSM(): + from aalpy.learning_algs.general_passive.GeneralizedStateMerging import run_GSM + from aalpy.learning_algs.general_passive.ScoreFunctionsGSM import hoeffding_compatibility, ScoreCalculation + from aalpy.learning_algs.general_passive.Node import Node + from aalpy.utils.Sampling import get_io_traces, sample_with_length_limits + from aalpy import load_automaton_from_file + + automaton = load_automaton_from_file("DotModels/MDPs/faulty_car_alarm.dot", "mdp") + input_traces = sample_with_length_limits(automaton.get_input_alphabet(), 2000, 20, 30) + traces = get_io_traces(automaton, input_traces) + + class IOAlergiaWithEDSM(ScoreCalculation): + def __init__(self, epsilon): + super().__init__() + self.ioa_compatibility = hoeffding_compatibility(epsilon) + self.evidence = 0 + + def reset(self): + self.evidence = 0 + + def local_compatibility(self, a: Node, b: Node): + self.evidence += 1 + return self.ioa_compatibility(a, b) + + def score_function(self, part: dict[Node, Node]): + return self.evidence + + epsilon = 0.05 + scores = { + "IOA": ScoreCalculation(hoeffding_compatibility(epsilon)), + "IOA+EDSM": IOAlergiaWithEDSM(epsilon), + } + for name, score in scores.items(): + learned_model = run_GSM(traces, output_behavior="moore", transition_behavior="stochastic", score_calc=score, + compatibility_on_pta=True, compatibility_on_futures=True) + learned_model.visualize(name) + + +def gsm_IOAlergia_domain_knowldege(): + from aalpy.learning_algs.general_passive.GeneralizedStateMerging import run_GSM + from aalpy.learning_algs.general_passive.ScoreFunctionsGSM import hoeffding_compatibility, ScoreCalculation + from aalpy.learning_algs.general_passive.Node import Node + from aalpy.utils.Sampling import get_io_traces, sample_with_length_limits + from aalpy import load_automaton_from_file + + automaton = load_automaton_from_file("DotModels/MDPs/faulty_car_alarm.dot", "mdp") + input_traces = sample_with_length_limits(automaton.get_input_alphabet(), 2000, 20, 30) + traces = get_io_traces(automaton, input_traces) + + ioa_compat = hoeffding_compatibility(0.05) + + def get_parity(node: Node): + pref = node.get_prefix() + return [sum(in_s == key for in_s, out_s in pref) % 2 for key in ["l", "d"]] + + # The car has 4 physical states arising from the combination of locked/unlocked and open/closed. + # Each input toggles a transition between these four states. While the car alarm system has richer behavior than that, + # it still needs to discern the physical states. Thus, in every sane implementation of a car alarm system, every state + # is associated with exactly one physical state. This additional assumption can be enforced by checking the parity of + # all input symbols during merging. + def ioa_compat_domain_knowledge(a: Node, b: Node): + parity = get_parity(a) == get_parity(b) + ioa = ioa_compat(a, b) + return parity and ioa + + scores = { + "IOA": ScoreCalculation(ioa_compat), + "IOA+DK": ScoreCalculation(ioa_compat_domain_knowledge), + } + for name, score in scores.items(): + learned_model = run_GSM(traces, output_behavior="moore", transition_behavior="stochastic", score_calc=score, + compatibility_on_pta=True, compatibility_on_futures=True) + learned_model.visualize(name) diff --git a/aalpy/__init__.py b/aalpy/__init__.py index 78368c006e8..6cc9a244d28 100644 --- a/aalpy/__init__.py +++ b/aalpy/__init__.py @@ -9,6 +9,8 @@ MealyState, MooreMachine, MooreState, + NDMooreMachine, + NDMooreState, Onfsm, OnfsmState, Sevpa, @@ -37,6 +39,7 @@ run_non_det_Lstar, run_RPNI, run_stochastic_Lstar, + run_GSM, run_PAPNI ) from .oracles import ( diff --git a/aalpy/automata/NonDeterministicMooreMachine.py b/aalpy/automata/NonDeterministicMooreMachine.py new file mode 100644 index 00000000000..97d474d73fb --- /dev/null +++ b/aalpy/automata/NonDeterministicMooreMachine.py @@ -0,0 +1,67 @@ +import random +from collections import defaultdict +from typing import List, Dict, Generic + +from aalpy.base import AutomatonState, Automaton +from aalpy.base.Automaton import OutputType, InputType + + +class NDMooreState(AutomatonState, Generic[InputType, OutputType]): + """ + Single state of a non-deterministic Moore machine. Each state has an output value. + """ + + def __init__(self, state_id, output=None): + super().__init__(state_id) + self.transitions: Dict[InputType, List['NDMooreState']] = defaultdict(lambda: list()) + self.output: OutputType = output + + +class NDMooreMachine(Automaton[NDMooreState[InputType, OutputType]]): + + def to_state_setup(self): + state_setup = dict() + + def set_dict_entry(state: NDMooreState): + state_setup[state.state_id] = (state.output, + {in_sym: [target.state_id for target in trans] for in_sym, trans in + state.transitions.items()}) + + set_dict_entry(self.initial_state) + for state in self.states: + if state is self.initial_state: + continue + set_dict_entry(state) + + @staticmethod + def from_state_setup(state_setup: dict, **kwargs) -> 'NDMooreMachine': + states_map = {key: NDMooreState(key, output=value[0]) for key, value in state_setup.items()} + + for key, values in state_setup.items(): + source = states_map[key] + for i, transitions in values[1].items(): + for node in transitions: + source.transitions[i].append(states_map[node]) + + initial_state = states_map[list(state_setup.keys())[0]] + return NDMooreMachine(initial_state, list(states_map.values())) + + def __init__(self, initial_state: AutomatonState, states: list): + super().__init__(initial_state, states) + + def step(self, letter): + """ + In Moore machines outputs depend on the current state. + + Args: + + letter: single input that is looked up in the transition function leading to a new state + + Returns: + + the output of the reached state + + """ + options = self.current_state.transitions[letter] + self.current_state = random.choice(options) + return self.current_state.output diff --git a/aalpy/automata/__init__.py b/aalpy/automata/__init__.py index 49299c76055..dd213c3bc45 100644 --- a/aalpy/automata/__init__.py +++ b/aalpy/automata/__init__.py @@ -5,5 +5,6 @@ from .Onfsm import Onfsm, OnfsmState from .StochasticMealyMachine import StochasticMealyMachine, StochasticMealyState from .MarkovChain import MarkovChain, McState +from .NonDeterministicMooreMachine import NDMooreMachine, NDMooreState from .Sevpa import Sevpa, SevpaState, SevpaAlphabet, SevpaTransition from .Vpa import Vpa, VpaAlphabet, VpaState, VpaTransition diff --git a/aalpy/learning_algs/__init__.py b/aalpy/learning_algs/__init__.py index 0dac5adfa62..81892586e52 100644 --- a/aalpy/learning_algs/__init__.py +++ b/aalpy/learning_algs/__init__.py @@ -10,3 +10,4 @@ from .stochastic_passive.ActiveAleriga import run_active_Alergia from .deterministic_passive.RPNI import run_RPNI, run_PAPNI from .deterministic_passive.active_RPNI import run_active_RPNI +from .general_passive.GeneralizedStateMerging import run_GSM \ No newline at end of file diff --git a/aalpy/learning_algs/deterministic_passive/ClassicRPNI.py b/aalpy/learning_algs/deterministic_passive/ClassicRPNI.py new file mode 100644 index 00000000000..d389a11b4c3 --- /dev/null +++ b/aalpy/learning_algs/deterministic_passive/ClassicRPNI.py @@ -0,0 +1,129 @@ +import time +from bisect import insort +from aalpy.learning_algs.deterministic_passive.rpni_helper_functions import to_automaton, createPTA, \ + check_sequence, extract_unique_sequences + + +class ClassicRPNI: + def __init__(self, data, automaton_type, print_info=True): + self.data = data + self.automaton_type = automaton_type + self.print_info = print_info + + pta_construction_start = time.time() + self.root_node = createPTA(data, automaton_type) + self.test_data = extract_unique_sequences(self.root_node) + + if self.print_info: + print(f'PTA Construction Time: {round(time.time() - pta_construction_start, 2)}') + + def run_rpni(self): + start_time = time.time() + + red = [self.root_node] + blue = list(red[0].children.values()) + while blue: + lex_min_blue = min(list(blue)) + merged = False + + for red_state in red: + if not self._compatible_states(red_state, lex_min_blue): + continue + merge_candidate = self._merge(red_state, lex_min_blue, copy_nodes=True) + if self._compatible(merge_candidate): + self._merge(red_state, lex_min_blue) + merged = True + break + + if not merged: + insort(red, lex_min_blue) + if self.print_info: + print(f'\rCurrent automaton size: {len(red)}', end="") + + blue.clear() + for r in red: + for c in r.children.values(): + if c not in red: + blue.append(c) + + if self.print_info: + print(f'\nRPNI Learning Time: {round(time.time() - start_time, 2)}') + print(f'RPNI Learned {len(red)} state automaton.') + + assert sorted(red, key=lambda x: len(x.prefix)) == red + return to_automaton(red, self.automaton_type) + + def _compatible(self, root_node): + """ + Check if current model is compatible with the data. + """ + for sequence in self.test_data: + if not check_sequence(root_node, sequence, automaton_type=self.automaton_type): + return False + return True + + def _compatible_states(self, red_node, blue_node): + """ + Only allow merging of states that have same output(s). + """ + if self.automaton_type != 'mealy': + # None is compatible with everything + return red_node.output == blue_node.output or red_node.output is None or blue_node.output is None + else: + red_io = {i: o for i, o in red_node.children.keys()} + blue_io = {i: o for i, o in blue_node.children.keys()} + for common_i in set(red_io.keys()).intersection(blue_io.keys()): + if red_io[common_i] != blue_io[common_i]: + return False + return True + + def _merge(self, red_node, lex_min_blue, copy_nodes=False): + """ + Merge two states and return the root node of resulting model. + """ + root_node = self.root_node.copy() if copy_nodes else self.root_node + lex_min_blue = lex_min_blue.copy() if copy_nodes else lex_min_blue + + red_node_in_tree = root_node + for p in red_node.prefix: + red_node_in_tree = red_node_in_tree.children[p] + + to_update = root_node + for p in lex_min_blue.prefix[:-1]: + to_update = to_update.children[p] + + to_update.children[lex_min_blue.prefix[-1]] = red_node_in_tree + + if self.automaton_type != 'mealy': + self._fold(red_node_in_tree, lex_min_blue) + else: + self._fold_mealy(red_node_in_tree, lex_min_blue) + + return root_node + + def _fold(self, red_node, blue_node): + # Change the output of red only to concrete output, ignore None + red_node.output = blue_node.output if blue_node.output is not None else red_node.output + + for i in blue_node.children.keys(): + if i in red_node.children.keys(): + self._fold(red_node.children[i], blue_node.children[i]) + else: + red_node.children[i] = blue_node.children[i] + + def _fold_mealy(self, red_node, blue_node): + blue_io_map = {i: o for i, o in blue_node.children.keys()} + + updated_keys = {} + for io, val in red_node.children.items(): + o = blue_io_map[io[0]] if io[0] in blue_io_map.keys() else io[1] + updated_keys[(io[0], o)] = val + + red_node.children = updated_keys + + for io in blue_node.children.keys(): + if io in red_node.children.keys(): + self._fold_mealy(red_node.children[io], blue_node.children[io]) + else: + red_node.children[io] = blue_node.children[io] + diff --git a/aalpy/learning_algs/deterministic_passive/GeneralizedStateMerging.py b/aalpy/learning_algs/deterministic_passive/GsmRPNI.py similarity index 89% rename from aalpy/learning_algs/deterministic_passive/GeneralizedStateMerging.py rename to aalpy/learning_algs/deterministic_passive/GsmRPNI.py index d5b963b3934..b1dc55e0890 100644 --- a/aalpy/learning_algs/deterministic_passive/GeneralizedStateMerging.py +++ b/aalpy/learning_algs/deterministic_passive/GsmRPNI.py @@ -1,10 +1,10 @@ -import queue import time +from collections import deque from aalpy.learning_algs.deterministic_passive.rpni_helper_functions import to_automaton, RpniNode, createPTA -class GeneralizedStateMerging: +class GsmRPNI: def __init__(self, data, automaton_type, print_info=True): self.data = data self.final_automaton_type = automaton_type @@ -12,7 +12,7 @@ def __init__(self, data, automaton_type, print_info=True): self.print_info = print_info pta_construction_start = time.time() - self.root = createPTA(data, self.automaton_type) + self.root_node = createPTA(data, self.automaton_type) self.log = [] if self.print_info: @@ -22,7 +22,7 @@ def run_rpni(self): start_time = time.time() # sorted list of states already considered - red_states = list([self.root]) + red_states = [self.root_node] # used to get the minimal non-red state blue_states = list(red_states[0].children.values()) @@ -51,7 +51,7 @@ def run_rpni(self): node.output = block.output node.children = block.children - node = self.root.get_child_by_prefix(blue_state.prefix[:-1]) + node = self.root_node.get_child_by_prefix(blue_state.prefix[:-1]) node.children[blue_state.prefix[-1]] = red_state blue_states.clear() @@ -72,11 +72,11 @@ def _partition_from_merge(self, red: RpniNode, blue: RpniNode): """ partitions = dict() - q = queue.Queue() - q.put((red, blue)) + q = deque() + q.append((red, blue)) - while not q.empty(): - red, blue = q.get() + while len(q) != 0: + red, blue = q.popleft() def get_partition(node: RpniNode): if node not in partitions: @@ -100,7 +100,7 @@ def get_partition(node: RpniNode): for symbol, blue_child in blue.children.items(): if symbol in partition.children.keys(): - q.put((partition.children[symbol], blue_child)) + q.append((partition.children[symbol], blue_child)) else: # blue_child is blue after merging if there is a red state in the partition partition.children[symbol] = blue_child diff --git a/aalpy/learning_algs/deterministic_passive/RPNI.py b/aalpy/learning_algs/deterministic_passive/RPNI.py index 48123e9b218..669f422c1f9 100644 --- a/aalpy/learning_algs/deterministic_passive/RPNI.py +++ b/aalpy/learning_algs/deterministic_passive/RPNI.py @@ -1,120 +1,8 @@ -import time -from bisect import insort from typing import Union from aalpy.base import DeterministicAutomaton -from aalpy.learning_algs.deterministic_passive.GeneralizedStateMerging import GeneralizedStateMerging -from aalpy.learning_algs.deterministic_passive.rpni_helper_functions import to_automaton, createPTA, \ - check_sequence, extract_unique_sequences - - -class RPNI: - def __init__(self, data, automaton_type, print_info=True): - self.data = data - self.automaton_type = automaton_type - self.print_info = print_info - - pta_construction_start = time.time() - self.root_node = createPTA(data, automaton_type) - self.test_data = extract_unique_sequences(self.root_node) - - if self.print_info: - print(f'PTA Construction Time: {round(time.time() - pta_construction_start, 2)}') - - def run_rpni(self): - start_time = time.time() - - red = [self.root_node] - blue = list(red[0].children.values()) - while blue: - lex_min_blue = min(list(blue)) - merged = False - - for red_state in red: - if not red_state.compatible_outputs(lex_min_blue): - continue - merge_candidate = self._merge(red_state, lex_min_blue, copy_nodes=True) - if self._compatible(merge_candidate): - self._merge(red_state, lex_min_blue) - merged = True - break - - if not merged: - insort(red, lex_min_blue) - if self.print_info: - print(f'\rCurrent automaton size: {len(red)}', end="") - - blue.clear() - for r in red: - for c in r.children.values(): - if c not in red: - blue.append(c) - - if self.print_info: - print(f'\nRPNI Learning Time: {round(time.time() - start_time, 2)}') - print(f'RPNI Learned {len(red)} state automaton.') - - assert sorted(red, key=lambda x: len(x.prefix)) == red - return to_automaton(red, self.automaton_type) - - def _compatible(self, root_node): - """ - Check if current model is compatible with the data. - """ - for sequence in self.test_data: - if not check_sequence(root_node, sequence, automaton_type=self.automaton_type): - return False - return True - - def _merge(self, red_node, lex_min_blue, copy_nodes=False): - """ - Merge two states and return the root node of resulting model. - """ - root_node = self.root_node.copy() if copy_nodes else self.root_node - lex_min_blue = lex_min_blue.copy() if copy_nodes else lex_min_blue - - red_node_in_tree = root_node - for p in red_node.prefix: - red_node_in_tree = red_node_in_tree.children[p] - - to_update = root_node - for p in lex_min_blue.prefix[:-1]: - to_update = to_update.children[p] - - to_update.children[lex_min_blue.prefix[-1]] = red_node_in_tree - - if self.automaton_type != 'mealy': - self._fold(red_node_in_tree, lex_min_blue) - else: - self._fold_mealy(red_node_in_tree, lex_min_blue) - - return root_node - - def _fold(self, red_node, blue_node): - # Change the output of red only to concrete output, ignore None - red_node.output = blue_node.output if blue_node.output is not None else red_node.output - - for i in blue_node.children.keys(): - if i in red_node.children.keys(): - self._fold(red_node.children[i], blue_node.children[i]) - else: - red_node.children[i] = blue_node.children[i] - - def _fold_mealy(self, red_node, blue_node): - blue_io_map = {i: o for i, o in blue_node.children.keys()} - - updated_keys = {} - for io, val in red_node.children.items(): - o = blue_io_map[io[0]] if io[0] in blue_io_map.keys() else io[1] - updated_keys[(io[0], o)] = val - - red_node.children = updated_keys - - for io in blue_node.children.keys(): - if io in red_node.children.keys(): - self._fold_mealy(red_node.children[io], blue_node.children[io]) - else: - red_node.children[io] = blue_node.children[io] +from aalpy.learning_algs.deterministic_passive.ClassicRPNI import ClassicRPNI +from aalpy.learning_algs.deterministic_passive.GsmRPNI import GsmRPNI def run_RPNI(data, automaton_type, algorithm='gsm', @@ -144,19 +32,14 @@ def run_RPNI(data, automaton_type, algorithm='gsm', assert input_completeness in {None, 'self_loop', 'sink_state'} if algorithm == 'classic': - rpni = RPNI(data, automaton_type, print_info) - - if rpni.root_node is None: - print('Data provided to RPNI is not deterministic. Ensure that the data is deterministic, ' - 'or consider using Alergia.') - return None + rpni = ClassicRPNI(data, automaton_type, print_info) else: - rpni = GeneralizedStateMerging(data, automaton_type, print_info) + rpni = GsmRPNI(data, automaton_type, print_info) - if rpni.root is None: - print('Data provided to RPNI is not deterministic. Ensure that the data is deterministic, ' - 'or consider using Alergia.') - return None + if rpni.root_node is None: + print('Data provided to RPNI is not deterministic. Ensure that the data is deterministic, ' + 'or consider using Alergia.') + return None learned_model = rpni.run_rpni() @@ -222,16 +105,7 @@ def run_PAPNI(data, vpa_alphabet, algorithm='gsm', print_info=True): papni_data.append((processed_sequance, label)) # instantiate and run PAPNI as base RPNI with stack-aware data - if print_info: - print('PAPNI with RPNI backend:') - if algorithm == 'classic': - rpni = RPNI(papni_data, automaton_type='dfa', print_info=print_info) - else: - rpni = GeneralizedStateMerging(papni_data, automaton_type='dfa', - print_info=print_info) - - # run classic RPNI with preprocessed data that is aware of stack - learned_model = rpni.run_rpni() + learned_model = run_RPNI(papni_data, automaton_type='dfa', algorithm=algorithm, print_info=print_info) # convert intermediate DFA representation to VPA learned_model = vpa_from_dfa_representation(learned_model, vpa_alphabet) diff --git a/aalpy/learning_algs/general_passive/GeneralizedStateMerging.py b/aalpy/learning_algs/general_passive/GeneralizedStateMerging.py new file mode 100644 index 00000000000..2370aed9a16 --- /dev/null +++ b/aalpy/learning_algs/general_passive/GeneralizedStateMerging.py @@ -0,0 +1,341 @@ +import functools +from bisect import insort +from typing import Dict, Tuple, Callable, List, Optional +from collections import deque + +from aalpy.learning_algs.general_passive.Node import Node, OutputBehavior, TransitionBehavior, TransitionInfo, \ + OutputBehaviorRange, TransitionBehaviorRange, intersection_iterator +from aalpy.learning_algs.general_passive.ScoreFunctionsGSM import ScoreCalculation, hoeffding_compatibility + + +# TODO add option for making checking of futures and partition non mutual exclusive? +# Easiest done by adding a new method / field to ScoreCalculation + +class Partitioning: + def __init__(self, red: Node, blue: Node): + self.red: Node = red + self.blue: Node = blue + self.score = False + self.red_mapping: Dict[Node, Node] = dict() + self.full_mapping: Dict[Node, Node] = dict() + + +class Instrumentation: + def __init__(self): + pass + + def reset(self, gsm: 'GeneralizedStateMerging'): + pass + + def pta_construction_done(self, root: Node): + pass + + def log_promote(self, node: Node): + pass + + def log_merge(self, part: Partitioning): + pass + + def learning_done(self, root: Node, red_states: List[Node]): + pass + + +class GeneralizedStateMerging: + def __init__(self, *, + output_behavior: OutputBehavior = "moore", + transition_behavior: TransitionBehavior = "deterministic", + score_calc: ScoreCalculation = None, + pta_preprocessing: Callable[[Node], Node] = None, + postprocessing: Callable[[Node], Node] = None, + compatibility_on_pta: bool = False, + compatibility_on_futures: bool = False, + node_order: Callable[[Node, Node], bool] = None, + consider_only_min_blue=False, + depth_first=False): + + if output_behavior not in OutputBehaviorRange: + raise ValueError(f"invalid output behavior {output_behavior}") + self.output_behavior: OutputBehavior = output_behavior + if transition_behavior not in TransitionBehaviorRange: + raise ValueError(f"invalid transition behavior {transition_behavior}") + self.transition_behavior: TransitionBehavior = transition_behavior + + if score_calc is None: + if transition_behavior == "deterministic": + score_calc = ScoreCalculation() + elif transition_behavior == "nondeterministic" : + raise ValueError("Missing score_calc for nondeterministic transition behavior. No default available.") + elif transition_behavior == "stochastic" : + score_calc = ScoreCalculation(hoeffding_compatibility(0.005, compatibility_on_pta)) + self.score_calc: ScoreCalculation = score_calc + + if node_order is None: + node_order = Node.__lt__ + self.node_order = functools.cmp_to_key(lambda a, b: -1 if node_order(a, b) else 1) + + self.pta_preprocessing = pta_preprocessing or (lambda x: x) + self.postprocessing = postprocessing or (lambda x: x) + + self.compatibility_on_pta = compatibility_on_pta + self.compatibility_on_futures = compatibility_on_futures + + self.consider_only_min_blue = consider_only_min_blue + self.depth_first = depth_first + + def compute_local_compatibility(self, a: Node, b: Node): + if self.output_behavior == "moore" and not Node.moore_compatible(a, b): + return False + if self.transition_behavior == "deterministic" and not Node.deterministic_compatible(a, b): + return False + return self.score_calc.local_compatibility(a, b) + + # TODO: make more generic by adding the option to use a different algorithm than red blue + # for selecting potential merge candidates. Maybe using inheritance with abstract `run`. + def run(self, data, convert=True, instrumentation: Instrumentation = None): + if instrumentation is None: + instrumentation = Instrumentation() + instrumentation.reset(self) + + if isinstance(data, Node): + root = data + else: + root = Node.createPTA(data, self.output_behavior) + + root = self.pta_preprocessing(root) + instrumentation.pta_construction_done(root) + instrumentation.log_promote(root) + + if self.transition_behavior == "deterministic": + if not root.is_deterministic(): + raise ValueError("required deterministic automaton but input data is nondeterministic") + + # sorted list of states already considered + red_states = [root] + + partition_candidates: Dict[Tuple[Node, Node], Partitioning] = dict() + while True: + # get blue states + blue_states = [] + for r in red_states: + for _, t in r.transition_iterator(): + c = t.target + if c in red_states: + continue + blue_states.append(c) + if self.consider_only_min_blue or not self.score_calc.has_score_function(): + blue_states = [min(blue_states, key=self.node_order)] + + # no blue states left -> done + if len(blue_states) == 0: + break + blue_states.sort(key=self.node_order) + + # loop over blue states + promotion = False + for blue_state in blue_states: + # FUTURE: Parallelize + # FUTURE: Save partitions? + + # calculate partitions resulting from merges with red states if necessary + current_candidates: Dict[Node, Partitioning] = dict() + perfect_partitioning = None + + red_state = None + for red_state in red_states: + partition = partition_candidates.get((red_state, blue_state)) + if partition is None: + partition = self._partition_from_merge(red_state, blue_state) + if partition.score is True: + perfect_partitioning = partition + break + current_candidates[red_state] = partition + + assert red_state is not None + # partition with perfect score found: don't consider anything else + if perfect_partitioning: + partition_candidates = {(red_state, blue_state): perfect_partitioning} + break + + # no merge candidates for this blue state -> promote + if all(part.score is False for part in current_candidates.values()): + insort(red_states, blue_state, key=self.node_order) + instrumentation.log_promote(blue_state) + promotion = True + break + + # update tracking dict with new candidates + new_candidates = (((red, blue_state), part) for red, part in current_candidates.items() if + part.score is not False) + partition_candidates.update(new_candidates) + + # a state was promoted -> don't clear candidates + if promotion: + continue + + # find best partitioning and clear candidates + best_candidate = max(partition_candidates.values(), key=lambda part: part.score) + for real_node, partition_node in best_candidate.red_mapping.items(): + real_node.transitions = partition_node.transitions + for access_pair, t_info in real_node.transition_iterator(): + if t_info.target not in red_states: + t_info.target.predecessor = real_node + t_info.target.prefix_access_pair = access_pair # not sure whether this is actually required + instrumentation.log_merge(best_candidate) + # FUTURE: optimizations for compatibility tests where merges can be orthogonal + # FUTURE: caching for aggregating compatibility tests + partition_candidates.clear() + + instrumentation.learning_done(root, red_states) + + root = self.postprocessing(root) + if convert: + root = root.to_automaton(self.output_behavior, self.transition_behavior) + return root + + def _check_futures(self, red: Node, blue: Node) -> bool: + q: deque[Tuple[Node, Node]] = deque([(red, blue)]) + pop = q.pop if self.depth_first else q.popleft + + while len(q) != 0: + red, blue = pop() + + if self.compute_local_compatibility(red, blue) is False: + return False + + for in_sym, red_trans, blue_trans in intersection_iterator(red.transitions, blue.transitions): + for out_sym, red_child, blue_child in intersection_iterator(red_trans, blue_trans): + if self.compatibility_on_pta: + if blue_child.original_count == 0 or red_child.original_count == 0: + continue + q.append((red_child.original_target, blue_child.original_target)) + else: + q.append((red_child.target, blue_child.target)) + + return True + + def _partition_from_merge(self, red: Node, blue: Node) -> Partitioning: + # Compatibility check based on partitions. + # assumes that blue is a tree and red is not reachable from blue + + partitioning = Partitioning(red, blue) + + self.score_calc.reset() + + if self.compatibility_on_futures: + if self._check_futures(red, blue) is False: + return partitioning + + # when compatibility is determined only by future and scores are disabled, we need not create partitions. + if self.compatibility_on_futures and not self.score_calc.has_score_function(): + def update_partition(red_node: Node, blue_node: Optional[Node]) -> Node: + partitioning.red_mapping[red_node] = red_node + return red_node + else: + def update_partition(red_node: Node, blue_node: Optional[Node]) -> Node: + if red_node not in partitioning.full_mapping: + p = red_node.shallow_copy() + partitioning.full_mapping[red_node] = p + partitioning.red_mapping[red_node] = p + else: + p = partitioning.full_mapping[red_node] + if blue_node is not None: + partitioning.full_mapping[blue_node] = p + return p + + # rewire the blue node's parent + blue_parent = update_partition(blue.predecessor, None) + blue_in_sym, blue_out_sym = blue.prefix_access_pair + blue_parent.transitions[blue_in_sym][blue_out_sym].target = red + + q: deque[Tuple[Node, Node]] = deque([(red, blue)]) + pop = q.pop if self.depth_first else q.popleft + + while len(q) != 0: + red, blue = pop() + partition = update_partition(red, blue) + + if not self.compatibility_on_futures: + if self.compute_local_compatibility(partition, blue) is False: + return partitioning + + for in_sym, blue_transitions in blue.transitions.items(): + partition_transitions = partition.get_or_create_transitions(in_sym) + for out_sym, blue_transition in blue_transitions.items(): + partition_transition = partition_transitions.get(out_sym) + if partition_transition is not None: + q.append((partition_transition.target, blue_transition.target)) + partition_transition.count += blue_transition.count + else: + # blue_child is blue after merging if there is a red state in blue's partition + partition_transition = TransitionInfo(blue_transition.target, blue_transition.count, None, 0) + partition_transitions[out_sym] = partition_transition + + partitioning.score = self.score_calc.score_function(partitioning.full_mapping) + return partitioning + + +def run_GSM(data, *, + output_behavior: OutputBehavior = "moore", + transition_behavior: TransitionBehavior = "deterministic", + score_calc: ScoreCalculation = None, + pta_preprocessing: Callable[[Node], Node] = None, + postprocessing: Callable[[Node], Node] = None, + compatibility_on_pta: bool = False, + compatibility_on_futures: bool = False, + node_order: Callable[[Node, Node], bool] = None, + consider_only_min_blue=False, + depth_first=False, + instrumentation=None, + convert=True, + ): + """ + TODO + + Args: + data: + + output_behavior: + + transition_behavior: + + score_calc: + + pta_preprocessing: + + postprocessing: + + compatibility_on_pta: + + compatibility_on_futures: + + node_order: + + consider_only_min_blue: + + depth_first: + + instrumentation: + + convert: + + + Returns: + + + """ + # instantiate the gsm + gsm = GeneralizedStateMerging( + output_behavior=output_behavior, + transition_behavior=transition_behavior, + score_calc=score_calc, + pta_preprocessing=pta_preprocessing, + postprocessing=postprocessing, + compatibility_on_pta=compatibility_on_pta, + compatibility_on_futures=compatibility_on_futures, + node_order=node_order, + consider_only_min_blue=consider_only_min_blue, + depth_first=depth_first, + ) + + # run the algorithm + return gsm.run(data=data, instrumentation=instrumentation, convert=convert) diff --git a/aalpy/learning_algs/general_passive/Instrumentation.py b/aalpy/learning_algs/general_passive/Instrumentation.py new file mode 100644 index 00000000000..edc91c6fc79 --- /dev/null +++ b/aalpy/learning_algs/general_passive/Instrumentation.py @@ -0,0 +1,130 @@ +import time +from functools import wraps +from typing import Dict, Optional + +from aalpy.learning_algs.general_passive.GeneralizedStateMerging import Instrumentation, Partitioning, \ + GeneralizedStateMerging +from aalpy.learning_algs.general_passive.Node import Node + + +class ProgressReport(Instrumentation): + @staticmethod + def min_lvl(lvl): + def decorator(fn): + @wraps(fn) + def wrapper(this, *args, **kw): + if this.lvl < lvl: + return + fn(this, *args, **kw) + + return wrapper + + return decorator + + def __init__(self, lvl): + super().__init__() + self.lvl = lvl + if lvl < 1: + return + self.gsm: Optional[GeneralizedStateMerging] = None + self.log = [] + self.pta_size = None + self.nr_merged_states_total = 0 + self.nr_merged_states = 0 + self.nr_red_states = 0 + + self.previous_time = None + + def reset(self, gsm: GeneralizedStateMerging): + self.gsm = gsm + self.log = [] + self.pta_size = None + self.nr_merged_states_total = 0 + self.nr_merged_states = 0 + self.nr_red_states = 0 + + self.previous_time = time.time() + + @min_lvl(1) + def pta_construction_done(self, root): + print(f'PTA Construction Time: {round(time.time() - self.previous_time, 2)}') + if self.lvl != 1: + states = root.get_all_nodes() + leafs = [state for state in states if len(state.transitions.keys()) == 0] + depth = [state.get_prefix_length() for state in leafs] + self.pta_size = len(states) + print(f'PTA has {len(states)} states leading to {len(leafs)} leafs') + print(f'min / avg / max depth : {min(depth)} / {sum(depth) / len(depth)} / {max(depth)}') + self.previous_time = time.time() + + def print_status(self): + reset_char = "\33[2K\r" + print_str = reset_char + f'Current automaton size: {self.nr_red_states}' + if self.lvl != 1 and not self.gsm.compatibility_on_futures: + print_str += f' Merged: {self.nr_merged_states_total} Remaining: {self.pta_size - self.nr_red_states - self.nr_merged_states_total}' + print(print_str, end="") + + @min_lvl(1) + def log_promote(self, node: Node): + self.log.append(["promote", (node.get_prefix(),)]) + self.nr_red_states += 1 + self.print_status() + + @min_lvl(1) + def log_merge(self, part: Partitioning): + self.log.append(["merge", (part.red.get_prefix(), part.blue.get_prefix())]) + self.nr_merged_states_total += len(part.full_mapping) - len(part.red_mapping) + self.nr_merged_states += 1 + self.print_status() + + @min_lvl(1) + def learning_done(self, root, red_states): + print(f'\nLearning Time: {round(time.time() - self.previous_time, 2)}') + print(f'Learned {len(red_states)} state automaton via {self.nr_merged_states} merges.') + if 2 < self.lvl: + root.visualize("model", self.gsm.output_behavior) + + +class MergeViolationDebugger(Instrumentation): + def __init__(self, ground_truth: Node): + super().__init__() + self.root = ground_truth + self.map: Dict[Node, Node] = dict() + self.log = [] + self.gsm: Optional[GeneralizedStateMerging] = None + + def reset(self, gsm: GeneralizedStateMerging): + self.gsm = gsm + self.map = dict() + self.log = [] + + def log_promote(self, new_red: Node): + new_red_prefix = new_red.get_prefix() + node = self.root.get_by_prefix(new_red_prefix) + old_red = self.map.get(node) + if old_red is None: + self.map[node] = new_red + self.log.append(("promote", new_red_prefix)) + elif old_red is not new_red: + print(f"Erroneous promotion detected:") + print(f" Ground truth: {node.get_prefix()}") + print(f" Representative (old): {old_red.get_prefix()}") + print(f" Representative (new): {new_red_prefix}") + self.log.append(("wrong promote", new_red_prefix)) + + def log_merge(self, part: Partitioning): + red_prefix = part.red.get_prefix() + blue_prefix = part.blue.get_prefix() + red_node = self.root.get_by_prefix(red_prefix) + blue_node = self.root.get_by_prefix(blue_prefix) + if red_node is None or blue_node is None: + self.log.append(("broken merge", red_prefix, blue_prefix)) + elif red_node is blue_node: + self.log.append(("merge", red_prefix, blue_prefix)) + else: + print(f"Erroneous merge detected:") + print(f" PTA red: {red_prefix}") + print(f" PTA blue: {blue_prefix}") + print(f" real red: {red_node.get_prefix()}") + print(f" real blue: {blue_node.get_prefix()}") + self.log.append(("wrong merge", red_prefix, blue_prefix)) diff --git a/aalpy/learning_algs/general_passive/Node.py b/aalpy/learning_algs/general_passive/Node.py new file mode 100644 index 00000000000..be663be7b2e --- /dev/null +++ b/aalpy/learning_algs/general_passive/Node.py @@ -0,0 +1,404 @@ +import math +import pathlib +from functools import total_ordering +from typing import Dict, Any, List, Tuple, Iterable, Callable, Union, TypeVar, Iterator, Optional +import pydot +from copy import copy + +from aalpy.automata import StochasticMealyMachine, StochasticMealyState, MooreState, MooreMachine, NDMooreState, \ + NDMooreMachine, Mdp, MdpState, MealyMachine, MealyState, Onfsm, OnfsmState +from aalpy.base import Automaton + +Key = TypeVar("Key") +Val = TypeVar("Val") + +OutputBehavior = str +OutputBehaviorRange = ["moore", "mealy"] + +TransitionBehavior = str +TransitionBehaviorRange = ["deterministic", "nondeterministic", "stochastic"] + +IOPair = Tuple[Any, Any] +IOTrace = List[IOPair] +IOExample = Tuple[Iterable[Any], Any] + +StateFunction = Callable[['Node'], str] +TransitionFunction = Callable[['Node', Any, Any], str] + + +def generate_values(base: list, step: Callable, backing_set=True): + if backing_set: + result = list(base) + control = set(base) + for val in result: + for new_val in step(val): + if new_val not in control: + control.add(new_val) + result.append(new_val) + return result + else: + result = list(base) + for val in result: + for new_val in step(val): + if new_val not in result: + result.append(new_val) + return result + + +def intersection_iterator(a: Dict[Key, Val], b: Dict[Key, Val]) -> Iterator[Tuple[Key, Val, Val]]: + missing = object() + for key, a_val in a.items(): + b_val = b.get(key, missing) + if b_val is missing: + continue + yield key, a_val, b_val + + +def union_iterator(a: Dict[Key, Val], b: Dict[Key, Val], default: Val = None) -> Iterator[Tuple[Key, Val, Val]]: + for key, a_val in a.items(): + b_val = b.get(key, default) + yield key, a_val, b_val + for key, b_val in b.items(): + if key in a: + continue + a_val = a.get(key, default) + yield key, a_val, b_val + + +# TODO maybe split this for maintainability (and perfomance?) +class TransitionInfo: + __slots__ = ["target", "count", "original_target", "original_count"] + + def __init__(self, target, count, original_target, original_count): + self.target: 'Node' = target + self.count: int = count + self.original_target: 'Node' = original_target + self.original_count: int = original_count + + +# TODO add custom pickling code that flattens the Node structure in order to circumvent running into recursion issues for large models +@total_ordering +class Node: + """ + Generic class for observably deterministic automata. + + The prefix is given as (minimal) list of IO pairs leading to that state. + We assume an initial transition to the initial state, which has to be reflected in the prefix. + This way, the output of the initial state for Moore machines can be encoded in its prefix. + + Transition count is preferred over state count as it allows to easily count transitions for non-tree-shaped automata + """ + __slots__ = ['transitions', 'predecessor', 'prefix_access_pair'] + + def __init__(self, prefix_access_pair, predecessor: 'Node' = None): + # TODO try single dict + self.transitions: Dict[Any, Dict[Any, TransitionInfo]] = dict() + self.predecessor: Node = predecessor + self.prefix_access_pair = prefix_access_pair + + def __lt__(self, other, compare_length_only=False): + own_l, other_l = self.get_prefix_length(), other.get_prefix_length() + if own_l != other_l: + return own_l < other_l + if compare_length_only: + return False + own_p = self.get_prefix() + other_p = other.get_prefix() + try: + return own_p < other_p + except TypeError: + return [str(x) for x in own_p] < [str(x) for x in other_p] + + def __eq__(self, other): + return self is other # TODO hack, does this lead to problems down the line? + + def __hash__(self): + return id(self) # TODO This is a hack + + # TODO implicit prefixes as currently implemented require O(length) time for prefix calculations (e.g. to determine the minimal blue node) + # other options would be to have more efficient explicit prefixes such as shared list representations + def get_prefix_length(self): + node = self + length = 0 + while node.predecessor: + node = node.predecessor + length += 1 + return length + + def get_prefix_output(self): + return self.prefix_access_pair[1] + + def get_prefix(self, include_output=True): + node = self + prefix = [] + while node.predecessor: + symbol = node.prefix_access_pair + if not include_output: + symbol = symbol[0] + prefix.append(symbol) + node = node.predecessor + prefix.reverse() + return prefix + + def get_or_create_transitions(self, in_sym) -> Dict[Any, TransitionInfo]: + t = self.transitions.get(in_sym) + if t is None: + t = dict() + self.transitions[in_sym] = t + return t + + def transition_iterator(self) -> Iterable[Tuple[Tuple[Any, Any], TransitionInfo]]: + for in_sym, transitions in self.transitions.items(): + for out_sym, node in transitions.items(): + yield (in_sym, out_sym), node + + def shallow_copy(self) -> 'Node': + node = Node(self.prefix_access_pair, self.predecessor) + for in_sym, t in self.transitions.items(): + d = dict() + for out_sym, v in t.items(): + d[out_sym] = copy(v) + node.transitions[in_sym] = d + return node + + def get_by_prefix(self, seq: IOTrace) -> Optional['Node']: + node: Node = self + for in_sym, out_sym in seq: + if in_sym is None: # ignore initial transition of Node.get_prefix() + continue + trans = node.transitions.get(in_sym) + if trans is None: + return None + t_info = trans.get(out_sym) + if t_info is None: + return None + node = t_info.target + return node + + def get_all_nodes(self) -> List['Node']: + def generator(state: Node): + for _, child in state.transition_iterator(): + yield child.target + + return generate_values([self], generator) + + def is_tree(self): + q: List['Node'] = [self] + backing_set = set() + while len(q) != 0: + current = q.pop(0) + for _, child in current.transition_iterator(): + t = child.target + if t in backing_set: + return False + q.append(t) + backing_set.add(t) + return True + + def to_automaton(self, output_behavior: OutputBehavior, transition_behavior: TransitionBehavior, + check_behavior=False, set_prefix=False) -> Automaton: + nodes = self.get_all_nodes() + + if check_behavior: + if output_behavior == "moore" and not self.is_moore(): + raise ValueError("Tried to obtain Moore machine from non-Moore structure") + if transition_behavior == "deterministic" and not self.is_deterministic(): + raise ValueError("Tried to obtain deterministic automaton from non-deterministic structure") + + type_dict = { + ("moore", "deterministic"): (MooreMachine, MooreState), + ("moore", "nondeterministic"): (NDMooreMachine, NDMooreState), + ("moore", "stochastic"): (Mdp, MdpState), + ("mealy", "deterministic"): (MealyMachine, MealyState), + ("mealy", "nondeterministic"): (Onfsm, OnfsmState), + ("mealy", "stochastic"): (StochasticMealyMachine, StochasticMealyState), + } + + AutomatonClass, StateClass = type_dict[(output_behavior, transition_behavior)] + + # create states + state_map = dict() + for i, node in enumerate(nodes): + state_id = f's{i}' + if output_behavior == "mealy": + state = StateClass(state_id) + elif output_behavior == "moore": + state = StateClass(state_id, node.get_prefix_output()) + state_map[node] = state + if set_prefix: + if transition_behavior == "deterministic": + state.prefix = tuple(p[0] for p in node.get_prefix()) + else: + state.prefix = tuple(node.get_prefix()) + else: + state.prefix = None + + initial_state = state_map[self] + + # add transitions + for node in nodes: + state = state_map[node] + for in_sym, transitions in node.transitions.items(): + total = sum(t.count for t in transitions.values()) + for out_sym, target_node in transitions.items(): + target_state = state_map[target_node.target] + count = target_node.count + if AutomatonClass is MooreMachine: + state.transitions[in_sym] = target_state + elif AutomatonClass is MealyMachine: + state.transitions[in_sym] = target_state + state.output_fun[in_sym] = out_sym + elif AutomatonClass is NDMooreMachine: + state.transitions[in_sym].append(target_state) + elif AutomatonClass is Onfsm: + state.transitions[in_sym].append((out_sym, target_state)) + elif AutomatonClass is Mdp: + state.transitions[in_sym].append((target_state, count / total)) + elif AutomatonClass is StochasticMealyMachine: + state.transitions[in_sym].append((target_state, out_sym, count / total)) + + return AutomatonClass(initial_state, list(state_map.values())) + + def visualize(self, path: Union[str, pathlib.Path], output_behavior: OutputBehavior = "mealy", format: str = "dot", + engine="dot", *, + state_label: StateFunction = None, state_color: StateFunction = None, + trans_label: TransitionFunction = None, trans_color: TransitionFunction = None, + state_props: Dict[str, StateFunction] = None, + trans_props: Dict[str, TransitionFunction] = None, + node_naming: StateFunction = None): + + # handle default parameters + if output_behavior not in ["moore", "mealy", None]: + raise ValueError(f"Invalid OutputBehavior {output_behavior}") + if state_props is None: + state_props = dict() + if trans_props is None: + trans_props = dict() + if state_label is None: + if output_behavior == "moore": + def state_label(node: Node): + return f'{node.get_prefix_output()} {node.count()}' + else: + def state_label(node: Node): + return f'{sum(t.count for _, t in node.transition_iterator())}' + if trans_label is None and "label" not in trans_props: + if output_behavior == "moore": + def trans_label(node: Node, in_sym, out_sym): + return f'{in_sym} [{node.transitions[in_sym][out_sym].count}]' + else: + def trans_label(node: Node, in_sym, out_sym): + return f'{in_sym} / {out_sym} [{node.transitions[in_sym][out_sym].count}]' + if state_color is None: + def state_color(x): return "black" + if trans_color is None: + def trans_color(x, y, z): return "black" + if node_naming is None: + node_dict = dict() + + def node_naming(node: Node): + if node not in node_dict: + node_dict[node] = f"s{len(node_dict)}" + return node_dict[node] + state_props = {"label": state_label, "color": state_color, "fontcolor": state_color, **state_props} + trans_props = {"label": trans_label, "color": trans_color, "fontcolor": trans_color, **trans_props} + + # create new graph + graph = pydot.Dot('automaton', graph_type='digraph') + + # graph.add_node(pydot.Node(str(self.prefix), label=state_label(self))) + nodes = self.get_all_nodes() + + # add nodes + for node in nodes: + arg_dict = {key: fun(node) for key, fun in state_props.items()} + graph.add_node(pydot.Node(node_naming(node), **arg_dict)) + + # add transitions + for node in nodes: + for in_sym, options in node.transitions.items(): + for out_sym, c in options.items(): + arg_dict = {key: fun(node, in_sym, out_sym) for key, fun in trans_props.items()} + graph.add_edge(pydot.Edge(node_naming(node), node_naming(c.target), **arg_dict)) + + # add initial state + # TODO maybe add option to parameterize this + graph.add_node(pydot.Node('__start0', shape='none', label='')) + graph.add_edge(pydot.Edge('__start0', node_naming(self), label='')) + + file_ext = format + if format == 'dot': + format = 'raw' + if format == 'raw': + file_ext = 'dot' + graph.write(path=str(path) + "." + file_ext, prog=engine, format=format) + + def add_data(self, data): + for seq in data: + self.add_trace(seq) + + def add_trace(self, data: IOTrace): + curr_node: Node = self + for in_sym, out_sym in data: + transitions = curr_node.get_or_create_transitions(in_sym) + info = transitions.get(out_sym) + if info is None: + node = Node((in_sym, out_sym), curr_node) + transitions[out_sym] = TransitionInfo(node, 1, node, 1) + else: + info.count += 1 + info.original_count += 1 + node = info.target + curr_node = node + + def add_example(self, data: IOExample): + # TODO add support for example based algorithms + raise NotImplementedError() + + @staticmethod + def createPTA(data, output_behavior) -> 'Node': + if output_behavior == "moore": + initial_output = data[0][0] + data = (d[1:] for d in data) + else: + initial_output = None + root_node = Node((None, initial_output), None) + root_node.add_data(data) + return root_node + + def is_locally_deterministic(self): + return all(len(item) == 1 for item in self.transitions.values()) + + def is_deterministic(self): + return all(node.is_locally_deterministic() for node in self.get_all_nodes()) + + def deterministic_compatible(self, other: 'Node'): + common_keys = filter(lambda key: key in self.transitions.keys(), other.transitions.keys()) + return all(list(self.transitions[key].keys()) == list(other.transitions[key].keys()) for key in common_keys) + + def is_moore(self): + output_dict = dict() + for node in self.get_all_nodes(): + for (in_sym, out_sym), transition in node.transition_iterator(): + child = transition.target + if child in output_dict.keys() and output_dict[child] != out_sym: + return False + else: + output_dict[child] = out_sym + return True + + def moore_compatible(self, other: 'Node'): + return self.get_prefix_output() == other.get_prefix_output() + + def local_log_likelihood_contribution(self): + llc = 0 + for in_sym, trans in self.transitions.items(): + total_count = 0 + for out_sym, info in trans.items(): + total_count += info.count + llc += info.count * math.log(info.count) + if total_count != 0: + llc -= total_count * math.log(total_count) + return llc + + def count(self): + return sum(trans.count for _, trans in self.transition_iterator()) diff --git a/aalpy/learning_algs/general_passive/ScoreFunctionsGSM.py b/aalpy/learning_algs/general_passive/ScoreFunctionsGSM.py new file mode 100644 index 00000000000..219a28d08bc --- /dev/null +++ b/aalpy/learning_algs/general_passive/ScoreFunctionsGSM.py @@ -0,0 +1,225 @@ +from math import sqrt, log +from typing import Callable, Dict, List, Iterable, Any + +from aalpy.learning_algs.general_passive.Node import Node, intersection_iterator, union_iterator, TransitionInfo + +LocalCompatibilityFunction = Callable[[Node, Node], bool] +ScoreFunction = Callable[[Dict[Node, Node]], Any] +AggregationFunction = Callable[[Iterable], Any] + + +class ScoreCalculation: + def __init__(self, local_compatibility: LocalCompatibilityFunction = None, score_function: ScoreFunction = None): + # This is a hack that gives a simple implementation where we can easily + # - determine whether the default is overridden (for optimization) + # - override behavior in a functional way by providing the functions as arguments (no extra class) + # - override behavior in a stateful way by implementing a new class that provides `local_compatibility` and / or `score_function` methods + if not hasattr(self, "local_compatibility"): + self.local_compatibility: LocalCompatibilityFunction = local_compatibility or self.default_local_compatibility + if not hasattr(self, "score_function"): + self.score_function: ScoreFunction = score_function or self.default_score_function + + def reset(self): + pass + + @staticmethod + def default_local_compatibility(a: Node, b: Node): + return True + + @staticmethod + def default_score_function(part: Dict[Node, Node]): + return True + + def has_score_function(self): + return self.score_function is not self.default_score_function + + def has_local_compatibility(self): + return self.local_compatibility is not self.default_local_compatibility + + +def hoeffding_compatibility(eps, compare_original=True) -> LocalCompatibilityFunction: + eps_fact = sqrt(0.5 * log(2 / eps)) + count_name = "original_count" if compare_original else "count" + transition_dummy = TransitionInfo(None, 0, None, 0) + + def similar(a: Node, b: Node): + # iterate over inputs that are common to both states + for in_sym, a_trans, b_trans in intersection_iterator(a.transitions, b.transitions): + # could create appropriate dict here + a_total, b_total = (sum(getattr(x, count_name) for x in trans.values()) for trans in (a_trans, b_trans)) + if a_total == 0 or b_total == 0: + continue # parameter combinations require this check + threshold = eps_fact * (sqrt(1 / a_total) + sqrt(1 / b_total)) + # iterate over outputs that appear in either distribution + for out_sym, a_info, b_info in union_iterator(a_trans, b_trans, transition_dummy): + ac, bc = (getattr(x, count_name) for x in (a_info, b_info)) + if abs(ac / a_total - bc / b_total) > threshold: + return False + return True + + return similar + + +class ScoreWithKTail(ScoreCalculation): + """Applies k-Tails to a compatibility function: Compatibility is only evaluated up to a certain depth k.""" + + def __init__(self, other_score: ScoreCalculation, k: int): + super().__init__(None, other_score.score_function) + self.other_score = other_score + self.k = k + + self.depth_offset = None + + def reset(self): + self.other_score.reset() + self.depth_offset = None + + def local_compatibility(self, a: Node, b: Node): + # assuming b is tree shaped. + if self.depth_offset is None: + self.depth_offset = b.get_prefix_length() + depth = b.get_prefix_length() - self.depth_offset + if self.k <= depth: + return True + + return self.other_score.local_compatibility(a, b) + + +class ScoreWithSinks(ScoreCalculation): + """This class allows rejecting merge candidates based on additional criteria for the initial merge""" + + def __init__(self, other_score: ScoreCalculation, sink_cond: Callable[[Node], bool], allow_sink_merge=True): + super().__init__(None, other_score.score_function) + self.other_score = other_score + self.sink_cond = sink_cond + self.allow_sink_merge = allow_sink_merge + + self.is_first = True + + def reset(self): + self.other_score.reset() + self.is_first = True + + def local_compatibility(self, a: Node, b: Node): + if self.is_first: + self.is_first = False + a_sink, b_sink = self.sink_cond(a), self.sink_cond(b) + if a_sink != b_sink: + return False + if a_sink and b_sink and not self.allow_sink_merge: + return False + return self.other_score.local_compatibility(a, b) + + +class ScoreCombinator(ScoreCalculation): + """ + This class is used to combine several scoring / compatibility mechanisms by aggregating the results of the + individual methods in a user defined manner. It uses generator expressions to allow for short circuit evaluation. + """ + + def __init__(self, scores: List[ScoreCalculation], aggregate_compatibility: AggregationFunction = None, + aggregate_score: AggregationFunction = None): + super().__init__() + self.scores = scores + self.aggregate_compatibility = aggregate_compatibility or self.default_aggregate_compatibility + self.aggregate_score = aggregate_score or self.default_aggregate_score + + def reset(self): + for score in self.scores: + score.reset() + + def local_compatibility(self, a: Node, b: Node): + return self.aggregate_compatibility(score.local_compatibility(a, b) for score in self.scores) + + def score_function(self, part: Dict[Node, Node]): + return self.aggregate_score(score.score_function(part) for score in self.scores) + + @staticmethod + def default_aggregate_compatibility(compatibility_iterable): + """Commits to the first value that is not inconclusive (== None). Accepts if in doubt.""" + for compat in compatibility_iterable: + if compat is None: + continue + return compat + return True + + @staticmethod + def default_aggregate_score(score_iterable): + return list(score_iterable) + + +def local_to_global_compatibility(local_fun: LocalCompatibilityFunction) -> ScoreFunction: + """ + Converts a local compatibility function to a global score function by evaluating the local compatibility for each of + the new partitions with all nodes that make up that partition. One use case for this is to evaluate a local score + function after the partitions are complete. The order of arguments for the local compatibility function is + partition, original. + """ + + def fun(part: Dict[Node, Node]): + for old_node, new_node in part.items(): + if local_fun(new_node, old_node) is False: # Follows local_fun(red, blue) + return False + return True + + return fun + + +def differential_info(part: Dict[Node, Node]): + relevant_nodes_old = list(part.keys()) + relevant_nodes_new = set(part.values()) + + partial_llh_old = sum(node.local_log_likelihood_contribution() for node in relevant_nodes_old) + partial_llh_new = sum(node.local_log_likelihood_contribution() for node in relevant_nodes_new) + + num_params_old = sum(1 for node in relevant_nodes_old for _ in node.transition_iterator()) + num_params_new = sum(1 for node in relevant_nodes_new for _ in node.transition_iterator()) + + return partial_llh_old - partial_llh_new, num_params_old - num_params_new + + +def transform_score(score, transform: Callable): + if isinstance(score, Callable): + return lambda *args: transform(score(*args)) + if isinstance(score, ScoreCalculation): + score.score_function = lambda *args: transform(score.score_function(*args)) + return score + return transform(score) + + +def make_greedy(score): + return transform_score(score, lambda x: x is not False) + + +def lower_threshold(score, thresh): + return transform_score(score, lambda x: x if thresh < x else False) + + +def AIC_score(alpha=0) -> ScoreFunction: + def score(part: Dict[Node, Node]): + llh_diff, param_diff = differential_info(part) + return lower_threshold(param_diff - llh_diff, alpha) + + return score + + +def EDSM_frequency_score(min_evidence=-1) -> ScoreFunction: + def score(part: Dict[Node, Node]): + total_evidence = 0 + for old_node, new_node in part.items(): + for in_sym, trans_old, trans_new in intersection_iterator(old_node.transitions, new_node.transitions): + for out_sym, t_info_old, t_info_new in intersection_iterator(trans_old, trans_new): + if t_info_old.count != t_info_new.count: + total_evidence += t_info_old.count + return lower_threshold(total_evidence, min_evidence) + + return score + + +def EDSM_score(min_evidence=-1) -> ScoreFunction: + def score(part: Dict[Node, Node]): + nr_partitions = len(set(part.values())) + nr_merged = len(part) + return lower_threshold(nr_merged - nr_partitions, min_evidence) + + return score diff --git a/aalpy/learning_algs/general_passive/__init__.py b/aalpy/learning_algs/general_passive/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/aalpy/utils/FileHandler.py b/aalpy/utils/FileHandler.py index 5e543718110..5d2551f9d60 100644 --- a/aalpy/utils/FileHandler.py +++ b/aalpy/utils/FileHandler.py @@ -7,11 +7,11 @@ from aalpy.automata import Dfa, MooreMachine, Mdp, Onfsm, MealyState, DfaState, MooreState, MealyMachine, \ MdpState, StochasticMealyMachine, StochasticMealyState, OnfsmState, MarkovChain, McState, Sevpa, SevpaState, \ - SevpaTransition, Vpa, VpaState, VpaTransition + SevpaTransition, Vpa, VpaState, VpaTransition, NDMooreMachine file_types = ['dot', 'png', 'svg', 'pdf', 'string'] automaton_types = {Dfa: 'dfa', MealyMachine: 'mealy', MooreMachine: 'moore', Mdp: 'mdp', - StochasticMealyMachine: 'smm', Onfsm: 'onfsm', MarkovChain: 'mc', + StochasticMealyMachine: 'smm', Onfsm: 'onfsm', NDMooreMachine: 'ndmoore', MarkovChain: 'mc', Sevpa: 'sevpa', Vpa: 'vpa'} @@ -31,7 +31,7 @@ def _get_node(state, automaton_type): return Node(state.state_id, label=_wrap_label(state.state_id)) if automaton_type == 'mealy': return Node(state.state_id, label=_wrap_label(state.state_id)) - if automaton_type == 'moore': + if automaton_type in ['moore', 'ndmoore']: return Node(state.state_id, label=_wrap_label(f'{state.state_id}|{state.output}'), shape='record', style='rounded') if automaton_type == 'onfsm': @@ -68,6 +68,13 @@ def _add_transition_to_graph(graph, state, automaton_type, display_same_state_tr if not display_same_state_trans and state.state_id == s[1].state_id: continue graph.add_edge(Edge(state.state_id, s[1].state_id, label=_wrap_label(f'{i}/{s[0]}'))) + if automaton_type == 'ndmoore': + for i in state.transitions.keys(): + new_states = state.transitions[i] + for new_state in new_states: + if not display_same_state_trans and state.state_id == new_state.state_id: + continue + graph.add_edge(Edge(state.state_id, new_state.state_id, label=_wrap_label(f'{i}'))) if automaton_type == 'mc': for new_state, prob in state.transitions: prob = round(prob, round_floats) if round_floats else prob diff --git a/setup.py b/setup.py index 0ab3188bbc0..74d7b270660 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ version='1.4.3', packages=['aalpy', 'aalpy.base', 'aalpy.SULs', 'aalpy.utils', 'aalpy.oracles', 'aalpy.automata', 'aalpy.learning_algs', 'aalpy.learning_algs.stochastic', 'aalpy.learning_algs.deterministic', - 'aalpy.learning_algs.non_deterministic', + 'aalpy.learning_algs.non_deterministic', 'aalpy.learning_algs.general_passive', 'aalpy.learning_algs.stochastic_passive', 'aalpy.learning_algs.deterministic_passive'], url='https://github.com/DES-Lab/AALpy', license='MIT',