Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,24 @@ jobs:

strategy:
matrix:
python-version: [ 3.8, 3.9, "3.10", "3.11", 3.12 ]
python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13", "3.14" ]
os: [ ubuntu-latest, macos-latest, windows-latest ]

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Lint with flake8
- name: Install dependencies
run: |
python -m pip install flake8
flake8 . --max-line-length=127
- name: Verify sorted imports
python3 -m pip install --upgrade pip
python3 -m pip install pre-commit
- name: Run pre-commit hooks
run: |
python -m pip install isort
isort . -m HANGING_INDENT -l 120 --check-only
pre-commit run --all-files
- name: Test install
run: |
python -m pip install --upgrade pip
python -m pip install -U .
python -m pip install -U '.'
- name: Test show usage
run: |
python -m harlogger --help
run: python -m harlogger --help
38 changes: 17 additions & 21 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
# This workflows will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
# This workflow builds and uploads the Python package to PyPI when a release is created

name: Upload Python Package

on:
release:
types: [created]
types: [ created ]

jobs:
deploy:

permissions:
id-token: write
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel twine build
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
run: |
python -m build
twine upload dist/*
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install uv
uses: astral-sh/setup-uv@v3
- name: Build distributions
run: uv build
- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.14.2
hooks:
# Run the linter.
- id: ruff-check
# Run the formatter.
- id: ruff-format
141 changes: 82 additions & 59 deletions harlogger/__main__.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,107 @@
import click
from pymobiledevice3.cli.cli_common import Command
from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider
from typing import Annotated, Optional

from harlogger.sniffers import Filters, HostSnifferProfile, MobileSnifferProfile, SnifferPreference


@click.group()
def cli():
pass


@cli.group()
def mobile():
""" Mobile sniffing options """
pass
import typer
from pymobiledevice3.cli.cli_common import ServiceProviderDep
from typer_injector import InjectingTyper

from harlogger.sniffers import Filters, HostSnifferProfile, MobileSnifferProfile, SnifferPreference

@mobile.command('profile', cls=Command)
@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list')
@click.option('--color/--no-color', default=True)
@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list')
@click.option('images', '-i', '--image', multiple=True, help='filter image list')
@click.option('--request/--no-request', is_flag=True, default=True, help='show requests')
@click.option('--response/--no-response', is_flag=True, default=True, help='show responses')
@click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination')
@click.option('--black-list/--white-list', default=True, is_flag=True)
def mobile_profile(service_provider: LockdownServiceProvider, pids, process_names, color, request, response, images,
unique, black_list):
cli = InjectingTyper(
help="Monitor HTTP traffic on given macOS/iOS devices.",
no_args_is_help=True,
)
mobile = InjectingTyper(help="Mobile sniffing options")
cli.add_typer(mobile, name="mobile")


@mobile.command("profile")
def mobile_profile(
service_provider: ServiceProviderDep,
pids: Annotated[Optional[list[int]], typer.Option("-p", "--pid", help="filter pid list")] = None,
process_names: Annotated[
Optional[list[str]],
typer.Option("-pn", "--process-name", help="filter process name list"),
] = None,
color: Annotated[bool, typer.Option("--color/--no-color")] = True,
request: Annotated[bool, typer.Option("--request/--no-request", help="show requests")] = True,
response: Annotated[bool, typer.Option("--response/--no-response", help="show responses")] = True,
images: Annotated[Optional[list[str]], typer.Option("-i", "--image", help="filter image list")] = None,
unique: Annotated[
bool,
typer.Option("-u", "--unique", help="show only unique requests per image/pid/method/uri combination"),
] = False,
black_list: Annotated[bool, typer.Option("--black-list/--white-list")] = True,
):
"""
Sniff using CFNetworkDiagnostics.mobileconfig profile.

This requires the specific Apple profile to be installed for the sniff to work.
"""
filters = Filters(pids, process_names, images, black_list)
MobileSnifferProfile(service_provider, filters=filters, request=request, response=response, color=color,
unique=unique).sniff()


@mobile.command('preference', cls=Command)
@click.option('-o', '--out', type=click.File('w'), help='file to store the har entries into upon exit (ctrl+c)')
@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list')
@click.option('--color/--no-color', default=True)
@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list')
@click.option('images', '-i', '--image', multiple=True, help='filter image list')
@click.option('--request/--no-request', is_flag=True, default=True, help='show requests')
@click.option('--response/--no-response', is_flag=True, default=True, help='show responses')
@click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination')
@click.option('--black-list/--white-list', default=True, is_flag=True)
def mobile_preference(service_provider: LockdownServiceProvider, out, pids, process_names, images, request, response,
color, unique, black_list):
MobileSnifferProfile(
service_provider, filters=filters, request=request, response=response, color=color, unique=unique
).sniff()


@mobile.command("preference")
def mobile_preference(
service_provider: ServiceProviderDep,
out: Annotated[
Optional[typer.FileTextWrite],
typer.Option("-o", "--out", help="file to store the har entries into upon exit (ctrl+c)"),
] = None,
pids: Annotated[Optional[list[int]], typer.Option("-p", "--pid", help="filter pid list")] = None,
process_names: Annotated[
Optional[list[str]],
typer.Option("-pn", "--process-name", help="filter process name list"),
] = None,
color: Annotated[bool, typer.Option("--color/--no-color")] = True,
images: Annotated[Optional[list[str]], typer.Option("-i", "--image", help="filter image list")] = None,
request: Annotated[bool, typer.Option("--request/--no-request", help="show requests")] = True,
response: Annotated[bool, typer.Option("--response/--no-response", help="show responses")] = True,
unique: Annotated[
bool,
typer.Option("-u", "--unique", help="show only unique requests per image/pid/method/uri combination"),
] = False,
black_list: Annotated[bool, typer.Option("--black-list/--white-list")] = True,
):
"""
Sniff using the secret com.apple.CFNetwork.plist configuration.

This sniff includes the request/response body as well but requires the device to be jailbroken for
the sniff to work
"""
filters = Filters(pids, process_names, images, black_list)
SnifferPreference(service_provider, filters=filters, request=request, response=response, out=out, color=color,
unique=unique).sniff()


@cli.command('profile')
@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list')
@click.option('--color/--no-color', default=True)
@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list')
@click.option('images', '-i', '--image', multiple=True, help='filter image list')
@click.option('--request/--no-request', is_flag=True, default=True, help='show requests')
@click.option('--response/--no-response', is_flag=True, default=True, help='show responses')
@click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination')
@click.option('--black-list/--white-list', default=True, is_flag=True)
def host_profile(pids, process_names, color, request, response, images, unique, black_list):
SnifferPreference(
service_provider, filters=filters, request=request, response=response, out=out, color=color, unique=unique
).sniff()


@cli.command("profile")
def host_profile(
pids: Annotated[Optional[list[int]], typer.Option("-p", "--pid", help="filter pid list")] = None,
process_names: Annotated[
Optional[list[str]],
typer.Option("-pn", "--process-name", help="filter process name list"),
] = None,
color: Annotated[bool, typer.Option("--color/--no-color")] = True,
request: Annotated[bool, typer.Option("--request/--no-request", help="show requests")] = True,
response: Annotated[bool, typer.Option("--response/--no-response", help="show responses")] = True,
images: Annotated[Optional[list[str]], typer.Option("-i", "--image", help="filter image list")] = None,
unique: Annotated[
bool,
typer.Option("-u", "--unique", help="show only unique requests per image/pid/method/uri combination"),
] = False,
black_list: Annotated[bool, typer.Option("--black-list/--white-list")] = True,
):
"""
Sniff using CFNetworkDiagnostics.mobileconfig profile.

This requires the specific Apple profile to be installed for the sniff to work.
"""
filters = Filters(pids, process_names, images, black_list)
HostSnifferProfile(filters=filters, request=request, response=response, color=color,
unique=unique).sniff()
HostSnifferProfile(filters=filters, request=request, response=response, color=color, unique=unique).sniff()


if __name__ == '__main__':
if __name__ == "__main__":
cli()
2 changes: 1 addition & 1 deletion harlogger/haralyzer_patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def text(self) -> Optional[str]:


def add_text_base64_support_for_haralyzer() -> None:
setattr(Response, 'text', ResponseHook.text)
Response.text = ResponseHook.text
64 changes: 40 additions & 24 deletions harlogger/http_transaction.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,50 @@
from abc import abstractmethod
from typing import List, Mapping, MutableMapping, Union
from collections.abc import Mapping, MutableMapping
from typing import Optional, Union

from cached_property import cached_property


class HTTPTransaction:
def __init__(self, url: str, http_version: str, headers: Mapping = None, body: Union[str, bytes] = None):
def __init__(
self, url: str, http_version: str, headers: Optional[Mapping] = None, body: Optional[Union[str, bytes]] = None
):
self.url = url
self.http_version = http_version
self.headers = headers
self.body = body

@staticmethod
def parse_transaction(message: str) -> 'HTTPTransaction':
def parse_transaction(message: str) -> "HTTPTransaction":
res = None
parsed_transaction = HTTPTransaction._parse_fields(message=message)

if 'Protocol Enqueue' in parsed_transaction:
info = parsed_transaction.pop('Protocol Enqueue').split()[1:]
if "Protocol Enqueue" in parsed_transaction:
info = parsed_transaction.pop("Protocol Enqueue").split()[1:]
if len(info) == 2:
method, url = info
http_version = 'unknown'
http_version = "unknown"
else:
method, url, http_version = info
parsed_transaction.pop('Message')
parsed_transaction.pop('Request')
parsed_transaction.pop("Message")
parsed_transaction.pop("Request")
res = HTTPRequest(url, method, http_version, parsed_transaction)

elif 'Protocol Received' in parsed_transaction:
url = parsed_transaction.pop('Protocol Received').split()[2]
http_version, status, *status_text = parsed_transaction.pop('Response').split(' ', 2)
elif "Protocol Received" in parsed_transaction:
url = parsed_transaction.pop("Protocol Received").split()[2]
http_version, status, *status_text = parsed_transaction.pop("Response").split(" ", 2)
res = HTTPResponse(url, http_version, status, status_text, parsed_transaction)
return res

@staticmethod
def _parse_fields(message: str) -> MutableMapping:
result = {}
for line in message.split('\n'):
if ': ' not in line:
for line in message.split("\n"):
if ": " not in line:
continue

line = line.strip()
k, v = line.split(':', 1)
k, v = line.split(":", 1)
k = k.strip()
v = v.strip()
result[k] = v
Expand All @@ -53,28 +56,41 @@ def _start_line(self) -> str:

@cached_property
def formatted(self) -> str:
formatted_headers = ''
formatted_headers = ""
for k, v in self.headers.items():
formatted_headers += f'{k}: {v}\n'
return f'{self._start_line()}\n{formatted_headers}\n{self.body if self.body else ""}\n'
formatted_headers += f"{k}: {v}\n"
return f"{self._start_line()}\n{formatted_headers}\n{self.body if self.body else ''}\n"


class HTTPRequest(HTTPTransaction):
def __init__(self, url: str, method: str, http_version: str, headers: Mapping = None,
body: Union[str, bytes] = None):
def __init__(
self,
url: str,
method: str,
http_version: str,
headers: Optional[Mapping] = None,
body: Optional[Union[str, bytes]] = None,
):
super().__init__(url, http_version, headers, body)
self.method = method

def _start_line(self) -> str:
return f'{self.method} {self.url} {self.http_version}'
return f"{self.method} {self.url} {self.http_version}"


class HTTPResponse(HTTPTransaction):
def __init__(self, url: str, http_version: str, status: int, status_text: List, headers: Mapping = None,
body: Union[str, bytes] = None):
def __init__(
self,
url: str,
http_version: str,
status: int,
status_text: list,
headers: Optional[Mapping] = None,
body: Optional[Union[str, bytes]] = None,
):
super().__init__(url, http_version, headers, body)
self.status = status
self.status_text = ' '.join(status_text)
self.status_text = " ".join(status_text)

def _start_line(self) -> str:
return f'{self.http_version} {self.status} {self.status_text}'
return f"{self.http_version} {self.status} {self.status_text}"
Loading