Skip to content

Commit ca70340

Browse files
authored
Merge pull request #42 from coffeegist/refactor/parsing-pipeline-streaming
Refactor parser pipeline with streaming input
2 parents a93c4a3 + 384ffcb commit ca70340

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2906
-1136
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
# Changelog
2+
## [0.4.16] - 10/27/2025
3+
### Changed
4+
- Refactored parser pipeline to use streaming input architecture with generator-based file reading
5+
- Introduced DataStream abstraction (FileDataStream, OutflankDataStream, MythicDataStream)
6+
- Implemented BoundaryBasedParser pattern for consistent parser behavior
7+
- Streamlined parser registration in ParsingPipelineFactory
8+
29
## [0.4.15] - 9/23/2025
310
### Fixed
411
- Pinned Click to version 8.1.8 to prevent typer help menu crashes

bofhound/__main__.py

Lines changed: 101 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1+
"""Entry point for bofhound CLI application."""
12
import sys
2-
import os
33
import logging
44
import typer
5-
import glob
6-
from syncer import sync
7-
8-
from bofhound.parsers import LdapSearchBofParser, Brc4LdapSentinelParser, HavocParser, \
9-
ParserType, OutflankC2JsonParser, MythicParser
5+
from bofhound.parsers import ParserType, ParsingPipelineFactory
6+
from bofhound.parsers.data_sources import FileDataSource, MythicDataSource, OutflankDataStream
107
from bofhound.writer import BloodHoundWriter
118
from bofhound.uploader import BloodHoundUploader
129
from bofhound.ad import ADDS
@@ -23,20 +20,52 @@
2320

2421
@app.command()
2522
def main(
26-
input_files: str = typer.Option("/opt/cobaltstrike/logs", "--input", "-i", help="Directory or file containing logs of ldapsearch results"),
27-
output_folder: str = typer.Option(".", "--output", "-o", help="Location to export bloodhound files"),
28-
properties_level: PropertiesLevel = typer.Option(PropertiesLevel.Member.value, "--properties-level", "-p", case_sensitive=False, help='Change the verbosity of properties exported to JSON: Standard - Common BH properties | Member - Includes MemberOf and Member | All - Includes all properties'),
29-
parser: ParserType = typer.Option(ParserType.LdapsearchBof.value, "--parser", case_sensitive=False, help="Parser to use for log files. ldapsearch parser (default) supports ldapsearch BOF logs from Cobalt Strike and pyldapsearch logs"),
23+
input_files: str = typer.Option(
24+
"/opt/cobaltstrike/logs", "--input", "-i",
25+
help="Directory or file containing logs of ldapsearch results"
26+
),
27+
output_folder: str = typer.Option(
28+
".", "--output", "-o", help="Location to export bloodhound files"
29+
),
30+
properties_level: PropertiesLevel = typer.Option(
31+
PropertiesLevel.Member.value, "--properties-level", "-p", case_sensitive=False,
32+
help=("Change the verbosity of properties exported to JSON: "
33+
"Standard - Common BH properties | Member - Includes MemberOf and Member | "
34+
"All - Includes all properties")
35+
),
36+
parser_type: ParserType = typer.Option(
37+
ParserType.LdapsearchBof.value, "--parser", case_sensitive=False,
38+
help=("Parser to use for log files. ldapsearch parser (default) supports ldapsearch BOF "
39+
"logs from Cobalt Strike and pyldapsearch logs")
40+
),
3041
debug: bool = typer.Option(False, "--debug", help="Enable debug output"),
31-
zip_files: bool = typer.Option(False, "--zip", "-z", help="Compress the JSON output files into a zip archive"),
42+
zip_files: bool = typer.Option(
43+
False, "--zip", "-z",
44+
help="Compress the JSON output files into a zip archive"
45+
),
3246
quiet: bool = typer.Option(False, "--quiet", "-q", help="Suppress banner"),
33-
mythic_server: str = typer.Option("127.0.0.1", "--mythic-server", help="IP or hostname of Mythic server to connect to", rich_help_panel="Mythic Options"),
34-
mythic_token: str = typer.Option(None, "--mythic-token", help="Mythic API token", rich_help_panel="Mythic Options"),
35-
bh_token_id: str = typer.Option(None, "--bh-token-id", help="BloodHound API token ID", rich_help_panel="BloodHound CE Options"),
36-
bh_token_key: str = typer.Option(None, "--bh-token-key", help="BloodHound API token key", rich_help_panel="BloodHound CE Options"),
37-
bh_server: str = typer.Option("http://127.0.0.1:8080", "--bh-server", help="BloodHound CE URL", rich_help_panel="BloodHound CE Options")):
47+
mythic_server: str = typer.Option(
48+
"127.0.0.1", "--mythic-server", help="IP or hostname of Mythic server to connect to",
49+
rich_help_panel="Mythic Options"
50+
),
51+
mythic_token: str = typer.Option(
52+
None, "--mythic-token", help="Mythic API token", rich_help_panel="Mythic Options"
53+
),
54+
bh_token_id: str = typer.Option(
55+
None, "--bh-token-id", help="BloodHound API token ID",
56+
rich_help_panel="BloodHound CE Options"
57+
),
58+
bh_token_key: str = typer.Option(
59+
None, "--bh-token-key", help="BloodHound API token key",
60+
rich_help_panel="BloodHound CE Options"
61+
),
62+
bh_server: str = typer.Option(
63+
"http://127.0.0.1:8080", "--bh-server", help="BloodHound CE URL",
64+
rich_help_panel="BloodHound CE Options"
65+
)):
3866
"""
39-
Generate BloodHound compatible JSON from logs written by the ldapsearch BOF, pyldapsearch and specific C2 frameworks
67+
Generate BloodHound compatible JSON from logs written by the ldapsearch BOF, pyldapsearch and
68+
specific C2 frameworks
4069
"""
4170

