Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,10 @@ def _words(self):
return set()


class OPEN_UDP_PORT(OPEN_TCP_PORT):
pass


class URL_UNVERIFIED(BaseEvent):
_status_code_regex = re.compile(r"^status-(\d{1,3})$")

Expand Down
82 changes: 82 additions & 0 deletions bbot/modules/censys_dns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from bbot.modules.templates.censys import censys


class censys_dns(censys):
"""
Query the Censys certificates API for subdomains.
Thanks to https://github.com/owasp-amass/amass/blob/master/resources/scripts/cert/censys.ads
"""

watched_events = ["DNS_NAME"]
produced_events = ["DNS_NAME"]
flags = ["subdomain-enum", "passive", "safe"]
meta = {
"description": "Query the Censys API for subdomains",
"created_date": "2022-08-04",
"author": "@TheTechromancer",
"auth_required": True,
}
options = {"api_key": "", "max_pages": 5}
options_desc = {
"api_key": "Censys.io API Key in the format of 'key:secret'",
"max_pages": "Maximum number of pages to fetch (100 results per page)",
}

async def setup(self):
self.max_pages = self.config.get("max_pages", 5)
return await super().setup()

async def query(self, query):
results = set()
cursor = ""
for i in range(self.max_pages):
url = f"{self.base_url}/v2/certificates/search"
json_data = {
"q": f"names: {query}",
"per_page": 100,
}
if cursor:
json_data.update({"cursor": cursor})
resp = await self.api_request(
url,
method="POST",
json=json_data,
)

if resp is None:
break

try:
d = resp.json()
except Exception as e:
self.warning(f"Failed to parse JSON from {url} (response: {resp}): {e}")

if resp.status_code < 200 or resp.status_code >= 400:
if isinstance(d, dict):
error = d.get("error", "")
if error:
self.warning(error)
self.verbose(f'Non-200 Status code: {resp.status_code} for query "{query}", page #{i + 1}')
self.debug(f"Response: {resp.text}")
break
else:
if d is None:
break
elif not isinstance(d, dict):
break
status = d.get("status", "").lower()
result = d.get("result", {})
hits = result.get("hits", [])
if status != "ok" or not hits:
break

for h in hits:
names = h.get("names", [])
for n in names:
results.add(n.strip(".*").lower())

cursor = result.get("links", {}).get("next", "")
if not cursor:
break

return results
177 changes: 177 additions & 0 deletions bbot/modules/censys_ip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from bbot.modules.templates.censys import censys


class censys_ip(censys):
"""
Query the Censys /v2/hosts/{ip} endpoint for associated hostnames, IPs, and URLs.
"""

watched_events = ["IP_ADDRESS"]
produced_events = [
"IP_ADDRESS",
"DNS_NAME",
"URL_UNVERIFIED",
"OPEN_TCP_PORT",
"OPEN_UDP_PORT",
"TECHNOLOGY",
"PROTOCOL",
]
flags = ["passive", "safe"]
meta = {
"description": "Query the Censys API for hosts by IP address",
"created_date": "2026-01-26",
"author": "@TheTechromancer",
"auth_required": True,
}
options = {"api_key": "", "dns_names_limit": 100, "in_scope_only": True}
options_desc = {
"api_key": "Censys.io API Key in the format of 'key:secret'",
"dns_names_limit": "Maximum number of DNS names to extract from dns.names (default 100)",
"in_scope_only": "Only query in-scope IPs. If False, will query up to distance 1.",
}
scope_distance_modifier = 1

async def setup(self):
self.dns_names_limit = self.config.get("dns_names_limit", 100)
self.warning(
"This module may consume a lot of API queries. Unless you specifically want to query on each individual IP, we recommend using the censys_dns module instead."
)
return await super().setup()

async def filter_event(self, event):
in_scope_only = self.config.get("in_scope_only", True)
max_scope_distance = 0 if in_scope_only else (self.scan.scope_search_distance + 1)
if event.scope_distance > max_scope_distance:
return False, "event is not in scope"
return True

async def handle_event(self, event):
ip = str(event.host)
url = f"{self.base_url}/v2/hosts/{ip}"

resp = await self.api_request(url)
if resp is None:
self.debug(f"No response for {ip}")
return

if resp.status_code == 404:
self.debug(f"No data found for {ip}")
return

if resp.status_code != 200:
self.verbose(f"Non-200 status code ({resp.status_code}) for {ip}")
return

try:
data = resp.json()
except Exception as e:
self.warning(f"Failed to parse JSON response for {ip}: {e}")
return

result = data.get("result", {})
if not result:
return

# Track what we've already emitted to avoid duplicates
seen = set()

# Extract data from services
for service in result.get("services", []):
port = service.get("port")
transport = service.get("transport_protocol", "TCP").upper()

# Emit OPEN_TCP_PORT or OPEN_UDP_PORT for services with a port
# QUIC uses UDP as transport, so treat it as UDP
if port and (port, transport) not in seen:
seen.add((port, transport))
if transport in ("UDP", "QUIC"):
event_type = "OPEN_UDP_PORT"
else:
event_type = "OPEN_TCP_PORT"
await self.emit_event(
self.helpers.make_netloc(ip, port),
event_type,
parent=event,
context="{module} found open port on {event.parent.data}",
)

