Skip to content

Commit b32fc46

Browse files
committed
more work on estimators
1 parent 9d217d2 commit b32fc46

File tree

4 files changed

+380
-98
lines changed

4 files changed

+380
-98
lines changed

src/hypofuzz/bayes.py

Lines changed: 209 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,229 @@
11
import math
22
from collections.abc import Sequence
3-
from typing import TYPE_CHECKING, Any
3+
from dataclasses import dataclass
4+
from itertools import takewhile
5+
from random import Random
6+
from typing import TYPE_CHECKING, Any, Optional, TypeVar
47

58
if TYPE_CHECKING:
69
from hypofuzz.hypofuzz import FuzzTarget
710

11+
T = TypeVar("T")
12+
13+
# in the absence of any knowledge about worker lifetimes, assume a worker lives
14+
# for 5 minutes.
15+
DEFAULT_EXPECTED_LIFETIME_ESTIMATOR = 60 * 5
16+
17+
18+
@dataclass
class CurrentWorker:
    """The current assignment state of a single fuzzing worker."""

    # node ids currently assigned to this worker
    nodeids: Sequence[str]
    # estimator for this worker's total lifetime, in seconds (compared against
    # DEFAULT_EXPECTED_LIFETIME_ESTIMATOR, which is also in seconds)
    e_lifetime: float
22+
23+
24+
@dataclass
class DistributeNodesTarget:
    """A fuzz target as seen by `distribute_nodes`."""

    # node id identifying this target
    nodeid: str
    # behavior-rate estimators for this target
    rates: "BehaviorRates"
    # estimator for the time (in seconds) a worker spends starting up this
    # target after it is reassigned
    e_startup_time: float
29+
30+
31+
@dataclass
class BehaviorRates:
    """Estimators for the rate at which fuzzing discovers new behaviors."""

    # An estimator for the number of behaviors the next input will discover.
    # This will be between 0 and 1.
    per_input: float
    # An estimator for the number of behaviors discovered per second for a
    # target, assuming one worker is fuzzing this target continuously over
    # that second.
    per_second: float
39+
40+
41+
def _min_values(values: Sequence[T], key: Any) -> Sequence[T]:
42+
candidates = sorted(
43+
[(value, key(value)) for value in values], key=lambda item: item[1]
44+
)
45+
min_value = candidates[0][1]
46+
return [
47+
item[0] for item in takewhile(lambda item: item[1] == min_value, candidates)
48+
]
49+
850