4271
if debug:
@@ -48,122 +77,83 @@ def main(
4877
banner()
4978

5079
# default to Cobalt logfile naming format
51-
logfile_name_format = "beacon*.log"
80+
data_source = None
81+
82+
match parser_type:
5283

53-
match parser:
54-
5584
case ParserType.LdapsearchBof:
5685
logger.debug("Using ldapsearch parser")
57-
parser = LdapSearchBofParser
58-
86+
data_source = FileDataSource(str(input_files), "beacon*.log")
87+
5988
case ParserType.BRC4:
6089
logger.debug("Using Brute Ratel parser")
61-
parser = Brc4LdapSentinelParser
62-
logfile_name_format = "b-*.log"
6390
if input_files == "/opt/cobaltstrike/logs":
6491
input_files = "/opt/bruteratel/logs"
92+
data_source = FileDataSource(str(input_files), "b-*.log")
6593

6694
case ParserType.HAVOC:
6795
logger.debug("Using Havoc parser")
68-
parser = HavocParser
69-
logfile_name_format = "Console_*.log"
7096
if input_files == "/opt/cobaltstrike/logs":
7197
input_files = "/opt/havoc/data/loot"
98+
data_source = FileDataSource(str(input_files), "Console_*.log")
7299

73100
case ParserType.OUTFLANKC2:
74101
logger.debug("Using OutflankC2 parser")
75-
parser = OutflankC2JsonParser
76-
logfile_name_format = "*.json"
102+
data_source = FileDataSource(
103+
str(input_files), "*.json", stream_type=OutflankDataStream
104+
)
77105

78106
case ParserType.MYTHIC:
79107
logger.debug("Using Mythic parser")
80-
parser = MythicParser()
81108
if mythic_token is None:
82109
logger.error("Mythic server and API token must be provided")
83110
sys.exit(-1)
84-
#
85-
# instead of iterating over log files on disk, we'll iterate over
86-
# Mythic callback objects
87-
#
88-
sync(parser.connect(mythic_server, mythic_token))
89-
cs_logs = sync(parser.collect_callbacks())
90-
91-
111+
data_source = MythicDataSource(mythic_server, mythic_token)
112+
92113
case _:
93-
raise ValueError(f"Unknown parser type: {parser}")
94-
95-
if os.path.isfile(input_files):
96-
cs_logs = [input_files]
97-
logger.debug(f"Log file explicitly provided {input_files}")
98-
elif os.path.isdir(input_files):
99-
# recursively get a list of all .log files in the input directory, sorted by last modified time
100-
cs_logs = glob.glob(f"{input_files}/**/{logfile_name_format}", recursive=True)
101-
if len(cs_logs) == 0:
102-
# check for pyldapsearch/soapy logs
103-
cs_logs = glob.glob(f"{input_files}/*.log", recursive=True)
104-
105-
cs_logs.sort(key=os.path.getmtime)
106-
107-
if len(cs_logs) == 0:
108-
logger.error(f"No log files found in {input_files}!")
109-
return
110-
else:
111-
logger.info(f"Located {len(cs_logs)} beacon log files")
112-
else:
113-
if not isinstance(parser, MythicParser):
114-
logger.error(f"Could not find {input_files} on disk")
115-
sys.exit(-1)
116-
117-
parsed_ldap_objects = []
118-
parsed_local_objects = []
119-
with console.status(f"", spinner="aesthetic") as status:
120-
for log in cs_logs:
121-
status.update(f" [bold] Parsing {log}")
122-
formatted_data = parser.prep_file(log)
123-
new_objects = parser.parse_data(formatted_data)
124-
125-
# jank insert to reparse outflank logs for local data
126-
if parser == OutflankC2JsonParser:
127-
new_local_objects = parser.parse_local_objects(log)
128-
else:
129-
new_local_objects = parser.parse_local_objects(formatted_data)
130-
131-
logger.debug(f"Parsed {log}")
132-
logger.debug(f"Found {len(new_objects)} objects in {log}")
133-
parsed_ldap_objects.extend(new_objects)
134-
parsed_local_objects.extend(new_local_objects)
135-
136-
logger.info(f"Parsed {len(parsed_ldap_objects)} LDAP objects from {len(cs_logs)} log files")
137-
logger.info(f"Parsed {len(parsed_local_objects)} local group/session objects from {len(cs_logs)} log files")
114+
raise ValueError(f"Unknown parser type: {parser_type}")
138115

139116
ad = ADDS()
140117
broker = LocalBroker()
141-
118+
pipeline = ParsingPipelineFactory.create_pipeline(parser_type=parser_type)
119+
120+
with console.status("", spinner="aesthetic") as status:
121+
results = pipeline.process_data_source(
122+
data_source,
123+
progress_callback=lambda id: status.update(f"Processing {id}")
124+
)
125+
126+
ldap_objects = results.get_ldap_objects()
127+
local_objects = results.get_local_group_memberships() + results.get_sessions() + \
128+
results.get_privileged_sessions() + results.get_registry_sessions()
129+
logger.info("Parsed %d LDAP objects", len(ldap_objects))
130+
logger.info("Parsed %d local group/session objects", len(local_objects))
142131
logger.info("Sorting parsed objects by type...")
143-
ad.import_objects(parsed_ldap_objects)
144-
broker.import_objects(parsed_local_objects, ad.DOMAIN_MAP.values())
145-
146-
logger.info(f"Parsed {len(ad.users)} Users")
147-
logger.info(f"Parsed {len(ad.groups)} Groups")
148-
logger.info(f"Parsed {len(ad.computers)} Computers")
149-
logger.info(f"Parsed {len(ad.domains)} Domains")
150-
logger.info(f"Parsed {len(ad.trustaccounts)} Trust Accounts")
151-
logger.info(f"Parsed {len(ad.ous)} OUs")
152-
logger.info(f"Parsed {len(ad.containers)} Containers")
153-
logger.info(f"Parsed {len(ad.gpos)} GPOs")
154-
logger.info(f"Parsed {len(ad.enterprisecas)} Enterprise CAs")
155-
logger.info(f"Parsed {len(ad.aiacas)} AIA CAs")
156-
logger.info(f"Parsed {len(ad.rootcas)} Root CAs")
157-
logger.info(f"Parsed {len(ad.ntauthstores)} NTAuth Stores")
158-
logger.info(f"Parsed {len(ad.issuancepolicies)} Issuance Policies")
159-
logger.info(f"Parsed {len(ad.certtemplates)} Cert Templates")
160-
logger.info(f"Parsed {len(ad.schemas)} Schemas")
161-
logger.info(f"Parsed {len(ad.CROSSREF_MAP)} Referrals")
162-
logger.info(f"Parsed {len(ad.unknown_objects)} Unknown Objects")
163-
logger.info(f"Parsed {len(broker.sessions)} Sessions")
164-
logger.info(f"Parsed {len(broker.privileged_sessions)} Privileged Sessions")
165-
logger.info(f"Parsed {len(broker.registry_sessions)} Registry Sessions")
166-
logger.info(f"Parsed {len(broker.local_group_memberships)} Local Group Memberships")
132+
133+
ad.import_objects(ldap_objects)
134+
broker.import_objects(results, ad.DOMAIN_MAP.values())
135+
136+
logger.info("Parsed %d Users", len(ad.users))
137+
logger.info("Parsed %d Groups", len(ad.groups))
138+
logger.info("Parsed %d Computers", len(ad.computers))
139+
logger.info("Parsed %d Domains", len(ad.domains))
140+
logger.info("Parsed %d Trust Accounts", len(ad.trustaccounts))
141+
logger.info("Parsed %d OUs", len(ad.ous))
142+
logger.info("Parsed %d Containers", len(ad.containers))
143+
logger.info("Parsed %d GPOs", len(ad.gpos))
144+
logger.info("Parsed %d Enterprise CAs", len(ad.enterprisecas))
145+
logger.info("Parsed %d AIA CAs", len(ad.aiacas))
146+
logger.info("Parsed %d Root CAs", len(ad.rootcas))
147+
logger.info("Parsed %d NTAuth Stores", len(ad.ntauthstores))
148+
logger.info("Parsed %d Issuance Policies", len(ad.issuancepolicies))
149+
logger.info("Parsed %d Cert Templates", len(ad.certtemplates))
150+
logger.info("Parsed %d Schemas", len(ad.schemas))
151+
logger.info("Parsed %d Referrals", len(ad.CROSSREF_MAP))
152+
logger.info("Parsed %d Unknown Objects", len(ad.unknown_objects))
153+
logger.info("Parsed %d Sessions", len(broker.sessions))
154+
logger.info("Parsed %d Privileged Sessions", len(broker.privileged_sessions))
155+
logger.info("Parsed %d Registry Sessions", len(broker.registry_sessions))
156+
logger.info("Parsed %d Local Group Memberships", len(broker.local_group_memberships))
167157

168158
ad.process()
169159
ad.process_local_objects(broker)
@@ -194,10 +184,10 @@ def main(
194184
# Upload files to BloodHound CE
195185
#
196186
if bh_token_id and bh_token_key and bh_server:
197-
with console.status(f"", spinner="aesthetic") as status:
198-
status.update(f" [bold] Uploading files to BloodHound server...")
187+
with console.status("", spinner="aesthetic") as status:
188+
status.update(" [bold] Uploading files to BloodHound server...")
199189
uploader = BloodHoundUploader(bh_server, bh_token_id, bh_token_key)
200-
190+
201191
if not uploader.create_upload_job():
202192
return
203193

@@ -209,6 +199,7 @@ def main(
209199

210200

211201
def banner():
202+
"""Display the bofhound banner."""
212203
print('''
213204
_____________________________ __ __ ______ __ __ __ __ _______
214205
| _ / / __ / | ____/| | | | / __ \\ | | | | | \\ | | | \\

0 commit comments

Comments
 (0)