Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aalpy/automata/Dfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def compute_output_seq(self, state, sequence):

def execute_sequence(self, origin_state, seq):
if not seq:
return origin_state.output
self.current_state = origin_state
return self.current_state.output
return super(Dfa, self).execute_sequence(origin_state, seq)


Expand Down
3 changes: 2 additions & 1 deletion aalpy/automata/MooreMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def compute_output_seq(self, state, sequence):

def execute_sequence(self, origin_state, seq):
if not seq:
return origin_state.output
self.current_state = origin_state
return self.current_state.output
return super(MooreMachine, self).execute_sequence(origin_state, seq)

def to_state_setup(self):
Expand Down
7 changes: 3 additions & 4 deletions aalpy/learning_algs/adaptive/AdaptiveObservationTree.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,10 @@ def find_basis_frontier_pair(self, frontier_state, frontier_state_access):
if frontier_state and Apartness.compute_witness(basis_state, frontier_state, self) is not None:
continue

reference.execute_sequence(
reference.initial_state, frontier_state_access)
reference.execute_sequence(reference.initial_state, frontier_state_access)
state_one = reference.current_state
reference.execute_sequence(
reference.initial_state, basis_state_access)

reference.execute_sequence(reference.initial_state, basis_state_access)
state_two = reference.current_state

