Skip to content

Commit e7423cc

Browse files
authored
Merge pull request #26 from r2luna/issue-18
issue 18
2 parents e1ebc81 + 8ebefb1 commit e7423cc

File tree

9 files changed

+365
-4
lines changed

9 files changed

+365
-4
lines changed

README.md

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,171 @@ class SendWelcomeNotifications extends Task implements ShouldQueue
150150
}
151151
```
152152

153+
#### Setting a Specific Queue with `#[OnQueue]`
154+
155+
> [!WARNING]
156+
> Do **not** declare `public string $queue = 'my-queue'` on a Task — this causes a PHP fatal error because Laravel's `Queueable` trait already declares `$queue` without a type hint.
157+
158+
Use the `#[OnQueue('queue-name')]` attribute to assign a specific queue to a Task or Process:
159+
160+
```php
161+
use Brain\Attributes\OnQueue;
162+
use Brain\Task;
163+
use Illuminate\Contracts\Queue\ShouldQueue;
164+
165+
#[OnQueue('emails')]
166+
class SendWelcomeNotifications extends Task implements ShouldQueue
167+
{
168+
public function handle(): self
169+
{
170+
// This task will run on the "emails" queue
171+
return $this;
172+
}
173+
}
174+
```
175+
176+
When applied to a **Process**, the attribute does two things:
177+
178+
1. The Process itself is dispatched to that queue (if it implements `ShouldQueue`)
179+
2. All queued child tasks **inherit** the Process queue — unless the task defines its own `#[OnQueue]`
180+
181+
```php
182+
use Brain\Attributes\OnQueue;
183+
use Brain\Process;
184+
use Illuminate\Contracts\Queue\ShouldQueue;
185+
186+
#[OnQueue('strava')]
187+
class SyncActivitiesProcess extends Process implements ShouldQueue
188+
{
189+
protected array $tasks = [
190+
FetchActivities::class, // ShouldQueue → runs on "strava" (inherited)
191+
SaveActivities::class, // sync task → unaffected
192+
NotifyUser::class, // has #[OnQueue('emails')] → runs on "emails" (own queue wins)
193+
];
194+
}
195+
```
196+
197+
#### Queue Execution Flows
198+
199+
Below are the three most common queue configurations.
200+
201+
**1. Process + all Tasks on the same queue**
202+
203+
The process and every queued task run on `strava`:
204+
205+
```mermaid
206+
flowchart LR
207+
D((Dispatch)) --> Q[strava queue]
208+
209+
subgraph Q[strava queue]
210+
direction LR
211+
P[SyncActivitiesProcess] --> T1[FetchActivities]
212+
T1 --> T2[TransformData]
213+
T2 --> T3[SaveActivities]
214+
end
215+
216+
style Q fill:#1a1a2e,stroke:#e94560,color:#eee
217+
style P fill:#0f3460,stroke:#e94560,color:#eee
218+
style T1 fill:#16213e,stroke:#0f3460,color:#eee
219+
style T2 fill:#16213e,stroke:#0f3460,color:#eee
220+
style T3 fill:#16213e,stroke:#0f3460,color:#eee
221+
```
222+
223+
```php
224+
#[OnQueue('strava')]
225+
class SyncActivitiesProcess extends Process implements ShouldQueue
226+
{
227+
protected array $tasks = [
228+
FetchActivities::class, // implements ShouldQueue
229+
TransformData::class, // implements ShouldQueue
230+
SaveActivities::class, // implements ShouldQueue
231+
];
232+
}
233+
```
234+
235+
**2. Process on a queue, Tasks run synchronously inside it**
236+
237+
The process is queued, but internally its tasks run one after another in the same job:
238+
239+
```mermaid
240+
flowchart LR
241+
D((Dispatch)) --> Q
242+
243+
subgraph Q[strava queue]
244+
subgraph P[SyncActivitiesProcess]
245+
direction LR
246+
T1[FetchActivities] --> T2[TransformData] --> T3[SaveActivities]
247+
end
248+
end
249+
250+
style Q fill:#1a1a2e,stroke:#e94560,color:#eee
251+
style P fill:#0f3460,stroke:#e94560,color:#eee
252+
style T1 fill:#16213e,stroke:#0f3460,color:#eee
253+
style T2 fill:#16213e,stroke:#0f3460,color:#eee
254+
style T3 fill:#16213e,stroke:#0f3460,color:#eee
255+
```
256+
257+
```php
258+
#[OnQueue('strava')]
259+
class SyncActivitiesProcess extends Process implements ShouldQueue
260+
{
261+
protected array $tasks = [
262+
FetchActivities::class, // sync (no ShouldQueue)
263+
TransformData::class, // sync
264+
SaveActivities::class, // sync
265+
];
266+
}
267+
```
268+
269+
**3. Process on a queue, mixed sync and queued Tasks**
270+
271+
The process is queued. Some tasks run synchronously inside the process job, others are dispatched to their own queue jobs:
272+
273+
```mermaid
274+
flowchart LR
275+
D((Dispatch)) --> Q
276+
277+
subgraph Q[strava queue]
278+
subgraph P[SyncActivitiesProcess]
279+
direction LR
280+
T1[FetchActivities\nsync] --> T2[TransformData\nsync]
281+
end
282+
T2 --> T3[SaveActivities\nqueued]
283+
T3 --> T4[NotifyUser\nqueued]
284+
end
285+
286+
T4 -.-> E[emails queue]
287+
288+
subgraph E[emails queue]
289+
T4real[NotifyUser]
290+
end
291+
292+
style Q fill:#1a1a2e,stroke:#e94560,color:#eee
293+
style E fill:#1a1a2e,stroke:#53a653,color:#eee
294+
style P fill:#0f3460,stroke:#e94560,color:#eee
295+
style T1 fill:#16213e,stroke:#0f3460,color:#eee
296+
style T2 fill:#16213e,stroke:#0f3460,color:#eee
297+
style T3 fill:#16213e,stroke:#0f3460,color:#eee
298+
style T4 fill:#16213e,stroke:#0f3460,color:#eee
299+
style T4real fill:#16213e,stroke:#53a653,color:#eee
300+
```
301+
302+
```php
303+
#[OnQueue('strava')]
304+
class SyncActivitiesProcess extends Process implements ShouldQueue
305+
{
306+
protected array $tasks = [
307+
FetchActivities::class, // sync — runs inside the process job
308+
TransformData::class, // sync — runs inside the process job
309+
SaveActivities::class, // implements ShouldQueue → dispatched to "strava"
310+
NotifyUser::class, // implements ShouldQueue + #[OnQueue('emails')] → dispatched to "emails"
311+
];
312+
}
313+
```
314+
315+
> [!TIP]
316+
> A task's own `#[OnQueue]` always takes precedence over the Process-level queue.
317+
153318
#### Conditional Run Tasks in a Process
154319

