Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 95 additions & 6 deletions capa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,18 @@ def evaluate(self, features: FeatureSet, short_circuit=True):
# short circuit
return Result(False, self, results)

return Result(True, self, results)
locations = set()
for res in results:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = all(results)
return Result(success, self, results)
locations = set()
if success:
for res in results:
locations.update(res.locations)
return Result(success, self, results, locations=locations)


class Or(Statement):
Expand All @@ -153,13 +160,17 @@ def evaluate(self, features: FeatureSet, short_circuit=True):
results.append(result)
if result:
# short circuit as soon as we hit one match
return Result(True, self, results)
return Result(True, self, results, locations=result.locations)

return Result(False, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = any(results)
return Result(success, self, results)
locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)


class Not(Statement):
Expand Down Expand Up @@ -207,7 +218,11 @@ def evaluate(self, features: FeatureSet, short_circuit=True):

if satisfied_children_count >= self.count:
# short circuit as soon as we hit the threshold
return Result(True, self, results)
locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(True, self, results, locations=locations)

return Result(False, self, results)
else:
Expand All @@ -217,7 +232,12 @@ def evaluate(self, features: FeatureSet, short_circuit=True):
#
# we can't use `if child is True` because the instance is not True.
success = sum([1 for child in results if bool(child) is True]) >= self.count
return Result(success, self, results)
locations = set()
if success:
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)


class Range(Statement):
Expand Down Expand Up @@ -299,6 +319,75 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
features[capa.features.common.MatchedRule(namespace)].update(locations)


class Sequence(Statement):
    """
    match if the children evaluate to True in increasing order of location.

    the order of evaluation is dictated by the property
    `Sequence.children` (type: list[Statement|Feature]).

    each child is pinned to exactly one location: the smallest of its match
    locations that is strictly greater than the location picked for the
    previous child (or simply its smallest location, for the first child).
    picking the smallest admissible location is a greedy choice that leaves
    the most room for the children that follow.
    """

    def __init__(self, children, description=None):
        super().__init__(description=description)
        self.children = children

    def evaluate(self, features: FeatureSet, short_circuit=True):
        """
        evaluate the children in order and enforce strictly increasing locations.

        `short_circuit` is forwarded to each child's `evaluate`. this statement
        itself always stops at the first child that fails to match or cannot be
        placed after the previous child, whatever the value of `short_circuit`.

        on success, each entry in the result's children is rewritten to carry
        only the single location picked for that child, and the result's
        locations are the set of those picked locations.
        """
        capa.perf.counters["evaluate.feature"] += 1
        capa.perf.counters["evaluate.feature.sequence"] += 1

        results = []
        # location picked for the previous child; None until the first child is placed.
        min_location = None

        for child in self.children:
            result = child.evaluate(features, short_circuit=short_circuit)
            results.append(result)

            if not result:
                # all children must match
                return Result(False, self, results)

            # smallest location that respects the ordering constraint.
            # for the first child there is no constraint, so this is just the minimum.
            #
            # a successful child that reports no locations yields None here too:
            # without a location we cannot enforce order, so the sequence fails.
            picked = next(
                (loc for loc in sorted(result.locations) if min_location is None or loc > min_location),
                None,
            )
            if picked is None:
                return Result(False, self, results)

            min_location = picked
            # narrow the child's result to the one location that was used for ordering.
            results[-1] = Result(True, child, result.children, locations={picked})

        return Result(True, self, results, locations={next(iter(r.locations)) for r in results})


