Skip to content

Commit 4889870

Browse files
committed
Python: Sample Limoncello CFF unflattening workflow
Mostly just as a demonstration of rewriting MLIL, less about ACTUALLY deobfuscating the target
1 parent 7871953 commit 4889870

File tree

1 file changed

+293
-0
lines changed

1 file changed

+293
-0
lines changed

python/examples/wf_unflatten.py

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
import json
2+
from binaryninja import Workflow, Activity, AnalysisContext, MediumLevelILBasicBlock, \
3+
MediumLevelILInstruction, ReportCollection, FlowGraphReport, show_report_collection, \
4+
FlowGraph, DisassemblySettings
5+
from binaryninja.mediumlevelil import *
6+
from binaryninja.enums import *
7+
8+
9+
"""
10+
This workflow reverses the control-flow flattening algorithm of Limoncello[1],
11+
at least some of the time. It does this in a relatively simple manner:
12+
13+
1. Find "dispatcher blocks" which contain a MLIL_JUMP_TO whose targets all have the
14+
dispatcher as a post-dominator (ie all targets must flow back to the dispatcher)
15+
2. Walk backwards from all unconditional branches into the dispatcher until finding blocks
16+
with conditional branches and build a list of all unconditional continuation blocks
17+
for each block that flows into the dispatcher.
18+
3. Copy those continuation blocks and the dispatcher into each block that calls them.
19+
This leaves you with a copy of the dispatcher block (and intermediate bookkeeping) in
20+
every path that would have normally flowed into the dispatcher.
21+
4. Since each path now has its own copy of the dispatcher, use MLIL dataflow to solve for
22+
which branch of the dispatcher is taken at each copy. When a dispatcher with a solved
23+
target is encountered, rewrite it as an MLIL_GOTO with the target directly.
24+
25+
[1] https://github.com/jonpalmisc/limoncello
26+
"""
27+
28+
29+
def is_dispatcher(block: MediumLevelILBasicBlock) -> bool:
30+
"""
31+
Determine if a block looks like a CFF dispatcher, i.e. all outgoing edges
32+
post-dominate it (or return)
33+
34+
:param block: The block to check
35+
:return: True if it's a dispatcher
36+
"""
37+
for ins in block:
38+
ins: MediumLevelILInstruction
39+
if ins.operation == MediumLevelILOperation.MLIL_JUMP_TO:
40+
for edge in block.outgoing_edges:
41+
out_block = edge.target
42+
# This is too trivial for fancier cases with multiple blocks that all lead
43+
# to a return, but whatever this is just a sample plugin
44+
if len(out_block.outgoing_edges) == 0:
45+
continue
46+
if block not in out_block.post_dominators:
47+
return False
48+
49+
return True
50+
return False
51+
52+
53+
def graph_pls(fn: MediumLevelILFunction) -> FlowGraph:
54+
# For ReportCollection, create graph with the settings I want
55+
settings = DisassemblySettings()
56+
settings.set_option(DisassemblyOption.ShowAddress, True)
57+
return fn.create_graph_immediate(settings=settings)
58+
59+
60+
def rewrite_action(context: AnalysisContext, do_it: bool):
61+
# Main workflow action
62+
63+
# =====================================================================
64+
# Custom debug report
65+
66+
report = None
67+
if context.function.check_for_debug_report("unflatten"):
68+
report = ReportCollection()
69+
70+
try:
71+
if report is not None:
72+
init_graph = graph_pls(context.mlil)
73+
report.append(FlowGraphReport("Initial", init_graph, context.view))
74+
75+
# =====================================================================
76+
# Finding flattened control flow and resolving continuations
77+
78+
# Look for dispatcher block
79+
dispatcher = None
80+
for block in context.mlil.basic_blocks:
81+
if is_dispatcher(block):
82+
dispatcher = block
83+
break
84+
85+
if dispatcher is None:
86+
return
87+
88+
if report is not None:
89+
init_graph = graph_pls(context.mlil)
90+
for node_index, node in enumerate(init_graph.nodes):
91+
if node.basic_block.start == dispatcher.start:
92+
node.highlight = HighlightStandardColor.RedHighlightColor
93+
init_graph.replace(node_index, node)
94+
report.append(FlowGraphReport("Finding Dispatcher", init_graph, context.view))
95+
96+
# Find blocks that flow directly into the dispatcher
97+
queue = []
98+
to_copy = {}
99+
for incoming in dispatcher.incoming_edges:
100+
if dispatcher in incoming.source.dominators:
101+
to_copy[incoming.source] = [dispatcher]
102+
queue.append(incoming.source)
103+
104+
# For each of these, walk back along unconditional branch edges to find their
105+
# list of continuation blocks which will be copied at their end
106+
while len(queue) > 0:
107+
# This is a sort of backwards BFS
108+
top = queue.pop(0)
109+
any_conditional = False
110+
for incoming in top.incoming_edges:
111+
if incoming.type != BranchType.UnconditionalBranch:
112+
any_conditional = True
113+
# Having any conditional branches ends the unconditional chain
114+
if not any_conditional:
115+
for incoming in top.incoming_edges:
116+
if incoming.type == BranchType.UnconditionalBranch:
117+
if incoming.source not in to_copy:
118+
queue.append(incoming.source)
119+
to_copy[incoming.source] = [top] + to_copy[top]
120+
to_copy[top] = []
121+
122+
# Ignore empty continuation paths
123+
for bb in list(to_copy.keys()):
124+
if len(to_copy[bb]) == 0:
125+
to_copy.pop(bb)
126+
127+
if report is not None:
128+
for block, path in to_copy.items():
129+
init_graph = graph_pls(context.mlil)
130+
for node_index, node in enumerate(init_graph.nodes):
131+
if node.basic_block.start in [bb.start for bb in path]:
132+
node.highlight = HighlightStandardColor.RedHighlightColor
133+
if node.basic_block.start == block.start:
134+
node.highlight = HighlightStandardColor.GreenHighlightColor
135+
init_graph.replace(node_index, node)
136+
report.append(FlowGraphReport(" Blocks flowing into", init_graph, context.view))
137+
138+
# =====================================================================
139+
# Modify the IL to copy the continuations into all the blocks calling the dispatcher
140+
141+
# Make a new IL function and append the modified instructions to it
142+
old_mlil = context.mlil
143+
new_mlil = MediumLevelILFunction(old_mlil.arch, low_level_il=context.llil)
144+
new_mlil.prepare_to_copy_function(old_mlil)
145+
block_map_starts = {}
146+
147+
# Copy all instructions in all blocks of the old version of the function
148+
for block in old_mlil.basic_blocks:
149+
new_mlil.prepare_to_copy_block(block)
150+
block_map_starts[block] = len(new_mlil)
151+
152+
new_mlil.set_current_address(old_mlil[InstructionIndex(block.start)].address, block.arch)
153+
154+
for instr_index in range(block.start, block.end):
155+
instr: MediumLevelILInstruction = old_mlil[InstructionIndex(instr_index)]
156+
157+
# Copy continuation blocks to end of block calling dispatcher
158+
if block in to_copy:
159+
path = to_copy[block]
160+
161+
if instr_index == block.end - 1:
162+
# For every block in the continuation, copy it at the end of this block
163+
for copy_block in path:
164+
new_mlil.prepare_to_copy_block(copy_block)
165+
166+
# Skip the final instruction in the continuations because it is a MLIL_GOTO
167+
end = copy_block.end - 1
168+
if copy_block == dispatcher:
169+
end = copy_block.end
170+
171+
for copy_block_instr_index in range(copy_block.start, end):
172+
# Copy instructions as-is
173+
copy_block_instr: MediumLevelILInstruction = old_mlil[InstructionIndex(copy_block_instr_index)]
174+
new_mlil.set_current_address(copy_block_instr.address, copy_block.arch)
175+
new_mlil.append(copy_block_instr.copy_to(new_mlil))
176+
continue
177+
178+
# Otherwise, copy the instruction as-is
179+
new_mlil.set_current_address(instr.address, block.arch)
180+
new_mlil.append(instr.copy_to(new_mlil))
181+
182+
# Generate blocks and SSA (for dataflow) for the next part
183+
new_mlil.finalize()
184+
new_mlil.generate_ssa_form()
185+
186+
# Since we're constructing a new function twice, we need to commit the mappings
187+
# of the intermediate function before copying again so that mappings will resolve
188+
# all the way to the end (gross)
189+
# TODO: Construct from another function without needing this
190+
context.mlil = new_mlil
191+
192+
if report is not None:
193+
newer_graph = graph_pls(new_mlil)
194+
report.append(FlowGraphReport("Swapped dispatch with jump_to", newer_graph, context.view))
195+
196+
# =====================================================================
197+
# Now convert all MLIL_JUMP_TO with known dest to a jump
198+
199+
# Maybe this should be a separate workflow action (so it can be composed)
200+
old_mlil = new_mlil
201+
new_mlil = MediumLevelILFunction(old_mlil.arch, low_level_il=context.llil)
202+
new_mlil.prepare_to_copy_function(old_mlil)
203+
204+
# Keep a running list of blocks and labels so we can line up the MLIL_GOTOs
205+
block_labels = {}
206+
207+
for old_block in old_mlil.basic_blocks:
208+
new_mlil.prepare_to_copy_block(old_block)
209+
new_mlil.set_current_address(old_mlil[InstructionIndex(old_block.start)].address, old_block.arch)
210+
211+
# Update block label list for the MLIL_GOTOs
212+
if old_block.start in block_labels:
213+
label = block_labels[old_block.start]
214+
else:
215+
label = MediumLevelILLabel()
216+
block_labels[old_block.start] = label
217+
new_mlil.mark_label(label)
218+
219+
for instr_index in range(old_block.start, old_block.end):
220+
instr: MediumLevelILInstruction = old_mlil[InstructionIndex(instr_index)]
221+
# If we find a MLIL_JUMP_TO with a known constant dest, then rewrite it
222+
# to a MLIL_GOTO with the known dest filled in
223+
if instr.operation == MediumLevelILOperation.MLIL_JUMP_TO:
224+
if instr.dest.value.type == RegisterValueType.ConstantPointerValue:
225+
dest_value = instr.dest.value.value
226+
if dest_value in instr.targets:
227+
old_target_index = instr.targets[dest_value]
228+
if old_target_index in block_labels:
229+
target_label = block_labels[old_target_index]
230+
else:
231+
target_label = MediumLevelILLabel()
232+
block_labels[old_target_index] = target_label
233+
new_mlil.append(new_mlil.goto(target_label, ILSourceLocation.from_instruction(instr)))
234+
continue
235+
236+
# Otherwise, copy the instruction as-is
237+
new_mlil.set_current_address(instr.address, old_block.arch)
238+
new_mlil.append(instr.copy_to(new_mlil))
239+
240+
new_mlil.finalize()
241+
new_mlil.generate_ssa_form()
242+
243+
if report is not None:
244+
newer_graph = graph_pls(new_mlil)
245+
report.append(FlowGraphReport("Resolved constant jump_to's", newer_graph, context.view))
246+
247+
# =====================================================================
248+
# And we're done
249+
250+
if do_it:
251+
context.mlil = new_mlil
252+
finally:
253+
# Show debug report if requested, even on exception thrown
254+
if report is not None:
255+
show_report_collection("Unflatten Debug Report", report)
256+
257+
258+
# Create and register the workflow for translating these instructions
259+
wf = Workflow("core.function.metaAnalysis").clone("core.function.metaAnalysis")
260+
261+
# Define the custom activity configuration
262+
wf.register_activity(Activity(
263+
configuration=json.dumps({
264+
"name": "extension.unflatten_limoncello.unflatten.dry_run",
265+
"title": "Unflatten (Limoncello) Dry Run",
266+
"description": "Detect and reverse Limoncello's Control Flow Flattening scheme.",
267+
"eligibility": {
268+
"auto": {
269+
"default": False
270+
}
271+
}
272+
}),
273+
action=lambda context: rewrite_action(context, False)
274+
))
275+
wf.register_activity(Activity(
276+
configuration=json.dumps({
277+
"name": "extension.unflatten_limoncello.unflatten",
278+
"title": "Unflatten (Limoncello)",
279+
"description": "Detect and reverse Limoncello's Control Flow Flattening scheme.",
280+
"eligibility": {
281+
"auto": {
282+
"default": False
283+
}
284+
}
285+
}),
286+
action=lambda context: rewrite_action(context, True)
287+
))
288+
289+
wf.insert_after("core.function.generateMediumLevelIL",[
290+
"extension.unflatten_limoncello.unflatten.dry_run",
291+
"extension.unflatten_limoncello.unflatten"
292+
])
293+
wf.register()

0 commit comments

Comments
 (0)