sep_seq = self.find_distinguishing_seq_partial(reference,
Expand Down
1 change: 0 additions & 1 deletion aalpy/learning_algs/deterministic/ClassificationTree.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ def process_counterexample(self, cex: tuple, hypothesis, cex_processing_fun):
u = cex[:len(cex) - len(v) - 1]
assert (*u, a, *v) == cex

hypothesis.reset_to_initial()
hypothesis.execute_sequence(hypothesis.initial_state, u)
u_state = hypothesis.current_state

Expand Down
57 changes: 29 additions & 28 deletions aalpy/oracles/KWayStateCoverageEqOracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ class KWayStateCoverageEqOracle(Oracle):
random walk at the end.
"""

def __init__(self, alphabet: list, sul: SUL, k=2, random_walk_len=20, method='permutations'):
def __init__(self, alphabet: list, sul: SUL, k=2, random_walk_len=20,
method='permutations',
num_test_lower_bound=None,
num_test_upper_bound=None):
"""

Args:

alphabet: input alphabet

sul: system under learning

k: k value used for k-wise combinations/permutations of states

random_walk_len: length of random walk performed at the end of each combination/permutation

method: either 'combinations' or 'permutations'
num_test_lower_bound= either None or number a minimum number of test-cases to be performed in each testing round
num_test_upper_bound= either None or number a maximum number of test-cases to be performed in each testing round

"""
super().__init__(alphabet, sul)
assert k > 1 and method in ['combinations', 'permutations']
Expand All @@ -32,39 +34,24 @@ def __init__(self, alphabet: list, sul: SUL, k=2, random_walk_len=20, method='pe
self.fun = combinations if method == 'combinations' else permutations
self.random_walk_len = random_walk_len

def find_cex(self, hypothesis):

if len(hypothesis.states) == 1:
for _ in range(self.random_walk_len):
path = choices(self.alphabet, k=self.random_walk_len)
hypothesis.reset_to_initial()
self.sul.post()
self.sul.pre()
for i, p in enumerate(path):
out_sul = self.sul.step(p)
out_hyp = hypothesis.step(p)
self.num_steps += 1
self.num_test_lower_bound = num_test_lower_bound
self.num_test_upper_bound = num_test_upper_bound

if out_sul != out_hyp:
self.sul.post()
return path[:i + 1]
def find_cex(self, hypothesis):

states = hypothesis.states
shuffle(states)
shuffle(hypothesis.states)

test_cases = []
for comb in self.fun(hypothesis.states, self.k):
prefixes = frozenset([c.prefix for c in comb])
if prefixes in self.cache:
continue
else:
self.cache.add(prefixes)
self.cache.add(prefixes)

index = 0
path = comb[0].prefix

# in case of non-strongly connected automata test case might not be possible as a path between 2 states
# might not exist
possible_test_case = True

while index < len(comb) - 1:
path_between_states = hypothesis.get_shortest_path(comb[index], comb[index + 1])
index += 1
Expand All @@ -79,9 +66,23 @@ def find_cex(self, hypothesis):
continue

path += tuple(choices(self.alphabet, k=self.random_walk_len))
test_cases.append(path)

self.reset_hyp_and_sul(hypothesis)
# lower bound (also accounts for single state hypothesis when a lower bound is not defined)
lower_bound = self.num_test_lower_bound
if len(hypothesis.states) == 1 and lower_bound is None:
lower_bound = 50

while lower_bound is not None and len(test_cases) < lower_bound:
path = tuple(choices(self.alphabet, k=self.random_walk_len))
test_cases.append(path)

# upper bound
if self.num_test_upper_bound is not None:
test_cases = test_cases[:self.num_test_upper_bound]

for path in test_cases:
self.reset_hyp_and_sul(hypothesis)
for i, p in enumerate(path):
out_sul = self.sul.step(p)
out_hyp = hypothesis.step(p)
Expand Down
50 changes: 45 additions & 5 deletions aalpy/oracles/KWayTransitionCoverageEqOracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ def __init__(self, alphabet: list, sul: SUL, k: int = 2, method='random',
max_path_len: int = 50,
max_number_of_steps: int = 0,
optimize: str = 'steps',
random_walk_len=10):
random_walk_len=10,
num_test_lower_bound=None,
num_test_upper_bound=None,
):
"""
Args:

Expand All @@ -33,6 +36,8 @@ def __init__(self, alphabet: list, sul: SUL, k: int = 2, method='random',
max_number_of_steps: maximum number of steps that will be executed on the SUL (0 = no limit)
optimize: minimize either the number of 'steps' or 'queries' that are executed
random_walk_len: the number of steps that are added by 'prefix' generated paths
num_test_lower_bound= either None or number a minimum number of test-cases to be performed in each testing round
num_test_upper_bound= either None or number a maximum number of test-cases to be performed in each testing round

"""
super().__init__(alphabet, sul)
Expand All @@ -48,26 +53,62 @@ def __init__(self, alphabet: list, sul: SUL, k: int = 2, method='random',
self.optimize = optimize
self.random_walk_len = random_walk_len

# to account for cases where number of test-cases generated by the oracle is either too big or too small
# num_test_lower_bound does not affect k-way transition coverage, but limiting the upper bound might as not all
# tests will be executed
self.num_test_lower_bound = num_test_lower_bound
self.num_test_upper_bound = num_test_upper_bound

self.cached_paths = list()

def find_cex(self, hypothesis: Automaton):
if self.method == 'random':
paths = self.generate_random_paths(hypothesis) + self.cached_paths
self.cached_paths = self.greedy_set_cover(hypothesis, paths)

print('Num TC', len(self.cached_paths))
for path in self.cached_paths:
# to account for lower bound
additional_test_cases = []
num_executed_tcs = 0
if self.num_test_lower_bound is not None:
while len(self.cached_paths) + len(additional_test_cases) < self.num_test_lower_bound:
random_length = randint(self.k, self.max_path_len)
steps = tuple(choices(self.alphabet, k=random_length))
additional_test_cases.append(self.create_path(hypothesis, steps))

for path in list(self.cached_paths + additional_test_cases):
if self.num_test_upper_bound is not None and num_executed_tcs >= self.num_test_upper_bound:
break

counter_example = self.check_path(hypothesis, path.steps)
num_executed_tcs += 1

if counter_example is not None:
return counter_example

elif self.method == 'prefix':
generated_paths = []
# generate tcs with prefix coverage
for steps in self.generate_prefix_steps(hypothesis):
counter_example = self.check_path(hypothesis, steps)
generated_paths.append(self.create_path(hypothesis, steps))

if self.num_test_lower_bound is not None and len(generated_paths) >= self.num_test_lower_bound:
break

# add random paths if needed
while self.num_test_lower_bound is not None and len(generated_paths) < self.num_test_lower_bound:
random_length = randint(self.k, self.max_path_len)
steps = tuple(choices(self.alphabet, k=random_length))
generated_paths.append(self.create_path(hypothesis, steps))

# limit upper bound
if self.num_test_upper_bound is not None:
generated_paths = generated_paths[:self.num_test_upper_bound]

for path in generated_paths:
counter_example = self.check_path(hypothesis, path.steps)
if counter_example is not None:
return counter_example

return None

def greedy_set_cover(self, hypothesis: Automaton, paths: list):
Expand All @@ -90,7 +131,6 @@ def greedy_set_cover(self, hypothesis: Automaton, paths: list):
paths = [self.create_path(hypothesis, steps) for steps in self.generate_prefix_steps(hypothesis)]

if self.max_number_of_steps != 0 and step_count > self.max_number_of_steps:
print("stop")
break

return result
Expand Down
Loading