Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions bbot/scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ async def async_start_without_generator(self):
pass

async def async_start(self):
"""
Main asynchronous entrypoint for running a scan.
"""
self.start_time = datetime.now(ZoneInfo("UTC"))
self.root_event.data["started_at"] = self.start_time.timestamp()
await self._set_status(SCAN_STATUS_STARTING)
Expand Down Expand Up @@ -399,6 +402,9 @@ async def async_start(self):
new_activity = await self.finish()
if not new_activity:
self._success = True
scan_finish_event = await self._mark_finished()
if scan_finish_event is not None:
yield scan_finish_event
break

await asyncio.sleep(0.1)
Expand All @@ -422,8 +428,6 @@ async def async_start(self):
self.critical(f"Unexpected error during scan:\n{traceback.format_exc()}")

finally:
scan_finish_event = await self._mark_finished()
yield scan_finish_event
tasks = self._cancel_tasks()
self.debug(f"Awaiting {len(tasks):,} tasks")
for task in tasks:
Expand Down Expand Up @@ -466,9 +470,7 @@ async def _mark_finished(self):
self.duration_seconds = self.duration.total_seconds()
self.duration_human = self.helpers.human_timedelta(self.duration)

self._scan_finish_status_message = (
f"Scan {self.name} completed in {self.duration_human} with status {self.status}"
)
self._scan_finish_status_message = f"Scan {self.name} completed in {self.duration_human} with status {status}"

scan_finish_event = self.finish_event(self._scan_finish_status_message, status)

Expand Down
19 changes: 18 additions & 1 deletion bbot/test/test_step_2/module_tests/test_module_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,24 @@ async def setup_before_prep(self, module_test):
# Wait for Kafka to be ready
await self.wait_for_port_open(9092)

await asyncio.sleep(1)
# Port open does not guarantee Kafka is fully initialized.
# Try to connect using AIOKafkaConsumer until it succeeds.
from aiokafka import AIOKafkaConsumer

max_attempts = 30
for attempt in range(1, max_attempts + 1):
consumer = AIOKafkaConsumer(
"bbot_events",
bootstrap_servers="localhost:9092",
group_id="readiness_check_group",
)
try:
await consumer.start()
await consumer.stop()
break
except Exception as e:
module_test.log.verbose(f"Kafka not ready yet (attempt {attempt}/{max_attempts}): {e}")
await asyncio.sleep(1)

async def check(self, module_test, events):
from aiokafka import AIOKafkaConsumer
Expand Down
27 changes: 26 additions & 1 deletion bbot/test/test_step_2/module_tests/test_module_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,34 @@ async def setup_before_prep(self, module_test):
)
stdout, stderr = await process.communicate()

# wait for the container to start
# wait for the container to start accepting TCP connections
await self.wait_for_port_open(3306)

# additionally, wait until MySQL is actually ready to accept queries.
# Port open does not guarantee the server has finished initialization,
# which can cause flaky IncompleteReadError / CR_SERVER_LOST failures.
try:
import aiomysql
except Exception: # pragma: no cover - import errors should surface in the test itself
aiomysql = None

if aiomysql is not None:
max_attempts = 20
for attempt in range(1, max_attempts + 1):
try:
conn = await aiomysql.connect(
user="root",
password="bbotislife",
db="bbot",
host="localhost",
)
conn.close()
break
except Exception as e:
# MySQL isn't ready yet; keep waiting a bit longer
module_test.log.verbose(f"MySQL not ready yet (attempt {attempt}/{max_attempts}): {e}")
await asyncio.sleep(1)

if process.returncode != 0:
self.log.error(f"Failed to start MySQL server: {stderr.decode()}")

Expand Down
20 changes: 14 additions & 6 deletions bbot/test/test_step_2/module_tests/test_module_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,22 @@ async def setup_before_prep(self, module_test):
# Wait for NATS to be ready by checking the port
await self.wait_for_port_open(4222)

# Connect to NATS
# Connect to NATS (with retries until it is actually ready to accept connections)
import nats

try:
self.nc = await nats.connect(["nats://localhost:4222"])
except Exception as e:
self.log.error(f"Error connecting to NATS: {e}")
raise
max_attempts = 20
last_exc = None
for attempt in range(1, max_attempts + 1):
try:
self.nc = await nats.connect(["nats://localhost:4222"])
break
except Exception as e:
last_exc = e
self.log.verbose(f"NATS not ready yet (attempt {attempt}/{max_attempts}): {e}")
await asyncio.sleep(1)
else:
self.log.error(f"Error connecting to NATS after {max_attempts} attempts: {last_exc}")
raise last_exc

# Collect events from NATS
self.nats_events = []
Expand Down
25 changes: 24 additions & 1 deletion bbot/test/test_step_2/module_tests/test_module_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,32 @@ async def setup_before_prep(self, module_test):
"postgres",
)

# wait for the container to start
# wait for the container to start accepting TCP connections
await self.wait_for_port_open(5432)

# additionally, wait until PostgreSQL is actually ready to accept queries.
# Port open does not guarantee the server has finished initialization.
try:
import asyncpg
except Exception: # pragma: no cover - import errors should surface in the test itself
asyncpg = None

if asyncpg is not None:
max_attempts = 20
for attempt in range(1, max_attempts + 1):
try:
conn = await asyncpg.connect(
user="postgres",
password="bbotislife",
database="bbot",
host="127.0.0.1",
)
await conn.close()
break
except Exception as e:
self.log.verbose(f"PostgreSQL not ready yet (attempt {attempt}/{max_attempts}): {e}")
await asyncio.sleep(1)

if process.returncode != 0:
self.log.error("Failed to start PostgreSQL server")

Expand Down
Loading