155320
You can use a protected function `runIf()` to conditionally run a task.

src/Attributes/OnQueue.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Brain\Attributes;
6+
7+
use Attribute;
8+
9+
#[Attribute(Attribute::TARGET_CLASS)]
10+
class OnQueue
11+
{
12+
public function __construct(public string $queue) {}
13+
}

src/Console/BrainMap.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Brain\Console;
66

7+
use Brain\Attributes\OnQueue;
78
use Brain\Process;
89
use Brain\Task;
910
use Exception;
@@ -159,9 +160,13 @@ private function loadProcessesFor(string $domainPath): array
159160
$chainValue = $chainProperty->getValue(new $reflection->name([]));
160161
$value = $value->getPathname();
161162

163+
$onQueueAttr = $reflection->getAttributes(OnQueue::class);
164+
$queueName = $onQueueAttr !== [] ? $onQueueAttr[0]->newInstance()->queue : null;
165+
162166
return [
163167
'name' => basename($value, '.php'),
164168
'chain' => $chainValue,
169+
'onQueue' => $queueName,
165170
'tasks' => $this->getProcessesTasks($reflection),
166171
];
167172
})
@@ -211,10 +216,14 @@ private function getTask(SplFileInfo|string $task): array
211216
$reflection = $this->getReflectionClass($task);
212217
$isProcess = $reflection->isSubclassOf(Process::class);
213218

