Skip to content
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,71 @@ The SSE server listens at `http://localhost:8807/v1/mcp/`

After connecting your AI client to BBOT Server, you can ask it sensible questions like, "Use MCP to get all the bbot findings", "what are the top open ports?", "what else can you do with BBOT MCP?", etc.

## As a Python Library

You can use BBOT Server as a full-featured Python library. It supports both local and remote connections, and the interface to each is identical:

### Asynchronous

```python
import asyncio
from bbot_server import BBOTServer

async def main():
# talk directly to local MongoDB + Redis
bbot_server = BBOTServer(interface="python")

# or to a remote BBOT Server instance (config must contain a valid API key)
bbot_server = BBOTServer(interface="http", url="http://bbot:8807/v1/")

# one-time setup
await bbot_server.setup()

hosts = await bbot_server.get_hosts()
print(f"hosts: {hosts}")

if __name__ == "__main__":
asyncio.run(main())
```

### Synchronous

```python
from bbot_server import BBOTServer

if __name__ == "__main__":
# talk directly to local MongoDB + Redis
bbot_server = BBOTServer(interface="python", synchronous=True)

# or to a remote BBOT Server instance (config must contain a valid API key)
bbot_server = BBOTServer(interface="http", url="http://bbot:8807/v1/", synchronous=True)

# one-time setup
bbot_server.setup()

hosts = bbot_server.get_hosts()
print(f"hosts: {hosts}")
```

## Running Tests

When running tests, first start MongoDB and Redis via Docker:

```bash
docker run --rm -p 127.0.0.1:27017:27017 mongo
docker run --rm -p 127.0.0.1:6379:6379 redis
```

Then execute `pytest`:

```bash
# run all tests
poetry run pytest -v

# run specific tests
poetry run pytest -v -k test_applet_scans
```

## Screenshots

*Tailing activities in real time*
Expand Down
2 changes: 0 additions & 2 deletions bbot_server/applets/_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
class RootApplet(BaseApplet):
name = "Root Applet"

attach_to = ""

_nested = False

_route_prefix = ""
Expand Down
1 change: 1 addition & 0 deletions bbot_server/applets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class BaseApplet:
model = None

# which other applet should include this one
# leave blank to attach to the root applet
attach_to = ""

# whether to nest this applet under its parent
Expand Down
5 changes: 4 additions & 1 deletion bbot_server/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ def db_config(self):

@property
def uri(self):
return self.db_config.get("uri", "")
uri = self.db_config.get("uri", "")
if not uri:
raise BBOTServerValueError(f"Database URI is missing from config: {self.db_config}")
return uri

@property
def db_name(self):
Expand Down
26 changes: 21 additions & 5 deletions bbot_server/event_store/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ class BaseEventStore(BaseDB):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.event_store_config = self.config.get("event_store", {})
self.archive_after_days = self.event_store_config.get("archive_after", 90)
self.archive_cron = self.event_store_config.get("archive_cron", "0 0 * * *")
self.archive_after_days = self.db_config.get("archive_after", 90)
self.archive_cron = self.db_config.get("archive_cron", "0 0 * * *")

async def insert_event(self, event):
if not isinstance(event, Event):
Expand All @@ -20,9 +19,26 @@ async def get_event(self, uuid: str):
event = await self._get_event(uuid)
return Event(**event)

async def get_events(self, host: str = None, type=None, min_timestamp=None, archived=False, active=True):
async def get_events(
self,
host: str = None,
domain: str = None,
type: str = None,
scan: str = None,
min_timestamp: float = None,
max_timestamp: float = None,
archived: bool = False,
active: bool = True,
):
async for event in self._get_events(
host=host, type=type, min_timestamp=min_timestamp, archived=archived, active=active
host=host,
domain=domain,
type=type,
scan=scan,
min_timestamp=min_timestamp,
max_timestamp=max_timestamp,
archived=archived,
active=active,
):
yield Event(**event)

Expand Down
34 changes: 28 additions & 6 deletions bbot_server/event_store/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from motor.motor_asyncio import AsyncIOMotorClient


from bbot_server.errors import BBOTServerNotFoundError
from bbot_server.errors import BBOTServerNotFoundError, BBOTServerValueError
from bbot_server.event_store._base import BaseEventStore


class MongoEventStore(BaseEventStore):
"""
docker run --rm -p 27017:27017 mongo
docker run --rm -p 127.0.0.1:27017:27017 mongo
"""

