Skip to content

Commit d55b59f

Browse files
authored
Merge pull request #2 from mikerockett/v4
v4
2 parents ff4fd97 + eef6c50 commit d55b59f

21 files changed

+1027
-254
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ jobs:
1313
os:
1414
- "ubuntu-latest"
1515
php:
16-
- "8.2"
1716
- "8.3"
17+
- "8.4"
1818
experimental:
1919
- false
2020

changelog.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
# Changelog
22

3-
## 4.0.0 - Unreleased
3+
## 4.0.0 - 2025-08-18
44

5-
### Changed
5+
### Added
66

7-
* **Minimum PHP version is now 8.2**
8-
* Removed Orchestra Testbench in favor of PestPHP
9-
* Updated nunomaduro/collision to ^8.4
7+
* `InterruptipleTapProcessor`, which implements both `InterruptipleProcessor` and `TapProcessor`
108
* Added generic typing information across the package
119

10+
### Changed
11+
12+
* **Minimum PHP version is now 8.3**
13+
* Test framework is now Pest
14+
* Updated `nunomaduro/collision` to ^8.0
15+
* Tests are now a bit more exhaustive
16+
* `TapProcessor` now requires at least one callback when instantiated.
17+
1218
## 3.0.0 - 2023-02-07
1319

1420
### Changed

