This guide covers techniques for optimizing queue performance and throughput.
The optimal number of workers depends on the workload type:
CPU-bound tasks (image processing, data transformation, encryption):
- Start with: number of workers ≈ number of CPU cores
- Example: 4-core server → 4 workers
I/O-bound tasks (API calls, database queries, file operations):
- Can run more workers than CPU cores
- Start with: number of workers ≈ 2-4× number of CPU cores
- Example: 4-core server → 8-16 workers
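The starting points above can be sketched as a small helper. This is only an illustration: `suggestWorkerCount` and its `'cpu'` / `'io'` labels are hypothetical names, not part of yiisoft/queue, and the result is a first guess to refine by measurement.

```php
<?php

declare(strict_types=1);

/**
 * Returns a [min, max] starting range for the number of workers.
 * Hypothetical helper: the name and the workload labels are assumptions.
 *
 * @return array{0: int, 1: int}
 */
function suggestWorkerCount(int $cpuCores, string $workload): array
{
    if ($cpuCores < 1) {
        throw new InvalidArgumentException('CPU core count must be at least 1.');
    }

    return match ($workload) {
        // CPU-bound: roughly one worker per core
        'cpu' => [$cpuCores, $cpuCores],
        // I/O-bound: roughly 2-4 workers per core
        'io' => [2 * $cpuCores, 4 * $cpuCores],
        default => throw new InvalidArgumentException("Unknown workload type: $workload"),
    };
}
```

For a 4-core server this gives `[4, 4]` for CPU-bound work and `[8, 16]` for I/O-bound work, matching the examples above.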
Mixed workload:
- Separate CPU-bound and I/O-bound tasks into different queue names
- Run different worker counts for each queue
Finding the right number:
- Start with the formula above
- Monitor CPU usage, memory usage, and throughput
- Gradually increase worker count
- Stop when throughput plateaus or system resources are saturated
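One way to decide when throughput has plateaued is to compare measurements taken at increasing worker counts. The sketch below assumes you have already collected those measurements yourself; `findPlateau` and the 5% default threshold are illustrative choices, not library features.

```php
<?php

declare(strict_types=1);

/**
 * Returns the largest measured worker count that still gave a meaningful gain.
 * Hypothetical helper: the name and default threshold are assumptions.
 *
 * @param array<int, float> $throughputByWorkers worker count => measured messages/second
 */
function findPlateau(array $throughputByWorkers, float $minRelativeGain = 0.05): int
{
    if ($throughputByWorkers === []) {
        throw new InvalidArgumentException('At least one measurement is required.');
    }

    // Compare measurements in order of increasing worker count
    ksort($throughputByWorkers);

    $bestWorkers = null;
    $previous = null;
    foreach ($throughputByWorkers as $workers => $throughput) {
        // Stop at the first step whose gain falls below the threshold
        if ($previous !== null && $throughput < $previous * (1 + $minRelativeGain)) {
            break;
        }
        $bestWorkers = $workers;
        $previous = $throughput;
    }

    return $bestWorkers;
}
```

With measurements such as `[4 => 100.0, 6 => 140.0, 8 => 150.0, 10 => 151.0]`, the step from 8 to 10 workers adds less than 5%, so the helper settles on 8.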
See Workers for more details on running workers.
For production-ready examples of running multiple workers under systemd or Supervisor (including group management, autostart, logs, and reload), see Running workers in production (systemd and Supervisor).
Set memorySoftLimit so that workers restart before leaked memory accumulates. See Loops for details on loop configuration:
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Cli\SimpleLoop;
return [
SignalLoop::class => [
'__construct()' => [
'memorySoftLimit' => 256 * 1024 * 1024, // 256MB
],
],
SimpleLoop::class => [
'__construct()' => [
'memorySoftLimit' => 256 * 1024 * 1024, // 256MB
],
],
];
When a worker reaches the limit:
- It finishes processing the current message
- It exits gracefully
- The process manager restarts it with fresh memory
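The behavior described above can be pictured with a simplified loop. This is not the actual implementation of SignalLoop or SimpleLoop; `consumeUntilSoftLimit` and its parameters are hypothetical, and the memory check is injectable only so that the sketch can be exercised without real memory pressure.

```php
<?php

declare(strict_types=1);

/**
 * Processes messages until the soft memory limit is reached.
 * Simplified illustration only; all names here are assumptions.
 *
 * @return int number of messages processed before stopping
 */
function consumeUntilSoftLimit(
    iterable $messages,
    callable $handler,
    int $memorySoftLimit,
    ?callable $memoryUsage = null,
): int {
    // Default to the real memory usage of the current PHP process
    $memoryUsage ??= static fn (): int => memory_get_usage(true);

    $processed = 0;
    foreach ($messages as $message) {
        // The current message is always finished before the limit is checked
        $handler($message);
        $processed++;

        // A limit of 0 disables the check in this sketch
        if ($memorySoftLimit > 0 && $memoryUsage() >= $memorySoftLimit) {
            break; // Leave the loop so the process can exit and be restarted
        }
    }

    return $processed;
}
```

Because the check runs only between messages, a message that is already being handled is never interrupted.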
Choosing the limit:
- Monitor actual memory usage of your workers
- Set limit 20-30% above typical usage
- Leave headroom for memory spikes
- Consider your server's total memory
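The guidelines above can be turned into a rough calculation. The helper below is a sketch under simplifying assumptions: `suggestMemorySoftLimit` is a hypothetical name, and it treats the whole server memory as available to workers, which is rarely true in practice.

```php
<?php

declare(strict_types=1);

/**
 * Suggests a soft memory limit in bytes.
 * Hypothetical helper; assumes all server memory is shared equally by workers.
 */
function suggestMemorySoftLimit(
    int $typicalUsageBytes,
    float $headroom,
    int $serverMemoryBytes,
    int $workerCount,
): int {
    if ($workerCount < 1) {
        throw new InvalidArgumentException('Worker count must be at least 1.');
    }

    // Typical usage plus headroom for spikes (for example 0.2-0.3)
    $limit = (int) ceil($typicalUsageBytes * (1 + $headroom));

    // Never suggest more than an equal share of the server's memory
    $perWorkerBudget = intdiv($serverMemoryBytes, $workerCount);

    return min($limit, $perWorkerBudget);
}
```

For workers that typically use 200MB, 25% headroom gives a 250MB limit, provided the per-worker share of server memory is at least that large.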
See Best practices for general message handler design guidelines.
Clear large objects after use:
public function handle(MessageInterface $message): void
{
$largeData = $this->loadLargeDataset($message->getData()['id']);
$this->processData($largeData);
unset($largeData); // Free memory immediately
}
Avoid static caches:
// Bad - accumulates in memory
class Handler implements MessageHandlerInterface
{
private static array $cache = [];
public function handle(MessageInterface $message): void
{
self::$cache[$message->getData()['id']] = $this->load(...);
// Cache grows indefinitely
}
}
// Good - use external cache
class Handler implements MessageHandlerInterface
{
public function __construct(private CacheInterface $cache) {}
public function handle(MessageInterface $message): void
{
$this->cache->set($message->getData()['id'], $this->load(...));
}
}
Use different queue names for different priority levels. See Queue names for details on configuring multiple queues:
return [
'yiisoft/queue' => [
'queues' => [
'critical' => AmqpAdapter::class,
'normal' => AmqpAdapter::class,
'low' => AmqpAdapter::class,
],
],
];
Run more workers for high-priority queues:
# 8 workers for critical tasks
systemctl start yii-queue-critical@{1..8}
# 4 workers for normal tasks
systemctl start yii-queue-normal@{1..4}
# 2 workers for low-priority tasks
systemctl start yii-queue-low@{1..2}
Create separate queues for different workload characteristics:
return [
'yiisoft/queue' => [
'queues' => [
'fast' => AmqpAdapter::class, // Quick tasks (< 1s)
'slow' => AmqpAdapter::class, // Long tasks (> 10s)
'cpu-bound' => AmqpAdapter::class, // CPU-intensive
'io-bound' => AmqpAdapter::class, // I/O-intensive
],
],
];
Separating queues by workload type prevents slow tasks from blocking fast tasks.
See Middleware pipelines for details on middleware architecture.
Each middleware adds processing time. Keep the pipeline lean:
return [
'yiisoft/queue' => [
'middlewares-consume' => [
// Only essential middlewares
MetricsMiddleware::class,
],
],
];
See Envelopes for details on IdEnvelope.
Bad:
public function processConsume(ConsumeRequest $request, ConsumeHandlerInterface $handler): ConsumeRequest
{
// Heavy operation on every message
$this->logger->debug('Full message dump', [
'message' => json_encode($request->getMessage(), JSON_PRETTY_PRINT),
'backtrace' => debug_backtrace(),
]);
return $handler->handleConsume($request);
}
Good:
public function processConsume(ConsumeRequest $request, ConsumeHandlerInterface $handler): ConsumeRequest
{
// Lightweight logging
$this->logger->debug('Processing message', [
'id' => $request->getMessage()->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null,
]);
return $handler->handleConsume($request);
}
Instead of sending many small messages, batch them when possible:
Bad (1000 messages):
foreach ($userIds as $userId) {
$queue->push(new Message(SendEmailHandler::class, [
'userId' => $userId,
]));
}
Good (1 message):
$queue->push(new Message(SendBulkEmailHandler::class, [
'userIds' => $userIds, // Process in batches
]));
In the handler:
public function handle(MessageInterface $message): void
{
$userIds = $message->getData()['userIds'];
// Process in chunks to avoid memory issues
foreach (array_chunk($userIds, 100) as $chunk) {
$this->emailService->sendBulk($chunk);
}
}
While envelope stacking is optimized, deep nesting still has overhead:
// Avoid excessive wrapping
$message = new Message(...);
$message = new Envelope1($message);
$message = new Envelope2($message);
$message = new Envelope3($message);
$message = new Envelope4($message);
$message = new Envelope5($message); // Too many layers
Keep envelope depth reasonable (typically 2-3 layers).
For database-heavy message handlers, use an external connection pooler such as PgBouncer (PostgreSQL) or ProxySQL (MySQL/MariaDB). These sit between your workers and the database server, maintaining a pool of live connections and reusing them across requests — reducing connection overhead without requiring any application-level changes.
Combine multiple operations into fewer queries:
Bad:
public function handle(MessageInterface $message): void
{
foreach ($message->getData()['items'] as $item) {
$this->db->insert('items', $item); // N queries
}
}
Good:
public function handle(MessageInterface $message): void
{
$this->db->batchInsert('items', $message->getData()['items']); // 1 query
}
If your message handler only reads data, use read replicas:
final class GenerateReportHandler implements MessageHandlerInterface
{
public function __construct(
private ConnectionInterface $readDb, // Read replica
private ReportGenerator $generator,
) {}
public function handle(MessageInterface $message): void
{
$data = $this->readDb->query('SELECT ...'); // From replica
$this->generator->generate($data);
}
}
See Yii Debug integration for built-in debugging tools.
Monitor these metrics to identify bottlenecks:
Queue metrics:
- Queue depth (messages waiting)
- Processing rate (messages/second)
- Average processing time
- Failure rate (see Error handling)
Worker metrics:
- CPU usage per worker
- Memory usage per worker
- Number of active workers
System metrics:
- Overall CPU usage
- Overall memory usage
- Network I/O
- Disk I/O
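Several of the queue metrics above can be derived from simple counters. The following sketch assumes you record the duration of each successful message and count failures over a measurement window; `summarizeQueueMetrics` and the array keys it returns are hypothetical names.

```php
<?php

declare(strict_types=1);

/**
 * Derives basic queue metrics from raw measurements.
 * Hypothetical helper; names and return shape are assumptions.
 *
 * @param float[] $processingSeconds duration of each successfully processed message
 * @return array{rate: float, avgProcessingSeconds: float, failureRate: float}
 */
function summarizeQueueMetrics(array $processingSeconds, int $failedCount, float $windowSeconds): array
{
    $processed = count($processingSeconds);
    $total = $processed + $failedCount;

    return [
        // Processing rate: successful messages per second over the window
        'rate' => $windowSeconds > 0 ? $processed / $windowSeconds : 0.0,
        // Average processing time of successful messages
        'avgProcessingSeconds' => $processed > 0 ? array_sum($processingSeconds) / $processed : 0.0,
        // Failure rate: share of all attempted messages that failed
        'failureRate' => $total > 0 ? $failedCount / $total : 0.0,
    ];
}
```

Queue depth is not covered here because it has to be read from the broker rather than computed in the worker.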
Profile slow message handlers to find bottlenecks:
public function handle(MessageInterface $message): void
{
$profiler = new Profiler();
$profiler->start('database');
$data = $this->loadData($message->getData()['id']);
$profiler->stop('database');
$profiler->start('processing');
$result = $this->processData($data);
$profiler->stop('processing');
$profiler->start('storage');
$this->saveResult($result);
$profiler->stop('storage');
$this->logger->debug('Handler profile', $profiler->getResults());
}
Test with realistic message volumes and data:
// Load test script
$queue = $container->get(QueueInterface::class);
$start = microtime(true);
$count = 10000;
for ($i = 0; $i < $count; $i++) {
$queue->push(new Message(TestHandler::class, [
'id' => $i,
'data' => generateRealisticData(), // Hypothetical helper; $this is not available in a standalone script
]));
}
$duration = microtime(true) - $start;
echo "Pushed $count messages in $duration seconds\n";
echo "Rate: " . ($count / $duration) . " messages/second\n";
Run load tests while monitoring:
- Worker CPU and memory usage
- Queue depth growth
- Processing latency
- Error rates
Adjust configuration based on observations.
Symptoms: Messages accumulate faster than they're processed.
Solutions:
- Add more workers (see Workers)
- Optimize slow message handlers
- Increase prefetch count (if using AMQP)
- Separate slow and fast tasks into different queues (see Queue names)
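To judge how many workers are needed to keep up, a rough estimate is the arrival rate multiplied by the average processing time. The sketch below assumes each worker handles one message at a time; `estimateRequiredWorkers` and the 0.8 target utilization are illustrative assumptions, not library settings.

```php
<?php

declare(strict_types=1);

/**
 * Estimates the number of workers needed to keep the queue from growing.
 * Hypothetical helper; assumes one message per worker at a time.
 */
function estimateRequiredWorkers(
    float $arrivalRate,          // messages arriving per second
    float $avgProcessingSeconds, // average handler duration in seconds
    float $targetUtilization = 0.8,
): int {
    if ($targetUtilization <= 0 || $targetUtilization > 1) {
        throw new InvalidArgumentException('Target utilization must be in the range (0, 1].');
    }

    // Busy worker-seconds needed per second, scaled to leave spare capacity
    $needed = $arrivalRate * $avgProcessingSeconds / $targetUtilization;

    return max(1, (int) ceil($needed));
}
```

For example, 40 messages per second at 0.25 seconds each keeps 10 workers fully busy, so at 80% target utilization the estimate is 13 workers.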
Symptoms: Workers consume excessive memory.
Solutions:
- Lower memorySoftLimit (set a smaller numeric value) to restart workers more frequently by hitting the memory threshold sooner (see Loops)
- Fix memory leaks in message handlers (see Best practices)
- Reduce prefetch count
- Process large datasets in chunks
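Chunked processing can be done lazily, so that only one chunk is held in memory at a time. The generator below is a minimal sketch; `inChunks` is a hypothetical helper name, and it works with any iterable, including database cursors that implement Traversable.

```php
<?php

declare(strict_types=1);

/**
 * Lazily yields items from any iterable in fixed-size chunks.
 * Hypothetical helper; the name is an assumption.
 *
 * @return Generator<int, array>
 */
function inChunks(iterable $items, int $chunkSize): Generator
{
    if ($chunkSize < 1) {
        throw new InvalidArgumentException('Chunk size must be at least 1.');
    }

    $chunk = [];
    foreach ($items as $item) {
        $chunk[] = $item;
        if (count($chunk) === $chunkSize) {
            yield $chunk;
            $chunk = []; // Release the processed chunk before building the next one
        }
    }

    // Yield the final, possibly smaller, chunk
    if ($chunk !== []) {
        yield $chunk;
    }
}
```

Unlike `array_chunk`, this does not require the whole dataset to be loaded into an array first.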
Symptoms: CPU/memory underutilized, but throughput is low.
Solutions:
- Increase worker count (see Workers)
- Increase prefetch count
- Reduce middleware overhead (see Middleware pipelines)
- Check for network bottlenecks
- Optimize database queries in handlers
Symptoms: Some workers are busy while others are idle.
Solutions:
- Lower prefetch count for better distribution
- Shorten message processing times, for example by splitting long-running tasks into smaller messages
- Check broker configuration (e.g., RabbitMQ queue settings)