async def _setup(self):
Expand All @@ -29,21 +29,43 @@ async def _insert_event(self, event):
event_json = event.model_dump()
await self.collection.insert_one(event_json)

async def _get_events(self, host: str, type: str, min_timestamp: float, archived: bool, active: bool):
async def _get_events(
self,
host: str,
domain: str,
type: str,
scan: str,
min_timestamp: float,
max_timestamp: float,
active: bool,
archived: bool,
):
"""
Get all events from the database, or if min_timestamp is provided, get the newest events up to that timestamp
Get all events from the database based on the provided filters
"""
query = {}
if type is not None:
query["type"] = {"$eq": type}
query["type"] = type
if min_timestamp is not None:
query["timestamp"] = {"$gte": min_timestamp}
# if both active and archived are true, we don't need to filter anything
if not (active and archived):
# if both are false, we need to raise an error
if not (active or archived):
raise ValueError("Must query at least one of active or archived")
raise BBOTServerValueError("Must query at least one of active or archived")
# otherwise if only one is true, we need to filter by the other
query["archived"] = {"$eq": archived}
if max_timestamp is not None:
query["timestamp"] = {"$lte": max_timestamp}
if scan is not None:
query["scan"] = scan
if host is not None:
query["host"] = host
if domain is not None:
# match reverse_host with regex
reversed_host = domain[::-1]
query["reverse_host"] = {"$regex": f"^{reversed_host}(\\.|$)"}
self.log.debug(f"Querying events: {query}")
async for event in self.collection.find(query):
yield event

Expand Down
2 changes: 1 addition & 1 deletion bbot_server/message_queue/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class RedisMessageQueue(BaseMessageQueue):
- bbot:stream:{subject}: for persistent, tailable streams - e.g. events, activities
- bbot:work:{subject}: for one-time messages, e.g. tasks

docker run --rm -p 6379:6379 redis
docker run --rm -p 127.0.0.1:6379:6379 redis
"""

def __init__(self, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion bbot_server/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def load_python_file(file, namespace, module_dict, base_class_name, module_key_a
log.error(f"Module {value.__name__} does not define required attribute{module_key_attr}")
parent_name = getattr(value, "attach_to", "")
if not parent_name:
log.error(f"Module {value.__name__} does not define required attribute 'attach_to'")
parent_name = "root_applet"
module_family = module_dict.get(parent_name, {})
# if we get a duplicate module name, raise an error
if module_name in module_family:
Expand Down
3 changes: 1 addition & 2 deletions bbot_server/modules/activity/activity_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
class ActivityApplet(BaseApplet):
name = "Activity"
watched_activities = ["*"]
description = "Query and tail BBOT server activity"
attach_to = "root_applet"
description = "Query BBOT server activities"
model = Activity

async def handle_activity(self, activity: Activity, asset: Asset = None):
Expand Down
2 changes: 1 addition & 1 deletion bbot_server/modules/agents/agents_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ async def _kickoff_queued_scans(self):
return 0

async def _kickoff_queued_scans_loop(self):
for i in range(1000):
while True:
online_agents = await self.get_online_agents()
online_agents = [str(agent.id) for agent in online_agents]
if not online_agents:
Expand Down
37 changes: 29 additions & 8 deletions bbot_server/modules/assets/assets_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
class AssetsApplet(BaseApplet):
name = "Assets"
description = "hostnames and IP addresses discovered during scans"
attach_to = "root_applet"

model = Asset

Expand Down Expand Up @@ -125,10 +124,6 @@ async def _get_assets(
query["type"] = type
if host is not None:
query["host"] = host
if domain is not None:
reversed_host = domain[::-1]
# Match exact domain or subdomains (with dot separator)
query["reverse_host"] = {"$regex": f"^{reversed_host}(\\.|$)"}
if target_id is not None:
target_query_kwargs = {}
if target_id != "DEFAULT":
Expand All @@ -137,7 +132,8 @@ async def _get_assets(
query["scope"] = target["id"]
if search is not None:
query["$text"] = {"$search": search}
async for asset in self._query_assets(query, fields, sort):
self.log.debug(f"Querying assets: query={query} / fields={fields}")
async for asset in self._query_assets(query, fields, domain=domain, sort=sort):
yield asset

async def _get_asset(
Expand All @@ -154,17 +150,42 @@ async def _get_asset(
query["host"] = host
return await self.collection.find_one(query, fields)

async def _query_assets(self, query: dict, fields: list[str] = None, sort: list[tuple[str, int]] = None):
async def _query_assets(
self,
query: dict,
fields: list[str] = None,
domain: str = None,
sort: list[tuple[str, int]] = None,
archived: bool = False,
active: bool = True,
):
"""
Lowest-level query function for getting assets from the database.

