|
| 1 | +# Real-Time Progress with Mercure |
| 2 | + |
| 3 | +This guide explains how to display real-time queue job progress in the browser using [Mercure](https://mercure.rocks/) and Server-Sent Events (SSE). This is particularly useful with [FrankenPHP](https://frankenphp.dev/) which has Mercure built-in. |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +Instead of polling the server or requiring page refreshes, the queue worker pushes updates directly to the browser: |
| 8 | + |
| 9 | +1. User triggers a background job |
| 10 | +2. Job runs in queue worker process |
| 11 | +3. Each progress step publishes a Mercure update |
| 12 | +4. Browser receives updates instantly via EventSource |
| 13 | + |
| 14 | +## Requirements |
| 15 | + |
| 16 | +- [josbeir/cakephp-mercure](https://github.com/josbeir/cakephp-mercure) plugin |
| 17 | +- Mercure hub (standalone or built into FrankenPHP) |
| 18 | + |
| 19 | +## Setup |
| 20 | + |
| 21 | +### 1. Install the Mercure Plugin |
| 22 | + |
| 23 | +```bash |
| 24 | +composer require josbeir/cakephp-mercure |
| 25 | +``` |
| 26 | + |
| 27 | +Load the plugin: |
| 28 | + |
| 29 | +```php |
| 30 | +// config/plugins.php |
| 31 | +return [ |
| 32 | + 'Mercure' => [], |
| 33 | + // ... |
| 34 | +]; |
| 35 | +``` |
| 36 | + |
| 37 | +### 2. Configure Mercure |
| 38 | + |
| 39 | +Create `config/app_mercure.php`: |
| 40 | + |
| 41 | +```php |
| 42 | +<?php |
| 43 | +use Cake\Http\Cookie\CookieInterface; |
| 44 | + |
| 45 | +return [ |
| 46 | + 'Mercure' => [ |
| 47 | + // Internal URL for server-side publishing (inside container/server) |
| 48 | + 'url' => 'http://localhost/.well-known/mercure', |
| 49 | + |
| 50 | + // External URL for browser EventSource connections |
| 51 | + 'public_url' => 'https://your-domain.com/.well-known/mercure', |
| 52 | + |
| 53 | + 'jwt' => [ |
| 54 | + 'secret' => 'your-mercure-jwt-secret', |
| 55 | + 'algorithm' => 'HS256', |
| 56 | + 'publish' => ['*'], |
| 57 | + 'subscribe' => [], |
| 58 | + ], |
| 59 | + |
| 60 | + 'cookie' => [ |
| 61 | + 'name' => 'mercureAuthorization', |
| 62 | + 'secure' => true, |
| 63 | + 'httponly' => true, |
| 64 | + 'samesite' => CookieInterface::SAMESITE_LAX, |
| 65 | + ], |
| 66 | + ], |
| 67 | +]; |
| 68 | +``` |
| 69 | + |
| 70 | +Load it in `config/bootstrap.php`: |
| 71 | + |
| 72 | +```php |
| 73 | +Configure::load('app_mercure'); |
| 74 | +``` |
| 75 | + |
| 76 | +### 3. FrankenPHP with Mercure |
| 77 | + |
| 78 | +If using FrankenPHP, add Mercure to your Caddyfile or environment: |
| 79 | + |
| 80 | +``` |
| 81 | +CADDY_SERVER_EXTRA_DIRECTIVES= |
| 82 | +mercure { |
| 83 | + publisher_jwt your-mercure-jwt-secret |
| 84 | + subscriber_jwt your-mercure-jwt-secret |
| 85 | + anonymous |
| 86 | + cors_origins * |
| 87 | +} |
| 88 | +``` |
| 89 | + |
| 90 | +For DDEV, create `.ddev/docker-compose.mercure.yaml`: |
| 91 | + |
| 92 | +```yaml |
| 93 | +services: |
| 94 | + web: |
| 95 | + environment: |
| 96 | + - |- |
| 97 | + CADDY_SERVER_EXTRA_DIRECTIVES= |
| 98 | + mercure { |
| 99 | + publisher_jwt your-mercure-jwt-secret |
| 100 | + subscriber_jwt your-mercure-jwt-secret |
| 101 | + anonymous |
| 102 | + cors_origins * |
| 103 | + } |
| 104 | +``` |
| 105 | +
|
| 106 | +## Creating a Queue Task with Mercure Updates |
| 107 | +
|
| 108 | +```php |
| 109 | +<?php |
| 110 | +declare(strict_types=1); |
| 111 | + |
| 112 | +namespace App\Queue\Task; |
| 113 | + |
| 114 | +use Cake\Core\Configure; |
| 115 | +use Mercure\Publisher; |
| 116 | +use Mercure\Update\JsonUpdate; |
| 117 | +use Queue\Queue\Task; |
| 118 | + |
| 119 | +class MyProgressTask extends Task { |
| 120 | + |
| 121 | + public ?int $timeout = 120; |
| 122 | + |
| 123 | + public function run(array $data, int $jobId): void { |
| 124 | + $topic = $data['topic'] ?? '/jobs/' . $jobId; |
| 125 | + $steps = 10; |
| 126 | + |
| 127 | + // Check if Mercure is configured |
| 128 | + $mercureConfigured = (bool)Configure::read('Mercure.url'); |
| 129 | + |
| 130 | + // Publish start event |
| 131 | + if ($mercureConfigured) { |
| 132 | + $this->publishUpdate($topic, [ |
| 133 | + 'status' => 'started', |
| 134 | + 'progress' => 0, |
| 135 | + 'message' => 'Job started', |
| 136 | + 'jobId' => $jobId, |
| 137 | + ]); |
| 138 | + } |
| 139 | + |
| 140 | + for ($i = 1; $i <= $steps; $i++) { |
| 141 | + // Do actual work here... |
| 142 | + sleep(1); |
| 143 | + |
| 144 | + $progress = (int)(($i / $steps) * 100); |
| 145 | + |
| 146 | + // Update queue progress (for DB tracking) |
| 147 | + $this->QueuedJobs->updateProgress($jobId, $i / $steps, "Step {$i} of {$steps}"); |
| 148 | + |
| 149 | + // Publish Mercure update (for real-time UI) |
| 150 | + if ($mercureConfigured) { |
| 151 | + $this->publishUpdate($topic, [ |
| 152 | + 'status' => 'progress', |
| 153 | + 'progress' => $progress, |
| 154 | + 'step' => $i, |
| 155 | + 'totalSteps' => $steps, |
| 156 | + 'message' => "Processing step {$i} of {$steps}", |
| 157 | + 'jobId' => $jobId, |
| 158 | + ]); |
| 159 | + } |
| 160 | + } |
| 161 | + |
| 162 | + // Publish completion event |
| 163 | + if ($mercureConfigured) { |
| 164 | + $this->publishUpdate($topic, [ |
| 165 | + 'status' => 'completed', |
| 166 | + 'progress' => 100, |
| 167 | + 'message' => 'Job completed successfully!', |
| 168 | + 'jobId' => $jobId, |
| 169 | + ]); |
| 170 | + } |
| 171 | + } |
| 172 | + |
| 173 | + protected function publishUpdate(string $topic, array $data): void { |
| 174 | + try { |
| 175 | + Publisher::publish(JsonUpdate::create( |
| 176 | + topics: $topic, |
| 177 | + data: $data, |
| 178 | + )); |
| 179 | + } catch (\Exception $e) { |
| 180 | + $this->io->error('Mercure publish failed: ' . $e->getMessage()); |
| 181 | + } |
| 182 | + } |
| 183 | +} |
| 184 | +``` |
| 185 | +
|
| 186 | +## Controller |
| 187 | +
|
| 188 | +```php |
| 189 | +<?php |
| 190 | +namespace App\Controller; |
| 191 | +
|
| 192 | +use Cake\Core\Configure; |
| 193 | +
|
| 194 | +class JobsController extends AppController { |
| 195 | +
|
| 196 | + public function progress(): void { |
| 197 | + $sid = $this->request->getSession()->id(); |
| 198 | + $topic = '/jobs/user/' . $sid; |
| 199 | + |
| 200 | + $this->set('topic', $topic); |
| 201 | + $this->set('mercurePublicUrl', Configure::read('Mercure.public_url')); |
| 202 | + } |
| 203 | + |
| 204 | + public function startJob() { |
| 205 | + $this->request->allowMethod('post'); |
| 206 | + |
| 207 | + $queuedJobsTable = $this->fetchTable('Queue.QueuedJobs'); |
| 208 | + $sid = $this->request->getSession()->id(); |
| 209 | + $topic = '/jobs/user/' . $sid; |
| 210 | + |
| 211 | + $queuedJobsTable->createJob( |
| 212 | + 'MyProgress', |
| 213 | + ['topic' => $topic], |
| 214 | + ['reference' => 'user-job-' . $sid], |
| 215 | + ); |
| 216 | + |
| 217 | + $this->Flash->success('Job started!'); |
| 218 | + return $this->redirect(['action' => 'progress']); |
| 219 | + } |
| 220 | +} |
| 221 | +``` |
| 222 | + |
| 223 | +## Template with EventSource |
| 224 | + |
| 225 | +```php |
| 226 | +<?php |
| 227 | +// templates/Jobs/progress.php |
| 228 | +$topic = $topic ?? '/jobs/default'; |
| 229 | +$mercurePublicUrl = $mercurePublicUrl ?? null; |
| 230 | +?> |
| 231 | + |
| 232 | +<div id="progress-container"> |
| 233 | + <div class="progress"> |
| 234 | + <div id="progress-bar" class="progress-bar" style="width: 0%">0%</div> |
| 235 | + </div> |
| 236 | + <p id="status-message">Waiting for job...</p> |
| 237 | +</div> |
| 238 | + |
| 239 | +<?php if ($mercurePublicUrl): ?> |
| 240 | +<script> |
| 241 | +(function() { |
| 242 | + const topic = <?= json_encode($topic) ?>; |
| 243 | + const mercureUrl = <?= json_encode($mercurePublicUrl) ?>; |
| 244 | +
|
| 245 | + const url = new URL(mercureUrl); |
| 246 | + url.searchParams.append('topic', topic); |
| 247 | +
|
| 248 | + const eventSource = new EventSource(url, { withCredentials: true }); |
| 249 | +
|
| 250 | + eventSource.onmessage = function(event) { |
| 251 | + const data = JSON.parse(event.data); |
| 252 | +
|
| 253 | + document.getElementById('progress-bar').style.width = data.progress + '%'; |
| 254 | + document.getElementById('progress-bar').textContent = data.progress + '%'; |
| 255 | + document.getElementById('status-message').textContent = data.message; |
| 256 | +
|
| 257 | + if (data.status === 'completed') { |
| 258 | + document.getElementById('progress-bar').classList.add('bg-success'); |
| 259 | + } |
| 260 | + }; |
| 261 | +
|
| 262 | + eventSource.onerror = function() { |
| 263 | + console.log('Connection error, will auto-reconnect...'); |
| 264 | + }; |
| 265 | +})(); |
| 266 | +</script> |
| 267 | +<?php endif; ?> |
| 268 | +``` |
| 269 | +
|
| 270 | +## Running Workers |
| 271 | +
|
| 272 | +### Development (DDEV with FrankenPHP) |
| 273 | +
|
| 274 | +Add to `.ddev/config.frankenphp.yaml`: |
| 275 | +
|
| 276 | +```yaml |
| 277 | +web_extra_daemons: |
| 278 | + - name: "frankenphp" |
| 279 | + command: "frankenphp run --config /etc/frankenphp/Caddyfile --adapter=caddyfile" |
| 280 | + directory: /var/www/html |
| 281 | + - name: "queue-worker" |
| 282 | + command: "bash -c 'sleep 5 && DDEV_PROJECT=myproject bin/cake queue run -v'" |
| 283 | + directory: /var/www/html |
| 284 | +``` |
| 285 | +
|
| 286 | +### Production (systemd) |
| 287 | +
|
| 288 | +Create `/etc/systemd/system/myapp-queue.service`: |
| 289 | +
|
| 290 | +```ini |
| 291 | +[Unit] |
| 292 | +Description=CakePHP Queue Worker |
| 293 | +After=network.target mysql.service |
| 294 | +
|
| 295 | +[Service] |
| 296 | +Type=simple |
| 297 | +User=www-data |
| 298 | +Group=www-data |
| 299 | +WorkingDirectory=/var/www/myapp |
| 300 | +ExecStart=/usr/bin/php bin/cake queue run |
| 301 | +Restart=always |
| 302 | +RestartSec=5 |
| 303 | +Environment=APP_ENV=production |
| 304 | +
|
| 305 | +[Install] |
| 306 | +WantedBy=multi-user.target |
| 307 | +``` |
| 308 | +
|
| 309 | +Enable and start: |
| 310 | +
|
| 311 | +```bash |
| 312 | +sudo systemctl enable myapp-queue |
| 313 | +sudo systemctl start myapp-queue |
| 314 | +``` |
| 315 | +
|
| 316 | +### Production (supervisor) |
| 317 | +
|
| 318 | +Create `/etc/supervisor/conf.d/myapp-queue.conf`: |
| 319 | +
|
| 320 | +```ini |
| 321 | +[program:myapp-queue] |
| 322 | +command=/usr/bin/php bin/cake queue run |
| 323 | +directory=/var/www/myapp |
| 324 | +user=www-data |
| 325 | +autostart=true |
| 326 | +autorestart=true |
| 327 | +numprocs=2 |
| 328 | +process_name=%(program_name)s_%(process_num)02d |
| 329 | +stderr_logfile=/var/log/myapp/queue-error.log |
| 330 | +stdout_logfile=/var/log/myapp/queue.log |
| 331 | +``` |
| 332 | +
|
| 333 | +### Scaling Workers |
| 334 | +
|
| 335 | +The `maxworkers` config limits concurrent workers across all servers: |
| 336 | +
|
| 337 | +```php |
| 338 | +'Queue' => [ |
| 339 | + 'maxworkers' => 4, |
| 340 | +], |
| 341 | +``` |
| 342 | +
|
| 343 | +Scale horizontally by running workers on multiple servers - they share the same database queue and respect `maxworkers`. |
| 344 | +
|
| 345 | +For longer-running production workers, increase `workerLifetime`: |
| 346 | +
|
| 347 | +```php |
| 348 | +'Queue' => [ |
| 349 | + 'workerLifetime' => 3600, // 1 hour (0 = unlimited) |
| 350 | +], |
| 351 | +``` |
| 352 | +
|
| 353 | +## Testing |
| 354 | +
|
| 355 | +Mock the Mercure Publisher in tests to prevent HTTP requests: |
| 356 | +
|
| 357 | +```php |
| 358 | +use Mercure\Publisher; |
| 359 | +use Mercure\TestSuite\MockPublisher; |
| 360 | +
|
| 361 | +public function setUp(): void { |
| 362 | + parent::setUp(); |
| 363 | + Publisher::setInstance(new MockPublisher()); |
| 364 | +} |
| 365 | +
|
| 366 | +public function tearDown(): void { |
| 367 | + parent::tearDown(); |
| 368 | + Publisher::clear(); |
| 369 | +} |
| 370 | +``` |
| 371 | +
|
| 372 | +## See Also |
| 373 | +
|
| 374 | +- [josbeir/cakephp-mercure](https://github.com/josbeir/cakephp-mercure) - CakePHP Mercure plugin |
| 375 | +- [Mercure Protocol](https://mercure.rocks/) - Real-time protocol |
| 376 | +- [FrankenPHP](https://frankenphp.dev/) - PHP app server with built-in Mercure |
0 commit comments