219+
$onQueueAttr = $reflection->getAttributes(OnQueue::class);
220+
$queueName = $onQueueAttr !== [] ? $onQueueAttr[0]->newInstance()->queue : null;
221+
214222
$data = [
215223
'name' => $reflection->getShortName(),
216224
'fullName' => $reflection->name,
217225
'queue' => $reflection->implementsInterface(ShouldQueue::class),
226+
'onQueue' => $queueName,
218227
'type' => $isProcess ? 'process' : 'task',
219228
'properties' => $this->getPropertiesFor($reflection),
220229
];

src/Process.php

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
namespace Brain;
66

7+
use Brain\Attributes\OnQueue;
78
use Brain\Processes\Events\Error;
89
use Brain\Processes\Events\Processed;
910
use Brain\Processes\Events\Processing;
1011
use Brain\Tasks\Events\Cancelled;
1112
use Brain\Tasks\Events\Error as TasksError;
1213
use Brain\Tasks\Events\Skipped;
1314
use Exception;
15+
use Illuminate\Bus\Queueable;
1416
use Illuminate\Contracts\Queue\ShouldQueue;
1517
use Illuminate\Foundation\Bus\Dispatchable;
1618
use Illuminate\Support\Facades\Bus;
@@ -29,6 +31,7 @@
2931
class Process
3032
{
3133
use Dispatchable;
34+
use Queueable;
3235

3336
/**
3437
* When running the process we will assign am uuid
@@ -66,6 +69,13 @@ public function __construct(
6669
$this->name = (new ReflectionClass($this))->getName();
6770

6871
Context::add('process', [$this->name, $this->uuid]);
72+
73+
$onQueue = (new ReflectionClass(static::class))
74+
->getAttributes(OnQueue::class);
75+
76+
if ($onQueue !== []) {
77+
$this->onQueue($onQueue[0]->newInstance()->queue);
78+
}
6979
}
7080

7181
/**
@@ -140,8 +150,13 @@ public function handle(): object|array|null
140150
*/
141151
private function runInChain(?object $payload = null): ?object
142152
{
143-
Bus::chain($this->getChainedTasks())
144-
->dispatch();
153+
$chain = Bus::chain($this->getChainedTasks());
154+
155+
if (($queue = $this->resolveQueue()) !== null && ($queue = $this->resolveQueue()) !== '' && ($queue = $this->resolveQueue()) !== '0') {
156+
$chain->onQueue($queue);
157+
}
158+
159+
$chain->dispatch();
145160

146161
return $payload;
147162
}
@@ -194,7 +209,14 @@ private function run(array|object|null $payload): ?object
194209
}
195210

196211
if ($reflectionClass->implementsInterface(ShouldQueue::class)) {
197-
$task::dispatch($payload);
212+
$processQueue = $this->resolveQueue();
213+
$instance = new $task($payload);
214+
215+
if ($instance->queue === null && $processQueue !== null) {
216+
$instance->onQueue($processQueue);
217+
}
218+
219+
dispatch($instance);
198220

199221
continue;
200222
}
@@ -242,6 +264,21 @@ private function run(array|object|null $payload): ?object
242264
return $payload;
243265
}
244266

267+
/**
268+
* Resolve the queue name from the #[OnQueue] attribute on this Process class.
269+
*/
270+
private function resolveQueue(): ?string
271+
{
272+
$attributes = (new ReflectionClass(static::class))
273+
->getAttributes(OnQueue::class);
274+
275+
if ($attributes === []) {
276+
return null;
277+
}
278+
279+
return $attributes[0]->newInstance()->queue;
280+
}
281+
245282
/**
246283
* Fire Event for the Listeners save all the info
247284
* in the database, and we track what is happening to

src/Task.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Brain;
66

7+
use Brain\Attributes\OnQueue;
78
use Brain\Exceptions\InvalidPayload;
89
use Brain\Tasks\Events\Processed;
910
use Brain\Tasks\Events\Processing;
@@ -57,6 +58,13 @@ public function __construct(
5758
if ($runIn = $this->runIn()) {
5859
$this->delay($runIn);
5960
}
61+
62+
$onQueue = (new ReflectionClass(static::class))
63+
->getAttributes(OnQueue::class);
64+
65+
if ($onQueue !== []) {
66+
$this->onQueue($onQueue[0]->newInstance()->queue);
67+
}
6068
}
6169

6270
/**

0 commit comments

Comments
 (0)