def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
"""
match the given rules against the given features,
Expand Down
7 changes: 6 additions & 1 deletion capa/render/result_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ class CompoundStatementType:
AND = "and"
OR = "or"
NOT = "not"
NOT = "not"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The `NOT` constant is defined twice in `CompoundStatementType`. The second assignment only rebinds the same value, so please remove the redundant line to keep the class definition unambiguous.

OPTIONAL = "optional"
SEQUENCE = "sequence"


class StatementModel(FrozenModel): ...
Expand Down Expand Up @@ -213,7 +215,7 @@ class StatementNode(FrozenModel):


def statement_from_capa(node: capa.engine.Statement) -> Statement:
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)):
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)):
return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description)

elif isinstance(node, capa.engine.Some):
Expand Down Expand Up @@ -280,6 +282,9 @@ def node_to_capa(
elif node.statement.type == CompoundStatementType.OPTIONAL:
return capa.engine.Some(description=node.statement.description, count=0, children=children)

elif node.statement.type == CompoundStatementType.SEQUENCE:
return capa.engine.Sequence(description=node.statement.description, children=children)

else:
assert_never(node.statement.type)

Expand Down
4 changes: 3 additions & 1 deletion capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ def build_statements(d, scopes: Scopes):
return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "or":
return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "sequence":
return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using unique() on the children of a sequence statement prevents a rule from specifying a sequence of identical features (e.g., two consecutive push instructions). Since the order and multiplicity of features are significant in a sequence, unique() should not be used here. This allows rules to match patterns with repeated elements in a specific order.

Suggested change
return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
return ceng.Sequence([build_statements(dd, scopes) for dd in d[key]], description=description)

elif key == "not":
if len(d[key]) != 1:
raise InvalidRule("not statement must have exactly one child statement")
Expand Down Expand Up @@ -1698,7 +1700,7 @@ def rec(
# feature is found N times
return rec(rule_name, node.child)

elif isinstance(node, ceng.And):
elif isinstance(node, (ceng.And, ceng.Sequence)):
# When evaluating an AND block, all of the children need to match.
#
# So when we index rules, we want to pick the most uncommon feature(s)
Expand Down
144 changes: 143 additions & 1 deletion tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import capa.features.address
from capa.engine import Or, And, Not, Some, Range
from capa.engine import Or, And, Not, Some, Range, Sequence
from capa.features.insn import Number

ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
Expand Down Expand Up @@ -155,3 +155,145 @@ def test_eval_order():

assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2)
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1)


def test_sequence():
    # a Sequence matches only when its children match at strictly increasing locations.
    avaddr = capa.features.address.AbsoluteVirtualAddress

    # 1 before 2
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
    # 2 before 1 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
    # 1 same as 2 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR1}})) is False

    # 1 before 2 before 3
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}}
            )
        )
        is True
    )

    # 1 before 2, but 2 is placed after 3 (fail)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
            )
        )
        is False
    )

    # 1 before 2, but 3 is before 2 (fail)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR3}, Number(3): {ADDR2}}
            )
        )
        is False
    )

    # multiple locations for matches:
    # 1 at ADDR1 and ADDR3, 2 at ADDR2. picking ADDR1 for 1 lets 2 follow it (match).
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}})) is True

    # greedy matching: each child takes the *smallest* location that is
    # greater than the location taken by the previous child.

    # CASE:
    # 1 matches at 10.
    # 2 matches at 5 and 15.
    # 5 is not after 10, so 2 must take 15. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {avaddr(10)},
                    Number(2): {avaddr(5), avaddr(15)},
                }
            )
        )
        is True
    )

    # CASE:
    # 1 matches at 10 and 20.
    # 2 matches at 15.
    # 1 takes 10, and 10 < 15. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {avaddr(10), avaddr(20)},
                    Number(2): {avaddr(15)},
                }
            )
        )
        is True
    )

    # CASE:
    # 1 matches at 10, 2 matches at 15, 3 matches at 12.
    # 1 -> 10.
    # 2 -> 15 (> 10).
    # 3 -> 12 (not > 15).
    # Fail.
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {
                    Number(1): {avaddr(10)},
                    Number(2): {avaddr(15)},
                    Number(3): {avaddr(12)},
                }
            )
        )
        is False
    )


def test_location_propagation():
    # regression tests: Or/And/Some results must carry the match locations
    # of their children, otherwise Sequence has nothing to order by.
    only_one = {Number(1): {ADDR1}}
    one_and_two = {Number(1): {ADDR1}, Number(2): {ADDR2}}

    # Or
    assert Or([Number(1)]).evaluate(only_one).locations == {ADDR1}
    # with short_circuit=True (the default) only the first matching child contributes
    or_short = Or([Number(1), Number(2)]).evaluate(one_and_two)
    assert or_short.locations == {ADDR1}
    or_full = Or([Number(1), Number(2)]).evaluate(one_and_two, short_circuit=False)
    assert or_full.locations == {ADDR1, ADDR2}

    # And
    assert And([Number(1)]).evaluate(only_one).locations == {ADDR1}
    and_both = And([Number(1), Number(2)]).evaluate(one_and_two)
    assert and_both.locations == {ADDR1, ADDR2}

    # Some
    assert Some(1, [Number(1)]).evaluate(only_one).locations == {ADDR1}
    # with short_circuit=True (the default) evaluation stops at the first sufficient set
    some_one = Some(1, [Number(1), Number(2)]).evaluate(one_and_two)
    assert some_one.locations == {ADDR1}
    some_two = Some(2, [Number(1), Number(2)]).evaluate(one_and_two)
    assert some_two.locations == {ADDR1, ADDR2}
Loading
Loading