✅ Do:
- Keep tasks small and focused (single responsibility principle)
- Make tasks idempotent when possible (safe to run multiple times with same result)
⚠️ Configure `visibility_timeout` properly - see Configuration - Visibility Timeout Warning to prevent duplicate processing
- Use timeouts for long-running tasks to prevent resource exhaustion
- Implement custom `failed()` handlers for cleanup, logging, and alerting
- Use `should_retry()` for intelligent retry logic based on exception type
- Pass ORM models directly as parameters - they're automatically serialized as lightweight references and re-fetched with fresh data when the task executes (supported ORMs: SQLAlchemy, Django ORM, Tortoise ORM)
- Use type hints on task parameters for better IDE support and documentation
- Name tasks descriptively (the class or function name should explain its purpose)
- Use `correlation_id` for distributed tracing and tracking related tasks across systems
- For async CPU-bound tasks (`AsyncProcessTask`), initialize warm event loops in production for better performance (see the Configuration docs)
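Idempotency usually comes down to deriving a stable key from the task's inputs and skipping work that has already been done. A minimal, framework-free sketch - the `processed` set stands in for a durable store such as a database table or Redis `SETNX`, and none of these names are asynctasq API:

```python
import hashlib
import json

processed: set[str] = set()  # stand-in for a durable store (e.g. Redis SETNX)

def idempotency_key(task_name: str, params: dict) -> str:
    """Derive a stable key from the task name and its parameters."""
    payload = json.dumps(params, sort_keys=True)
    return hashlib.sha256(f"{task_name}:{payload}".encode()).hexdigest()

def run_once(task_name: str, params: dict) -> bool:
    """Return True if the task body should run, False if it already ran."""
    key = idempotency_key(task_name, params)
    if key in processed:
        return False  # same inputs were already handled - safe to skip
    processed.add(key)
    return True
```

With this guard at the top of a task body, re-delivering the same message (e.g. after a visibility-timeout expiry) becomes a no-op instead of a duplicate side effect.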
❌ Don't:
- Include blocking I/O in async tasks (use `SyncTask` with a thread pool for sync I/O, or `SyncProcessTask` for CPU-bound work)
- Share mutable state between tasks (each task execution should be isolated)
- Perform network calls without timeouts (always use the `timeout` parameter)
- Store large objects in task parameters (serialize references instead, e.g., database IDs)
- Use reserved parameter names (`config`, `run`, `execute`, `dispatch`, `failed`, `should_retry`, `on_queue`, `delay`, `retry_after`, `max_attempts`, `timeout`)
- Start parameter names with an underscore (reserved for internal use)
- Create new database connections in subprocesses without using proper ORM patterns (see ORM Integrations - Best Practices for SQLAlchemy/Django/Tortoise guidance)
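The reserved-name and underscore rules above can be checked at dispatch time with a small guard. A hypothetical sketch - the reserved-name set is taken from the list above, but `validate_params` itself is not part of asynctasq:

```python
RESERVED = {"config", "run", "execute", "dispatch", "failed", "should_retry",
            "on_queue", "delay", "retry_after", "max_attempts", "timeout"}

def validate_params(params: dict) -> list[str]:
    """Return a list of problems with the given task parameters."""
    problems = []
    for name in params:
        if name in RESERVED:
            problems.append(f"{name!r} is a reserved parameter name")
        if name.startswith("_"):
            problems.append(f"{name!r} starts with an underscore (internal use)")
    return problems
```

Running a check like this in tests (or in a dispatch wrapper) catches naming collisions before they reach a worker.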
✅ Do:
- Use separate queues for different priorities (high/default/low)
- Isolate slow tasks in dedicated queues
- Group related tasks by queue (emails, reports, notifications)
- Consider worker capacity when designing queues
- Use descriptive queue names
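A simple way to keep queue names consistent is a single routing helper, so priority and task type map to a queue in exactly one place. A framework-free sketch - the queue names echo the worker example in this section, but nothing here is asynctasq API:

```python
def route_queue(task_type: str, priority: str = "default") -> str:
    """Map a task type and priority to a descriptive queue name."""
    if priority == "critical":
        return "critical"
    # Isolate known-slow work in a dedicated queue regardless of priority
    if task_type in {"report", "export"}:
        return "low-priority"
    # Group related tasks by purpose
    groups = {"email": "emails", "notify": "notifications"}
    return groups.get(task_type, "default")
```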
Example Worker Configuration:
```bash
# Configure in .env file first:
# ASYNCTASQ_DRIVER=redis
# ASYNCTASQ_REDIS_URL=redis://localhost:6379

# Worker 1: Critical tasks
uv run asynctasq worker --queues critical --concurrency 20

# Worker 2: Normal tasks
uv run asynctasq worker --queues default --concurrency 10

# Worker 3: Background tasks
uv run asynctasq worker --queues low-priority,batch --concurrency 5
```

For complete worker deployment patterns, CLI options, and production deployment strategies, see Running Workers.
✅ Do:
- Log errors comprehensively in the `failed()` method
- Use retry limits to prevent infinite loops
- Monitor dead-letter queues regularly
- Implement alerting for critical failures
- Add context to exception messages
```python
import logging

from asynctasq import AsyncTask  # adjust the import path to your setup

logger = logging.getLogger(__name__)

class ProcessPayment(AsyncTask):
    async def failed(self, exception: Exception) -> None:
        # Log with full context for debugging and auditing
        logger.error(
            f"Payment failed for user {self.user_id}",
            extra={
                "task_id": self._task_id,
                "current_attempt": self._current_attempt,
                "user_id": self.user_id,
                "amount": self.amount,
            },
            exc_info=exception,
        )
        # Alert on critical failures (notify_admin is an application-defined helper)
        await notify_admin(exception)
```

✅ Do:
- Tune worker concurrency based on task characteristics
- I/O-bound tasks: High concurrency (20-50)
- CPU-bound tasks: Low concurrency (number of CPU cores)
- Use connection pooling (configured automatically)
- Monitor queue sizes and adjust worker count accordingly
- Consider task batching for high-volume operations
- Prefer Redis for general use or PostgreSQL/MySQL for ACID guarantees. See Queue Drivers - Driver Comparison for a complete comparison.
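The concurrency guidance above can be captured in a small heuristic, assuming two broad workload classes. The numbers simply restate the bullets and are starting points to tune, not asynctasq defaults:

```python
import os

def suggested_concurrency(workload: str) -> int:
    """Rough starting point for a worker's --concurrency, per the guidance above."""
    cores = os.cpu_count() or 1
    if workload == "cpu":
        return cores   # CPU-bound: one slot per core
    if workload == "io":
        return 20      # I/O-bound: high concurrency (20-50)
    return 10          # mixed/unknown: a moderate default
```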
✅ Do:
- Use Redis for high-throughput or PostgreSQL/MySQL for ACID guarantees in production
⚠️ Configure `visibility_timeout` properly - the default is 3600s (1 hour). Set it to (expected_duration × 2) + buffer for each task to prevent duplicate processing after crashes.
- Configure proper retry delays to avoid overwhelming systems during outages (exponential backoff recommended)
- Set up monitoring and alerting for queue sizes, worker health, failed tasks, and retry rates
- Use environment variables for configuration (never hardcode credentials)
- Deploy multiple workers for high availability and load distribution across queues
- Use process managers (systemd, supervisor, Kubernetes) for automatic worker restarts
- Monitor dead-letter queues to catch permanently failed tasks and trigger alerts
- Set appropriate timeouts to prevent tasks from hanging indefinitely (use `timeout` in TaskConfig)
- Test thoroughly before deploying to production (unit tests + integration tests)
- Use structured logging with context (task_id, worker_id, queue_name, current_attempt)
- Enable event streaming (Redis Pub/Sub) for real-time monitoring and observability
- Configure process pools for CPU-bound tasks (use `ProcessPoolConfig` with `size` and `max_tasks_per_child` options)
- Set a task retention policy (`keep_completed_tasks=False` by default to save memory)
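The visibility-timeout formula and the exponential-backoff recommendation above are both one-liners worth pinning down. A sketch - the 60s buffer, 5s base delay, and 300s cap are illustrative choices, not asynctasq defaults:

```python
def visibility_timeout(expected_duration_s: float, buffer_s: float = 60.0) -> float:
    """(expected_duration × 2) + buffer, per the guidance above."""
    return expected_duration_s * 2 + buffer_s

def retry_delay(attempt: int, base_s: float = 5.0, cap_s: float = 300.0) -> float:
    """Exponential backoff: base × 2^(attempt - 1), capped to avoid unbounded waits."""
    return min(base_s * 2 ** (attempt - 1), cap_s)
```

For example, a task expected to run 5 minutes gets a visibility timeout of 660s, and retries back off 5s, 10s, 20s, ... up to the cap.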
Example Production Setup:
```python
from asynctasq import init, RedisConfig, TaskDefaultsConfig, EventsConfig, ProcessPoolConfig, Worker, DriverFactory, Config

# Configuration - see configuration.md for complete options
init()  # Loads from .env file (recommended) or environment variables
# See configuration.md and queue-drivers.md for driver config details
```

Deployment Recommendations:
For complete worker deployment patterns including systemd, Docker, Kubernetes, and other process managers, see Running Workers.