Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions facedancer/filters/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import datetime

from facedancer.request import USBControlRequest

from ..logging import log

from . import USBProxyFilter
Expand All @@ -23,7 +25,7 @@ def __init__(self, verbose=4, decoration=''):



def filter_control_in(self, req, data, stalled):
def filter_control_in(self, req: USBControlRequest | None, data, stalled):
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type annotations! We love more type annotations in the codebase! Thank you!

Log IN control requests without modification.
"""
Expand All @@ -33,7 +35,7 @@ def filter_control_in(self, req, data, stalled):
return req, data, stalled

if self.verbose > 3:
log.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req)))
log.info("{} {}< control {}".format(self.timestamp(), self.decoration, req))

if self.verbose > 3 and stalled:
log.info("{} {}< --STALLED-- ".format(self.timestamp(), self.decoration))
Expand All @@ -57,7 +59,7 @@ def filter_control_out(self, req, data):
return req, data

if self.verbose > 3:
log.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req)))
log.info("{} {}> control {}".format(self.timestamp(), self.decoration, req))

if self.verbose > 4 and data:
self._pretty_print_data(data, '>', self.decoration)
Expand Down Expand Up @@ -101,16 +103,18 @@ def filter_out(self, ep_num, data):

def timestamp(self):
""" Generate a quick timestamp for printing. """
return datetime.datetime.now().strftime("[%H:%M:%S]")
return datetime.datetime.now().strftime("[%H:%M:%S.%f]")

def _magic_decode(self, data):
""" Simple decode function that attempts to find a nice string representation for the console."""
try:
return bytes(data).decode('utf-16le')
except:
return bytes(data)
return bytes(data).hex(' ', 2)


def _pretty_print_data(self, data, direction_marker, decoration='', is_string=False, ep_marker=''):
data = self._magic_decode(data) if is_string else bytes(data)
log.info("{} {}{}{}: {}".format(self.timestamp(), ep_marker, decoration, direction_marker, data))
decoded = self._magic_decode(data) if is_string else bytes(data).hex(' ', 2)
if self.verbose < 6 and len(data) >= 30:
decoded = decoded[:30] + '…'
log.info("{} {}{}{}: {} ({})".format(self.timestamp(), ep_marker, decoration, direction_marker, decoded, len(data)))
35 changes: 23 additions & 12 deletions facedancer/filters/standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class USBProxySetupFilters(USBProxyFilter):

MAX_PACKET_SIZE_EP0 = 64

configurations: dict[int, USBDescriptor]

def __init__(self, device, verbose=0):
self.device = device
self.configurations = {}
Expand All @@ -48,18 +50,20 @@ def filter_control_in(self, req, data, stalled):
if descriptor_type == self.DESCRIPTOR_CONFIGURATION and req.length >= 32:
configuration = USBDescribable.from_binary_descriptor(data)
self.configurations[configuration.number] = configuration
if self.verbose > 0:
if self.verbose > 2:
log.info("-- Storing configuration {} --".format(configuration))


if descriptor_type == self.DESCRIPTOR_DEVICE and req.length >= 7:
# Patch our data to overwrite the maximum packet size on EP0.
# See USBProxy.connect for a rationale on this.
device = USBDescribable.from_binary_descriptor(data)
device.max_packet_size_ep0 = 64
data = bytearray(device.get_descriptor())[:len(data)]
if self.verbose > 0:
log.info("-- Patched device descriptor. --")
old = device.max_packet_size_ep0
if old != 64:
device.max_packet_size_ep0 = 64
data = bytearray(device.get_descriptor())[:len(data)]
if self.verbose > 0:
log.info(f"-- Patched device descriptor. max_packet_size_ep0 {old} to 64 --")

return req, data, stalled

Expand All @@ -69,13 +73,14 @@ def filter_control_out(self, req, data):
# handle it ourself, and absorb it.
if req.get_recipient() == self.RECIPIENT_DEVICE and \
req.request == self.SET_ADDRESS_REQUEST:
log.info(f"setup: out - handle set_address {req}")
req.acknowledge(blocking=True)
self.device.set_address(req.value)
return None, None

# Special case: if this is a SET_CONFIGURATION_REQUEST,
# pass it through, but also set up the Facedancer hardware
# in response.
# Special case: SET_CONFIGURATION_REQUEST
# libusb1 recommends always calling setConfiguration() instead
# of sending the control packet manually
if req.get_recipient() == self.RECIPIENT_DEVICE and \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. This should have been changed when we moved USB Proxy from libusb to libusb1. Thank you!

req.request == self.SET_CONFIGURATION_REQUEST:
configuration_index = req.value
Expand All @@ -84,8 +89,10 @@ def filter_control_out(self, req, data):
if configuration_index in self.configurations:
configuration = self.configurations[configuration_index]

if self.verbose > 0:
if self.verbose > 2:
log.info("-- Applying configuration {} --".format(configuration))
elif self.verbose > 0:
log.info(f"-- Applying configuration {configuration.get_identifier()} --")

self.device.configured(configuration)

Expand All @@ -94,13 +101,17 @@ def filter_control_out(self, req, data):
else:
log.warning("-- WARNING: Applying configuration {}, but we've never read that configuration's descriptor! --".format(configuration_index))

# Special case: if this is a SET_INTERFACE_REQUEST,
# pass it through, but also tell the device so it can update
# its current configuration.
return None, None

# Special case: SET_INTERFACE_REQUEST
# libusb1 recommends calling setInterfaceAlternative()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

if req.get_recipient() == self.RECIPIENT_INTERFACE and \
req.request == self.SET_INTERFACE_REQUEST:
interface_number = req.index
alternate = req.value
log.info(f"setup: out - handle SET_INTERFACE_REQUEST {req}")
self.device.interface_changed(interface_number, alternate)

return None, None

return req, data
Loading