composer.json

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@
3131
"Rockett\\Pipeline\\": "src"
3232
}
3333
},
34+
"autoload-dev": {
35+
"psr-4": {
36+
"Tests\\": "tests/"
37+
}
38+
},
3439
"require": {
35-
"php": "^8.2"
40+
"php": "^8.3"
3641
},
3742
"require-dev": {
38-
"nunomaduro/collision": "^8.4",
39-
"pestphp/pest": "^2.35"
43+
"nunomaduro/collision": "^8.0",
44+
"pestphp/pest": "^3.0"
4045
},
4146
"scripts": {
4247
"test": "./vendor/bin/pest"

phpunit.xml

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,30 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.5/phpunit.xsd" bootstrap="vendor/autoload.php" executionOrder="depends,defects" beStrictAboutOutputDuringTests="true" failOnRisky="true" failOnWarning="true" cacheDirectory=".phpunit.cache">
3-
<testsuites>
4-
<testsuite name="Pipeline Test Suite">
5-
<directory suffix="Test.php">
6-
tests
7-
</directory>
8-
</testsuite>
9-
</testsuites>
10-
<source>
11-
<include>
12-
<directory suffix=".php">
13-
./src
14-
</directory>
15-
</include>
16-
<exclude />
17-
</source>
2+
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:noNamespaceSchemaLocation="vendor/phpunit/phpunit/phpunit.xsd"
4+
bootstrap="vendor/autoload.php"
5+
colors="true"
6+
>
7+
<testsuites>
8+
<testsuite name="Feature">
9+
<directory>tests/Feature</directory>
10+
</testsuite>
11+
</testsuites>
12+
<source>
13+
<include>
14+
<directory>src</directory>
15+
</include>
16+
</source>
17+
<php>
18+
<env name="APP_ENV" value="testing"/>
19+
<env name="APP_MAINTENANCE_DRIVER" value="file"/>
20+
<env name="BCRYPT_ROUNDS" value="4"/>
21+
<env name="CACHE_STORE" value="array"/>
22+
<!-- <env name="DB_CONNECTION" value="sqlite"/> -->
23+
<!-- <env name="DB_DATABASE" value=":memory:"/> -->
24+
<env name="MAIL_MAILER" value="array"/>
25+
<env name="PULSE_ENABLED" value="false"/>
26+
<env name="QUEUE_CONNECTION" value="sync"/>
27+
<env name="SESSION_DRIVER" value="array"/>
28+
<env name="TELESCOPE_ENABLED" value="false"/>
29+
</php>
1830
</phpunit>

readme.md

Lines changed: 91 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,44 @@
1-
<!-- exclude-from-website: -->
21
# Rockett\Pipeline
32

43
![GitHub License](https://img.shields.io/github/license/mikerockett/pipeline?style=for-the-badge)
54
![Packagist Version](https://img.shields.io/packagist/v/rockett/pipeline?label=Release&style=for-the-badge)
65
![Packagist Downloads](https://img.shields.io/packagist/dm/rockett/pipeline?label=Installs&style=for-the-badge)
76
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/mikerockett/pipeline/test.yml?label=Tests&style=for-the-badge)
8-
<!-- /exclude-from-website -->
97

10-
Built atop [League’s excellent package](https://github.com/thephpleague/pipeline), `Rockett\Pipeline` provides an implementation of the [pipeline pattern](https://en.wikipedia.org/wiki/Pipeline_(software)).
8+
Built atop [League's excellent package](https://github.com/thephpleague/pipeline), `Rockett\Pipeline` provides an implementation of the [pipeline pattern](https://en.wikipedia.org/wiki/Pipeline_(software)) with additional processors for **conditional interruption** and **stage tapping**.
9+
10+
## Requirements
11+
12+
- PHP 8.3+
13+
14+
## Installation
15+
16+
```bash
17+
composer require rockett/pipeline
18+
```
19+
20+
## Quick Start
21+
22+
```php
23+
use Rockett\Pipeline\Pipeline;
24+
25+
$pipeline = (new Pipeline)
26+
->pipe(fn($x) => $x * 2)
27+
->pipe(fn($x) => $x + 1);
28+
29+
echo $pipeline->process(10); // Outputs: 21
30+
```
31+
32+
## Table of Contents
33+
34+
- [Pipeline Pattern](#pipeline-pattern)
35+
- [Immutability](#immutability)
36+
- [Usage](#usage)
37+
- [Class-based stages](#class-based-stages)
38+
- [Re-usability](#re-usability)
39+
- [Pipeline Builders](#pipeline-builders)
40+
- [Processors](#processors)
41+
- [Handling Exceptions](#handling-exceptions)
1142

1243
## Pipeline Pattern
1344

@@ -37,7 +68,7 @@ Pipelines are implemented as immutable stage-chains, contracted by the `Pipeline
3768

3869
## Usage
3970

40-
Operations in a pipeline (stages) can accept anything from the pipeline that satisfies the `callable` type-hint. So closures and anything thats invokable will work.
71+
Operations in a pipeline (stages) can accept anything from the pipeline that satisfies the `callable` type-hint. So closures and anything that's invokable will work.
4172

4273
```php
4374
$pipeline = (new Pipeline)->pipe(static function ($traveler) {
@@ -120,68 +151,60 @@ $pipeline = $pipelineBuilder->build();
120151

121152
## Processors
122153

123-
When stages are piped through a pipeline, they are done so using a processor, which is responsible for iterating through each stage and piping it into the owning pipeline. There are three available processors:
154+
**This is where Rockett\Pipeline extends League's package** – when stages are piped through a pipeline, they are done so using a processor, which is responsible for iterating through each stage and piping it into the owning pipeline. There are four available processors:
124155

125156
* `FingersCrossedProcessor` (this is the default)
126-
* `InterruptibleProcessor`
127-
* `TapProcessor`
157+
* `InterruptibleProcessor` – Exit pipelines early based on conditions
158+
* `TapProcessor` – Execute callbacks before/after each stage (requires at least one callback)
159+
* `InterruptibleTapProcessor` – Combines both interruption and tapping (requires at least one tap callback)
128160

129-
It goes without saying that the default processor only iterates and pipes stages. It does nothing else, and there is no way to exit the pipeline without throwing an exception.
161+
The default processor only iterates and pipes stages. It does nothing else, and there is no way to exit the pipeline without throwing an exception.
130162

131163
### Exiting pipelines early
132164

133-
The `InterruptibleProcessor`, on the other hand, provides a mechanism that allows you to exit the pipeline early, if so required. This is done by way of a `callable` that is invoked at every stage as a condition to continuing the pipeline:
165+
The `InterruptibleProcessor` provides a mechanism that allows you to exit the pipeline early, if so required. This is done by way of a `callable` that is invoked at every stage as a condition to continuing the pipeline:
134166

135167
```php
136168
use Rockett\Pipeline\Processors\InterruptibleProcessor;
137169

138170
$processor = new InterruptibleProcessor(
139-
static fn ($traveler) => $traveler->somethingIsntRight()
171+
fn($traveler) => $traveler->hasError()
140172
);
141173

142174
$pipeline = (new Pipeline($processor))
143-
->pipe(new SafeStage)
144-
->pipe(new UnsafeStage)
145-
->pipe(new AnotherSafeStage);
175+
->pipe(new ValidateInput)
176+
->pipe(new ProcessData)
177+
->pipe(new SaveToDatabase);
146178

147-
$output = $pipeline->process($traveler);
179+
$output = $pipeline->process($request);
148180
```
149181

150-
In this example, the callable passed to the processor will check to see if something isn’t right and, if so, it will return `true`, causing the processor exit the pipeline and return the traveler as the output.
182+
In this example, the callable will check if the traveler has an error and, if so, it will return `true`, causing the processor to exit the pipeline early and return the current traveler as the output.
151183

152-
You can also use the `continueUnless` helper to instantiate the interruptible processor:
184+
**Helper methods:**
153185

154186
```php
187+
// Exit when condition is true
155188
$processor = InterruptibleProcessor::continueUnless(
156-
static fn ($traveler) => $traveler->somethingIsntRight()
189+
fn($traveler) => $traveler->hasError()
157190
);
158-
```
159191

160-
If you would like to reverse the condition and only continue when the callable returns true, you can use the `continueWhen` helper instead:
161-
162-
```php
192+
// Exit when condition becomes false
163193
$processor = InterruptibleProcessor::continueWhen(
164-
static fn ($traveler) => $traveler->everythingIsFine()
194+
fn($traveler) => $traveler->isValid()
165195
);
166196
```
167197

168198
### Invoking actions on each stage
169199

170-
Using the `TapProcessor`, you can invoke an action before and/or after a stage is piped through a pipeline. This can be useful if you would like to handle common side-effects outside of each stage, such as logging or broadcasting.
171-
172-
The processor takes two callables:
200+
Using the `TapProcessor`, you can invoke an action before and/or after a stage is piped through a pipeline. This is useful for cross-cutting concerns like logging, metrics, or debugging.
173201

174202
```php
175203
use Rockett\Pipeline\Processors\TapProcessor;
176204

177-
// Define and instantiate a $logger and a $broadcaster …
178-
179205
$processor = new TapProcessor(
180-
// $beforeEach, called before a stage is piped
181-
static fn ($traveler) => $logger->info('Traveller passing through pipeline:', $traveler->toArray()),
182-
183-
// $afterEach, called after a stage is piped and the output captured
184-
static fn ($traveler) => $broadcaster->broadcast($users, 'Something happened', $traveler)
206+
beforeCallback: fn($traveler) => $logger->info('Processing:', $traveler->toArray()),
207+
afterCallback: fn($traveler) => $metrics->increment('pipeline.stage.completed')
185208
);
186209

187210
$pipeline = (new Pipeline($processor))
@@ -192,32 +215,46 @@ $pipeline = (new Pipeline($processor))
192215
$output = $pipeline->process($traveler);
193216
```
194217

195-
Both of these callables are **optional**. By excluding both, the processor will act in the exact same way as the default `FingersCrossedProcessor`.
196-
197-
If you would like to pass only one callback, then you can use the helper methods:
218+
**At least one callback is required.** You can also use fluent methods:
198219

199220
```php
200-
$processor = (new TapProcessor)->beforeEach(/** callable **/); // or …
201-
$processor = (new TapProcessor)->afterEach(/** callable **/);
221+
$processor = (new TapProcessor)
222+
->beforeEach(fn($traveler) => $logger->debug('Before:', $traveler))
223+
->afterEach(fn($traveler) => $logger->debug('After:', $traveler));
202224
```
203225

204-
You can also chain them as an alternative to using the constructor:
226+
### Combining interruption and tapping
227+
228+
The `InterruptibleTapProcessor` combines both features:
205229

206230
```php
207-
$processor = (new TapProcessor)
208-
->beforeEach(/** callable **/)
209-
->afterEach(/** callable **/);
210-
```
231+
use Rockett\Pipeline\Processors\InterruptibleTapProcessor;
232+
233+
$processor = new InterruptibleTapProcessor(
234+
interruptCallback: fn($traveler) => $traveler->shouldStop(),
235+
beforeCallback: fn($traveler) => $logger->info('Processing stage'),
236+
afterCallback: fn($traveler) => $metrics->increment('stage.completed')
237+
);
211238

212-
However, it is encouraged that you use [named arguments](https://stitcher.io/blog/php-8-named-arguments):
239+
// Or using static factory methods (tap callbacks required)
240+
$processor = InterruptibleTapProcessor::continueUnless(
241+
fn($traveler) => $traveler->hasError(),
242+
beforeCallback: fn($traveler) => $logger->debug('Before stage')
243+
);
213244

214-
```php
215-
$processor = new TapProcessor(
216-
beforeEach: /** optional callable **/,
217-
afterEach: /** optional callable **/,
218-
)
245+
// Or using fluent interface
246+
$processor = InterruptibleTapProcessor::continueWhen(
247+
fn($traveler) => $traveler->isValid(),
248+
afterCallback: fn($traveler) => $logger->debug('After stage')
249+
)->beforeEach(fn($traveler) => $logger->debug('Before stage'));
219250
```
220251

252+
> [!NOTE]
253+
> This will likely become the default processor in a future release.
254+
255+
> [!TIP]
256+
> The `InterruptibleTapProcessor` is particularly useful for complex pipelines where you need both conditional logic and observability.
257+
221258
## Handling Exceptions
222259

223260
This package is completely transparent when it comes exceptions and other throwables – it will not catch an exception or silence an error.
@@ -236,6 +273,12 @@ try {
236273
}
237274
```
238275

276+
## Testing
277+
278+
```bash
279+
composer test
280+
```
281+
239282
## License
240283

241284
Pipeline is licensed under the permissive [MIT license](license.md).

src/Builder/PipelineBuilder.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public function add(callable $stage): PipelineBuilderContract
2020
return $this;
2121
}
2222

23-
public function build(ProcessorContract $processor = null): PipelineContract
23+
public function build(ProcessorContract|null $processor = null): PipelineContract
2424
{
2525
return new Pipeline($processor, ...$this->stages);
2626
}

src/Builder/PipelineBuilderContract.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
interface PipelineBuilderContract
1111
{
1212
public function add(callable $stage): PipelineBuilderContract;
13-
public function build(ProcessorContract $processor = null): PipelineContract;
13+
public function build(ProcessorContract|null $processor = null): PipelineContract;
1414
}

src/Pipeline.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
*/
1414
class Pipeline implements PipelineContract
1515
{
16-
private ProcessorContract $processor;
17-
1816
/** @var callable[] */
1917
private array $stages;
2018

2119
public function __construct(
22-
?ProcessorContract $processor = null,
20+
private ProcessorContract|null $processor = null,
2321
callable ...$stages
2422
) {
2523
$this->processor = $processor ?? new FingersCrossedProcessor();

0 commit comments

Comments
 (0)