Skip to content

Commit 41bc6e5

Browse files
committed
feat: add startup banner and enhanced event formatting for better visibility
1 parent debda3a commit 41bc6e5

File tree

4 files changed

+132
-18
lines changed

4 files changed

+132
-18
lines changed

src/asynctasq/core/worker.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from typing import Any
99
import uuid
1010

11+
from rich.panel import Panel
12+
from rich.table import Table
13+
1114
from asynctasq.config import Config
1215
from asynctasq.drivers.base_driver import BaseDriver
1316
from asynctasq.drivers.retry_utils import calculate_retry_delay
@@ -73,6 +76,41 @@ def __init__(
7376
self._task_serializer = TaskSerializer(self.serializer)
7477
self._task_executor = TaskExecutor()
7578

79+
def _display_startup_banner(self) -> None:
80+
"""Display a beautiful startup banner with worker configuration."""
81+
from rich.console import Console
82+
83+
console = Console()
84+
85+
# Create configuration table
86+
config_table = Table.grid(padding=(0, 2))
87+
config_table.add_column(style="cyan", justify="right")
88+
config_table.add_column(style="bold white")
89+
90+
config_table.add_row("Worker ID", f"[bold blue]{self.worker_id}[/bold blue]")
91+
config_table.add_row("Hostname", f"[dim]{self.hostname}[/dim]")
92+
config_table.add_row("Queues", f"[magenta]{', '.join(self.queues)}[/magenta]")
93+
config_table.add_row("Concurrency", f"[green]{self.concurrency}[/green]")
94+
config_table.add_row("Driver", f"[yellow]{type(self.queue_driver).__name__}[/yellow]")
95+
96+
if self.max_tasks:
97+
config_table.add_row("Max Tasks", f"[orange1]{self.max_tasks}[/orange1]")
98+
99+
if self.process_pool_size:
100+
config_table.add_row("Process Pool", f"[cyan]{self.process_pool_size} workers[/cyan]")
101+
102+
# Create a nice panel
103+
panel = Panel(
104+
config_table,
105+
title="[bold green]⚡ AsyncTasq Worker Starting[/bold green]",
106+
border_style="green",
107+
padding=(1, 2),
108+
)
109+
110+
console.print()
111+
console.print(panel)
112+
console.print()
113+
76114
async def start(self) -> None:
77115
"""Initialize worker and begin processing tasks until shutdown.
78116
@@ -95,6 +133,9 @@ async def start(self) -> None:
95133
except ImportError:
96134
logger.info("uvloop not available, using default event loop policy")
97135

136+
# Display startup banner
137+
self._display_startup_banner()
138+
98139
self._running = True
99140
self._start_time = datetime.now(UTC)
100141

@@ -127,13 +168,6 @@ async def start(self) -> None:
127168
for sig in (signal.SIGTERM, signal.SIGINT):
128169
loop.add_signal_handler(sig, self._handle_shutdown)
129170

130-
logger.info(
131-
"Worker %s starting: queues=%s, concurrency=%d",
132-
self.worker_id,
133-
self.queues,
134-
self.concurrency,
135-
)
136-
137171
# Emit worker_online event via global emitters
138172
await EventRegistry.emit(
139173
WorkerEvent(

src/asynctasq/monitoring/emitters.py

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,40 @@ class LoggingEventEmitter(EventEmitter):
4545
Uses Rich for colorized, styled console output with icons and visual hierarchy.
4646
"""
4747

48+
def _format_duration(self, duration_ms: int | None) -> str:
49+
"""Format duration with color-coded performance indicators."""
50+
if duration_ms is None:
51+
return ""
52+
53+
# Convert to seconds for readability
54+
duration_s = duration_ms / 1000.0
55+
56+
# Color-code based on performance
57+
if duration_s < 1.0:
58+
color = "green"
59+
icon = "⚡"
60+
elif duration_s < 5.0:
61+
color = "cyan"
62+
icon = "✨"
63+
elif duration_s < 30.0:
64+
color = "yellow"
65+
icon = "⏱️"
66+
else:
67+
color = "red"
68+
icon = "🐌"
69+
70+
# Format with appropriate precision
71+
if duration_s < 1.0:
72+
duration_str = f"{duration_ms}ms"
73+
elif duration_s < 60.0:
74+
duration_str = f"{duration_s:.2f}s"
75+
else:
76+
minutes = int(duration_s // 60)
77+
seconds = duration_s % 60
78+
duration_str = f"{minutes}m {seconds:.1f}s"
79+
80+
return f"{icon} [{color}]{duration_str}[/{color}]"
81+
4882
def _format_task_event(self, event: TaskEvent) -> str:
4983
"""Format a task event with colors and icons."""
5084
# Event type to emoji/icon mapping
@@ -53,18 +87,37 @@ def _format_task_event(self, event: TaskEvent) -> str:
5387
"task_completed": "✅",
5488
"task_failed": "❌",
5589
"task_retrying": "🔄",
90+
"task_reenqueued": "📤",
91+
"task_enqueued": "📥",
92+
"task_cancelled": "🚫",
5693
}
5794

5895
icon = event_icons.get(event.event_type.value, "📋")
5996
event_name = event.event_type.value.replace("_", " ").title()
6097

61-
return (
98+
# Base message with event type and task info
99+
base_msg = (
62100
f"{icon} [bold cyan]{event_name}[/bold cyan] "
63101
f"[dim]task=[/dim][yellow]{event.task_id[:8]}[/yellow] "
64-
f"[dim]queue=[/dim][magenta]{event.queue}[/magenta] "
65-
f"[dim]worker=[/dim][blue]{event.worker_id}[/blue]"
102+
f"[dim]name=[/dim][bold magenta]{event.task_name}[/bold magenta] "
103+
f"[dim]queue=[/dim][magenta]{event.queue}[/magenta]"
66104
)
67105

106+
# Add attempt info if > 1
107+
if event.attempt > 1:
108+
base_msg += f" [dim]attempt=[/dim][orange1]{event.attempt}[/orange1]"
109+
110+
# Add duration for completed/failed tasks
111+
if event.duration_ms is not None:
112+
duration_str = self._format_duration(event.duration_ms)
113+
base_msg += f" [dim]duration=[/dim]{duration_str}"
114+
115+
# Add error info for failed tasks
116+
if event.error and event.event_type.value == "task_failed":
117+
base_msg += f"\n [dim]└─[/dim] [red]Error:[/red] {event.error}"
118+
119+
return base_msg
120+
68121
def _format_worker_event(self, event: WorkerEvent) -> str:
69122
"""Format a worker event with colors and icons."""
70123
# Event type to emoji/icon mapping
@@ -77,8 +130,36 @@ def _format_worker_event(self, event: WorkerEvent) -> str:
77130
icon = event_icons.get(event.event_type.value, "⚙️")
78131
event_name = event.event_type.value.replace("_", " ").title()
79132

133+
# Special formatting for worker_online - make it stand out
134+
if event.event_type.value == "worker_online":
135+
# Create a simple but elegant startup message
136+
queues_str = ", ".join(event.queues)
137+
return (
138+
f"{icon} [bold green]{event_name}[/bold green] "
139+
f"[dim]worker=[/dim][bold blue]{event.worker_id}[/bold blue] "
140+
f"[dim]queues=[/dim][cyan]\\[{queues_str}][/cyan] "
141+
f"[dim]hostname=[/dim][dim]{event.hostname}[/dim]"
142+
)
143+
144+
# Special formatting for worker_offline
145+
if event.event_type.value == "worker_offline":
146+
uptime_str = ""
147+
if event.uptime_seconds:
148+
hours = event.uptime_seconds // 3600
149+
minutes = (event.uptime_seconds % 3600) // 60
150+
seconds = event.uptime_seconds % 60
151+
uptime_str = f" [dim]uptime=[/dim][cyan]{hours}h {minutes}m {seconds}s[/cyan]"
152+
153+
return (
154+
f"{icon} [bold red]{event_name}[/bold red] "
155+
f"[dim]worker=[/dim][blue]{event.worker_id}[/blue] "
156+
f"[dim]processed=[/dim][green]{event.processed}[/green]"
157+
f"{uptime_str}"
158+
)
159+
160+
# Standard heartbeat formatting
80161
return (
81-
f"{icon} [bold green]{event_name}[/bold green] "
162+
f"{icon} [dim]{event_name}[/dim] "
82163
f"[dim]worker=[/dim][blue]{event.worker_id}[/blue] "
83164
f"[dim]active=[/dim][cyan]{event.active}[/cyan] "
84165
f"[dim]processed=[/dim][green]{event.processed}[/green]"

tests/unit/core/test_worker.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,11 @@ async def test_start_logs_info_message(self) -> None:
250250
except asyncio.CancelledError:
251251
pass
252252

253-
# Assert
253+
# Assert - Worker logs uvloop availability messages
254254
mock_logger.info.assert_called()
255-
# Check that the log message includes queue and concurrency info
256255
log_calls = [str(call) for call in mock_logger.info.call_args_list]
257-
assert any("queues=" in str(call) for call in log_calls)
258-
assert any("concurrency=" in str(call) for call in log_calls)
256+
# Check that we log something about uvloop or event loop
257+
assert any("event loop" in str(call).lower() for call in log_calls)
259258

260259

261260
@mark.unit

tests/unit/monitoring/test_emitters.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ async def test_emit_task_event_logs(self, sample_task_event: TaskEvent, caplog)
6060
assert "🚀" in caplog.text # Task started icon
6161
assert "Task Started" in caplog.text # Formatted event name
6262
assert "test-tas" in caplog.text # Truncated task ID (first 8 chars)
63+
assert "TestTask" in caplog.text # Task name
6364
assert "default" in caplog.text # Queue name
64-
assert "worker-abc123" in caplog.text # Worker ID
6565

6666
@mark.asyncio
6767
async def test_emit_worker_event_logs(self, sample_worker_event: WorkerEvent, caplog) -> None:
@@ -76,8 +76,8 @@ async def test_emit_worker_event_logs(self, sample_worker_event: WorkerEvent, ca
7676
assert "🟢" in caplog.text # Worker online icon
7777
assert "Worker Online" in caplog.text # Formatted event name
7878
assert "worker-abc123" in caplog.text # Worker ID
79-
assert "5" in caplog.text # Active tasks
80-
assert "100" in caplog.text # Processed count
79+
assert "default" in caplog.text # Queue names
80+
assert "test-host" in caplog.text # Hostname
8181

8282
@mark.asyncio
8383
async def test_close_is_noop(self) -> None:

0 commit comments

Comments
 (0)