# Emit PROTOCOL for non-HTTP services
# Use extended_service_name (more specific) falling back to service_name
# Also check transport_protocol for protocols like QUIC
service_name = service.get("extended_service_name") or service.get("service_name", "")
# If service_name is UNKNOWN but transport_protocol is meaningful, use that
if service_name.upper() == "UNKNOWN" and transport and transport not in ("TCP", "UDP"):
service_name = transport
if service_name and service_name.upper() not in ("HTTP", "HTTPS", "UNKNOWN"):
protocol_key = ("protocol", service_name.upper(), port)
if protocol_key not in seen:
seen.add(protocol_key)
protocol_data = {"host": str(event.host), "protocol": service_name}
if port:
protocol_data["port"] = port
await self.emit_event(
protocol_data,
"PROTOCOL",
parent=event,
context="{module} found {event.type}: {event.data[protocol]} on {event.parent.data}",
)

# Extract URLs from HTTP services
http_data = service.get("http", {})
request = http_data.get("request", {})
uri = request.get("uri")
if uri and uri not in seen:
seen.add(uri)
await self.emit_event(
uri,
"URL_UNVERIFIED",
parent=event,
context="{module} found {event.data} in HTTP service of {event.parent.data}",
)

# Extract TLS certificate data
tls_data = service.get("tls", {})
certs = tls_data.get("certificates", {})
leaf_data = certs.get("leaf_data", {})

# Extract names from leaf_data.names
for name in leaf_data.get("names", []):
await self._emit_host(name, event, seen, "TLS certificate")

# Extract common_name from leaf_data.subject
subject = leaf_data.get("subject", {})
for cn in subject.get("common_name", []):
await self._emit_host(cn, event, seen, "TLS certificate subject")

# Extract software/technologies
for software in service.get("software", []):
product = software.get("uniform_resource_identifier", software.get("product", ""))
if product:
await self.emit_event(
{"technology": product, "host": str(event.host)},
"TECHNOLOGY",
parent=event,
context="{module} found {event.type}: {event.data[technology]} on {event.parent.data}",
)

# Extract dns.names (limit to configured max)
dns_data = result.get("dns", {})
dns_names = dns_data.get("names", [])
for name in dns_names[: self.dns_names_limit]:
await self._emit_host(name, event, seen, "reverse DNS")

async def _emit_host(self, host, event, seen, source):
"""Emit IP_ADDRESS or DNS_NAME for a host value."""
# Validate and emit as DNS_NAME
try:
validated = self.helpers.validators.validate_host(host)
except ValueError as e:
self.debug(f"Error validating host {host} in {source}: {e}")
if validated and validated not in seen:
seen.add(validated)
await self.emit_event(
validated,
"DNS_NAME",
parent=event,
context=f"{{module}} found {{event.data}} in {source} of {{event.parent.data}}",
)
54 changes: 54 additions & 0 deletions bbot/modules/templates/censys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import traceback

from bbot.modules.templates.subdomain_enum import subdomain_enum_apikey


class censys(subdomain_enum_apikey):
"""
Base template for Censys API modules.
Provides common authentication and API request handling.
"""

options = {"api_key": ""}
options_desc = {"api_key": "Censys.io API Key in the format of 'key:secret'"}

base_url = "https://search.censys.io/api"

async def setup(self):
await super().setup()
api_keys = set()
for module_name in ("censys", "censys_dns", "censys_ip"):
module_config = self.scan.config.get("modules", {}).get(module_name, {})
api_key = module_config.get("api_key", "")
if isinstance(api_key, str):
api_key = [api_key]
for key in api_key:
key = key.strip()
if key:
api_keys.add(key)
if not api_keys:
if self.auth_required:
return None, "No API key set"
self.api_key = api_keys.pop() if api_keys else ""
try:
await self.ping()
self.hugesuccess("API is ready")
return True
except Exception as e:
self.trace(traceback.format_exc())
return None, f"Error with API ({str(e).strip()})"

async def ping(self):
url = f"{self.base_url}/v1/account"
resp = await self.api_request(url, retry_on_http_429=False)
d = resp.json()
assert isinstance(d, dict), f"Invalid response from {url}: {resp}"
quota = d.get("quota", {})
used = int(quota.get("used", 0))
allowance = int(quota.get("allowance", 0))
assert used < allowance, "No quota remaining"

def prepare_api_request(self, url, kwargs):
api_id, api_secret = self.api_key.split(":", 1)
kwargs["auth"] = (api_id, api_secret)
return url, kwargs
3 changes: 1 addition & 2 deletions bbot/modules/wpscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ class wpscan(BaseModule):
},
{
"name": "Install wpscan gem",
# we install globally because installing to a user's home dir is unpredictable across different distros and often missing from PATH
"shell": "gem install wpscan --no-user-install",
"gem": {"name": "wpscan", "state": "latest", "user_install": False},
"become": True,
},
]
Expand Down
1 change: 1 addition & 0 deletions bbot/test/test_step_1/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_httpserver):
assert helpers.validators.validate_host("LOCALHOST ") == "localhost"
assert helpers.validators.validate_host(" 192.168.1.1") == "192.168.1.1"
assert helpers.validators.validate_host(" Dead::c0dE ") == "dead::c0de"
assert helpers.validators.validate_host(".*.wildcard.evilcorp.com") == "wildcard.evilcorp.com"
assert helpers.validators.soft_validate(" evilCorp.COM", "host") is True
assert helpers.validators.soft_validate("!@#$", "host") is False
with pytest.raises(ValueError):
Expand Down
Loading
Loading