Lets you specify your own custom query, but also provides some convenience filters.

Args:
query: Additional query parameters (mongo)
fields: List of fields to return
domain: Filter assets by domain (including subdomains)
archived: Return archived assets (default: False)
active: Return active assets (default: True)
sort: Fields and direction to sort by, e.g. sort=[("last_seen", -1)]
"""
self.log.debug(f"Querying assets: query={query} / fields={fields}")
query = dict(query or {})
fields = {f: 1 for f in fields} if fields else None
if domain is not None:
reversed_host = domain[::-1]
# Match exact domain or subdomains (with dot separator)
query["reverse_host"] = {"$regex": f"^{reversed_host}(\\.|$)"}
# if both active and archived are true, we don't need to filter anything, because we are returning all assets
if not (active and archived):
# if both are false, we need to raise an error
if not (active or archived):
raise ValueError("Must query at least one of active or archived")
# only one should be true
query["archived"] = {"$eq": archived}
self.log.debug(f"Querying assets: domain={domain} / query={query} / fields={fields}")
cursor = self.collection.find(query, fields)
if sort:
cursor = cursor.sort(sort)
Expand Down
1 change: 0 additions & 1 deletion bbot_server/modules/cloud/cloud_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class CloudApplet(BaseApplet):
name = "Cloud"
watched_activities = ["NEW_DNS_LINK", "DNS_LINK_REMOVED"]
description = "Cloud providers discovered during scans. Makes use of the cloudcheck library (https://github.com/blacklanternsecurity/cloudcheck)"
attach_to = "root_applet"

async def setup(self):
import cloudcheck
Expand Down
24 changes: 21 additions & 3 deletions bbot_server/modules/events/events_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class EventsApplet(BaseApplet):
name = "Events"
watched_events = ["*"]
description = "query raw BBOT scan events"
attach_to = "root_applet"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -59,8 +58,27 @@ async def archive_old_events(
self._archive_events_task = asyncio.create_task(self._archive_events(older_than=older_than))

@api_endpoint("/list", methods=["GET"], type="http_stream", response_model=Event, summary="Stream all events")
async def get_events(self, type: str = None, host: str = None, archived: bool = False, active: bool = True):
async for event in self.event_store.get_events(type=type, host=host, archived=archived, active=active):
async def get_events(
self,
type: str = None,
host: str = None,
domain: str = None,
scan: str = None,
min_timestamp: float = None,
max_timestamp: float = None,
active: bool = True,
archived: bool = False,
):
async for event in self.event_store.get_events(
type=type,
host=host,
domain=domain,
scan=scan,
min_timestamp=min_timestamp,
max_timestamp=max_timestamp,
archived=archived,
active=active,
):
yield event

@api_endpoint(
Expand Down
9 changes: 8 additions & 1 deletion bbot_server/modules/events/events_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@ class EventCTL(BaseBBCTL):
def list(
self,
type: Annotated[str, typer.Option("--type", "-t", help="Filter events by type")] = None,
host: common.host = None,
domain: common.domain = None,
scan: Annotated[str, typer.Option("--scan", "-s", help="Filter events by scan ID")] = None,
active: Annotated[bool, typer.Option("--active", "-a", help="Include active (non-archived) events")] = True,
archived: Annotated[bool, typer.Option("--archived", "-r", help="Include archived events")] = False,
json: common.json = False,
csv: common.csv = False,
):
event_list = self.bbot_server.get_events(type=type)
event_list = self.bbot_server.get_events(
type=type, host=host, domain=domain, scan=scan, active=active, archived=archived
)

if json:
for event in event_list:
Expand Down
1 change: 0 additions & 1 deletion bbot_server/modules/scans/scans_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class ScansApplet(BaseApplet):
description = "scans"
watched_events = ["SCAN"]
watched_activities = ["SCAN_STATUS"]
attach_to = "root_applet"
model = Scan

async def setup(self):
Expand Down
Loading