951
def distribute_nodes(
10-
nodeids: Sequence[str], estimators: Sequence[float], *, n: int
52+
targets: Sequence[DistributeNodesTarget],
53+
*,
54+
n: int,
55+
current_workers: Optional[Sequence[CurrentWorker]] = None,
1156
) -> tuple[tuple[str, ...], ...]:
1257
# We have x nodes node_i, each with an estimator \hat{v}_i for "behaviors per
13-
# second". We have n bins (processes), and we want to distribute node_i
14-
# into the n bins such that we maximize overall behaviors per second.
58+
# second". We have n bins (worker), and we want to distribute node_i
59+
# into the n bins such that we maximize the the sum of worker_behaviors.
60+
#
61+
# The estimator for the number of behaviors for a worker is given by
62+
# e_worker_behaviors. Instead of trying for the optimal solution of maximizing
63+
# the sum of worker_behaviors, we instead maximize the smallest worker_behaviors
64+
# quantity.
1565
#
16-
# If our estimators \hat{v}_i were static, then this is trivial: the
17-
# overall behaviors per second is `sum from i=1 to c of max(p_i)`. Of course,
18-
# our estimators are *not* static. Which means we are optimizing over something
19-
# more complicated - a set of multi-armed bandit problems perhaps?
66+
# This is related to "makespan minimization", and is a classic bin packing
67+
# problem. Optimality of this problem is NP complete, so we instead approximate
68+
# the optimal solution with a greedy one, specifically a variant of
69+
# "longest processing time first scheduling": we first sort the nodes in
70+
# increasing order of their estimator. Then, for each mode, we check which
71+
# worker has the lowest worker_behaiors, and assign the node to that worker.
72+
# Since we are iterating in increasing order of estimator, we know that adding
73+
# a node to a worker will increase that worker's worker_behaviors (unless the
74+
# worker's scheduling algorithm for targets is literally adversarial, ie
75+
# adding a higher-than-average value target decreases its expected behaviors
76+
# per second, which we will assume is not the case).
2077
#
21-
# A more naive quantity to maximize (minimize) is the largest bin sum.
22-
# So we are minimizing `max(sum(p_i) for i in c)`. This is related to
23-
# "makespan minimization", and is a classic bin packing problem. Optimality
24-
# is NP complete, so we approximate the optimal solution with a greedy one,
25-
# specifically "longest processing time first scheduling".
78+
# Optionally, the current assignment `current_workers` of node ids to workers
79+
# can be passed. This incorporates an overhead cost to switching a nodeid to a
80+
# different worker. The algorithm is the standard bin packing algorithm, but
81+
# with a penalty to a node being assigned to a worker other than its current
82+
# worker.
2683
#
27-
# (intuitively, we assign the "best" nodes to processes first. So with
28-
# e.g. 10 nodes with an estimator of \hat{v}_i = 1 behavior per second (which
29-
# is really good!) they would all go to different processes (at least until
30-
# the cap of n processes), which is what we want.
31-
32-
assert len(nodeids) == len(estimators)
33-
# estimators of 0 are mathematically valid, but semantically weird, and much
34-
# more likely to be indicative of a logic error
35-
assert all(estimator > 0 for estimator in estimators)
36-
37-
bins: list[list[Any]] = [[] for _ in range(n)]
38-
nodes = [
39-
{"nodeid": nodeid, "estimator": estimator}
40-
for nodeid, estimator in zip(nodeids, estimators)
84+
# This penalty cost of switching a nodeid between workers is
85+
# worker_behaviors_per_second * node_startup_cost_seconds, ie the number of
86+
# behaviors we expect to lose by spending time starting up this node.
87+
88+
random = Random()
89+
if current_workers is None:
90+
current_workers = [CurrentWorker(nodeids=[], e_lifetime=0.0) for _ in range(n)]
91+
92+
assert len(current_workers) == n
93+
94+
# estimators of 0 are mathematically valid, but can lead to bad/pathological
95+
# algorithm outcomes
96+
assert all(target.rates.per_second > 0 for target in targets)
97+
assert all(target.rates.per_input > 0 for target in targets)
98+
99+
# return partitions in the same iteration order they were passed in current_workers
100+
workers: list[dict[str, Any]] = [
101+
{"current_worker": worker, "targets": []} for worker in current_workers
41102
]
42103

43-
# first, we sort the node_i in decreasing order by their estimator.
44-
nodes.sort(key=lambda node: node["estimator"], reverse=True) # type: ignore
45-
46-
# If we have fewer than `n` nodes, we repeat the list of nodes in decreasing
47-
# order of their estimator until we reach `n` nodes. This ensures every
48-
# processes receives a node (in fact, in this case, exactly one).
49-
node_idx = 0
50-
while len(nodes) < n:
51-
nodes.append(nodes[node_idx])
52-
node_idx = (node_idx + 1) % len(nodes)
53-
54-
# then, we assign each node_i to the partition with the least sum.
55-
for node in nodes:
56-
smallest_bin = min(
57-
bins,
58-
key=lambda bin: sum(node["estimator"] for node in bin),
104+
# first, we sort the target in increasing order by their estimator.
105+
targets = sorted(targets, key=lambda target: target.rates.per_second)
106+
107+
# If we have fewer than `n` targets, we repeat the list of targets in decreasing
108+
# order of their estimator until we reach `n` targets. This ensures every
109+
# worker receives at least one target (in fact, in this case, exactly one).
110+
target_idx = 0
111+
while len(targets) < n:
112+
# `targets` are in increasing order, so we index negatively to get
113+
# a decreasing order
114+
targets.append(targets[-target_idx])
115+
target_idx = (target_idx + 1) % len(targets)
116+
117+
# then, we assign each target to the worker with the worst worker_behaviors.
118+
# Since we're iterating over the targets in increasing order of behaviors
119+
# per-second, adding a target to a worker will always increase its
120+
# worker_behaviors.
121+
def worker_score(
122+
worker: dict[str, Any], *, target: Optional[DistributeNodesTarget] = None
123+
) -> float:
124+
e_lifetime: float = worker["current_worker"].e_lifetime
125+
worker_rates = e_worker_rates(
126+
target_rates=[target.rates for target in worker["targets"]],
127+
)
128+
offset = 0.0
129+
if target is not None and target.nodeid not in worker["current_worker"].nodeids:
130+
# Add a penalty for switching nodes between workers. Since the ordering
131+
# quantity is the e_worker_behaviors estimator of lifetime worker
132+
# behaviors, we want to allow a node to switch workers if the ev
133+
# differential is greater than the number of behaviors we expect to
134+
# lose from spending time starting up this worker.
135+
#
136+
# And the number of behaviors we expect to lose is the behaviors per
137+
# second estimator for the worker, times the estimator for the startup
138+
# time of this node.
139+
#
140+
# We are choosing the worker with the lowest score to add this node to,
141+
# so if we want to encourage this node to be assigned to its current
142+
# worker, we want that worker to have a low score, which means we
143+
# want to increase the score of all other workers. So the offset here
144+
# should be positive.
145+
offset = worker_rates.per_second * target.e_startup_time
146+
147+
# to avoid crazy rebalancing during the initial startup phase, don't
148+
# work with small lifetime estimators
149+
e_lifetime = max(e_lifetime, DEFAULT_EXPECTED_LIFETIME_ESTIMATOR)
150+
return (worker_rates.per_second * e_lifetime) + offset
151+
152+
for target in targets:
153+
# find all the workers with the minimum value score, and randomly assign
154+
# this target to one of them. Normally there won't be ties, and the target
155+
# simply goes to the worst worker. But when fuzzing for the first time
156+
# (or after a db wipe) where all targets have the same estimators, we
157+
# don't want to end in an assignment where one worker is given n - 1 nodes
158+
# and the other is given just 1.
159+
smallest_workers = _min_values(
160+
workers,
161+
key=lambda worker: worker_score(worker, target=target),
162+
)
163+
smallest_worker = random.choice(smallest_workers)
164+
165+
score_before = worker_score(smallest_worker)
166+
smallest_worker["targets"].append(target)
167+
# ignore float rounding errors for our invariant check
168+
assert worker_score(smallest_worker) - score_before >= -1e-6, (
169+
score_before,
170+
worker_score(smallest_worker),
59171
)
60-
smallest_bin.append(node)
61172

62-
return tuple(tuple(node["nodeid"] for node in bin) for bin in bins)
173+
return tuple(
174+
tuple(target.nodeid for target in worker["targets"]) for worker in workers
175+
)
63176

64177

65-
# for the behaviors estimators, we should incorporate a lookback across the
178+
# TODO for the behaviors estimators, we should incorporate a lookback across the
66179
# history of workers for this test. Give higher weight to newer estimators
67180
# (proportional to their confidence ie sample size).
68181

69182

70-
def behaviors_per_input(target: "FuzzTarget") -> float:
71-
# an estimator for the number of behaviors the next input will discover.
183+
def e_target_rates(target: "FuzzTarget") -> BehaviorRates:
    """
    Estimate the behavior-discovery rates for a single fuzz target.

    ``per_input`` is the inverse of the number of inputs since the last new
    behavior (1 when none have been observed yet); ``per_second`` scales that
    by the observed input throughput (1.0 when no time has elapsed yet).
    """
    provider = target.provider

    # read the provider statistics in a fixed order, in case these are
    # stateful properties
    since = provider.since_new_behavior
    ninputs = provider.ninputs
    elapsed_time = provider.elapsed_time

    # more inputs without a new behavior -> lower chance the next input
    # discovers one
    per_input = 1 / since if since > 0 else 1

    if elapsed_time == 0:
        per_second = 1.0
    else:
        per_second = per_input * (ninputs / elapsed_time)

    return BehaviorRates(per_input=per_input, per_second=per_second)
199+
200+
201+
def e_worker_lifetime(current_lifetime: float) -> float:
    """
    Estimate the total lifetime of a worker from its lifetime so far.

    Applies the doomsday-argument heuristic: expect the total lifetime to be
    double the observed one. Historical worker lifetimes could be folded in
    here in the future.
    """
    return current_lifetime * 2
209+
210+
211+
def e_worker_rates(*, target_rates: Sequence[BehaviorRates]) -> BehaviorRates:
    """
    Estimate the combined behavior rates for a worker fuzzing ``target_rates``.

    Each target contributes its rates weighted by the probability that the
    worker selects it (per ``bandit_weights``), i.e. the result is the
    expectation sum(probability * rate) over the worker's targets.

    NOTE: this is tightly coupled to the sampling algorithm actually used by
    workers. If that changes (e.g. to thompson sampling), these estimators
    must be updated to model the same sampling algorithm.
    """
    probabilities = bandit_weights(target_rates)

    e_per_input = 0.0
    e_per_second = 0.0
    for probability, rates in zip(probabilities, target_rates):
        e_per_input += probability * rates.per_input
        e_per_second += probability * rates.per_second

    return BehaviorRates(per_input=e_per_input, per_second=e_per_second)
88227

89228

90229
def softmax(values: list[float]) -> list[float]:
@@ -96,3 +235,15 @@ def softmax(values: list[float]) -> list[float]:
96235

97236
total = sum(softmaxed)
98237
return [value / total for value in softmaxed]
238+
239+
240+
def bandit_weights(behavior_rates: Sequence[BehaviorRates]) -> list[float]:
    """
    Return the probability with which each target should be fuzzed next, as a
    solution to the multi-armed-bandit problem.

    Uses boltzmann exploration: each target is selected with probability equal
    to the softmax of its expected value (behaviors per second).
    """
    return softmax([rates.per_second for rates in behavior_rates])

src/hypofuzz/entrypoint.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None:
161161
if n_processes <= 1:
162162
# if we only have one process, skip the FuzzWorkerHub abstraction (which
163163
# would cost a process) and just start a FuzzWorker with constant node_ids
164-
shared_state = {"hub_state": {"nodeids": nodeids}, "worker_state": {}}
164+
shared_state = {
165+
"hub_state": {"nodeids": nodeids},
166+
"worker_state": {"nodeids": {}},
167+
}
165168
_start_worker(pytest_args=pytest_args, shared_state=shared_state)
166169
else:
167170
hub = FuzzWorkerHub(

0 commit comments

Comments
 (0)