Skip to content

Commit eef6c50

Browse files
committed
add further constraints to interruptible tap processor; update tests and readme
1 parent f3637e4 commit eef6c50

File tree

4 files changed

+179
-86
lines changed

4 files changed

+179
-86
lines changed

changelog.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@
22

33
## 4.0.0 - 2025-08-18
44

5+
### Added
6+
7+
* `InterruptipleTapProcessor`, which implements both `InterruptipleProcessor` and `TapProcessor`
8+
* Added generic typing information across the package
9+
510
### Changed
611

712
* **Minimum PHP version is now 8.3**
8-
* Removed Orchestra Testbench in favor of PestPHP
13+
* Test framework is now Pest
914
* Updated `nunomaduro/collision` to ^8.0
10-
* Added generic typing information across the package
11-
* Updated tests to be a bit more exhaustive
12-
* Adds an `InterruptipleTapProcessor` that implements both `InterruptipleProcessor` and `TapProcessor`, with the exception that before and after callbacks are optional. I realise this means `InterruptipleProcessor` can be made redundant, but the idea is to simplify this package drastically in v5 (likely to use a single processor that does it all).
15+
* Tests are now a bit more exhaustive
16+
* `TapProcessor` now requires at least one callback when instantiated.
1317

1418
## 3.0.0 - 2023-02-07
1519

readme.md

Lines changed: 91 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,44 @@
1-
<!-- exclude-from-website: -->
21
# Rockett\Pipeline
32

43
![GitHub License](https://img.shields.io/github/license/mikerockett/pipeline?style=for-the-badge)
54
![Packagist Version](https://img.shields.io/packagist/v/rockett/pipeline?label=Release&style=for-the-badge)
65
![Packagist Downloads](https://img.shields.io/packagist/dm/rockett/pipeline?label=Installs&style=for-the-badge)
76
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/mikerockett/pipeline/test.yml?label=Tests&style=for-the-badge)
8-
<!-- /exclude-from-website -->
97

10-
Built atop [League’s excellent package](https://github.com/thephpleague/pipeline), `Rockett\Pipeline` provides an implementation of the [pipeline pattern](https://en.wikipedia.org/wiki/Pipeline_(software)).
8+
Built atop [League's excellent package](https://github.com/thephpleague/pipeline), `Rockett\Pipeline` provides an implementation of the [pipeline pattern](https://en.wikipedia.org/wiki/Pipeline_(software)) with additional processors for **conditional interruption** and **stage tapping**.
9+
10+
## Requirements
11+
12+
- PHP 8.3+
13+
14+
## Installation
15+
16+
```bash
17+
composer require rockett/pipeline
18+
```
19+
20+
## Quick Start
21+
22+
```php
23+
use Rockett\Pipeline\Pipeline;
24+
25+
$pipeline = (new Pipeline)
26+
->pipe(fn($x) => $x * 2)
27+
->pipe(fn($x) => $x + 1);
28+
29+
echo $pipeline->process(10); // Outputs: 21
30+
```
31+
32+
## Table of Contents
33+
34+
- [Pipeline Pattern](#pipeline-pattern)
35+
- [Immutability](#immutability)
36+
- [Usage](#usage)
37+
- [Class-based stages](#class-based-stages)
38+
- [Re-usability](#re-usability)
39+
- [Pipeline Builders](#pipeline-builders)
40+
- [Processors](#processors)
41+
- [Handling Exceptions](#handling-exceptions)
1142

1243
## Pipeline Pattern
1344

@@ -37,7 +68,7 @@ Pipelines are implemented as immutable stage-chains, contracted by the `Pipeline
3768

3869
## Usage
3970

40-
Operations in a pipeline (stages) can accept anything from the pipeline that satisfies the `callable` type-hint. So closures and anything thats invokable will work.
71+
Operations in a pipeline (stages) can accept anything from the pipeline that satisfies the `callable` type-hint. So closures and anything that's invokable will work.
4172

4273
```php
4374
$pipeline = (new Pipeline)->pipe(static function ($traveler) {
@@ -120,68 +151,60 @@ $pipeline = $pipelineBuilder->build();
120151

121152
## Processors
122153

123-
When stages are piped through a pipeline, they are done so using a processor, which is responsible for iterating through each stage and piping it into the owning pipeline. There are three available processors:
154+
**This is where Rockett\Pipeline extends League's package** – when stages are piped through a pipeline, they are done so using a processor, which is responsible for iterating through each stage and piping it into the owning pipeline. There are four available processors:
124155

125156
* `FingersCrossedProcessor` (this is the default)
126-
* `InterruptibleProcessor`
127-
* `TapProcessor`
157+
* `InterruptibleProcessor` – Exit pipelines early based on conditions
158+
* `TapProcessor` – Execute callbacks before/after each stage (requires at least one callback)
159+
* `InterruptibleTapProcessor` – Combines both interruption and tapping (requires at least one tap callback)
128160

129-
It goes without saying that the default processor only iterates and pipes stages. It does nothing else, and there is no way to exit the pipeline without throwing an exception.
161+
The default processor only iterates and pipes stages. It does nothing else, and there is no way to exit the pipeline without throwing an exception.
130162

131163
### Exiting pipelines early
132164

133-
The `InterruptibleProcessor`, on the other hand, provides a mechanism that allows you to exit the pipeline early, if so required. This is done by way of a `callable` that is invoked at every stage as a condition to continuing the pipeline:
165+
The `InterruptibleProcessor` provides a mechanism that allows you to exit the pipeline early, if so required. This is done by way of a `callable` that is invoked at every stage as a condition to continuing the pipeline:
134166

135167
```php
136168
use Rockett\Pipeline\Processors\InterruptibleProcessor;
137169

138170
$processor = new InterruptibleProcessor(
139-
static fn ($traveler) => $traveler->somethingIsntRight()
171+
fn($traveler) => $traveler->hasError()
140172
);
141173

142174
$pipeline = (new Pipeline($processor))
143-
->pipe(new SafeStage)
144-
->pipe(new UnsafeStage)
145-
->pipe(new AnotherSafeStage);
175+
->pipe(new ValidateInput)
176+
->pipe(new ProcessData)
177+
->pipe(new SaveToDatabase);
146178

147-
$output = $pipeline->process($traveler);
179+
$output = $pipeline->process($request);
148180
```
149181

150-
In this example, the callable passed to the processor will check to see if something isn’t right and, if so, it will return `true`, causing the processor exit the pipeline and return the traveler as the output.
182+
In this example, the callable will check if the traveler has an error and, if so, it will return `true`, causing the processor to exit the pipeline early and return the current traveler as the output.
151183

152-
You can also use the `continueUnless` helper to instantiate the interruptible processor:
184+
**Helper methods:**
153185

154186
```php
187+
// Exit when condition is true
155188
$processor = InterruptibleProcessor::continueUnless(
156-
static fn ($traveler) => $traveler->somethingIsntRight()
189+
fn($traveler) => $traveler->hasError()
157190
);
158-
```
159191

160-
If you would like to reverse the condition and only continue when the callable returns true, you can use the `continueWhen` helper instead:
161-
162-
```php
192+
// Exit when condition becomes false
163193
$processor = InterruptibleProcessor::continueWhen(
164-
static fn ($traveler) => $traveler->everythingIsFine()
194+
fn($traveler) => $traveler->isValid()
165195
);
166196
```
167197

168198
### Invoking actions on each stage
169199

170-
Using the `TapProcessor`, you can invoke an action before and/or after a stage is piped through a pipeline. This can be useful if you would like to handle common side-effects outside of each stage, such as logging or broadcasting.
171-
172-
The processor takes two callables:
200+
Using the `TapProcessor`, you can invoke an action before and/or after a stage is piped through a pipeline. This is useful for cross-cutting concerns like logging, metrics, or debugging.
173201

174202
```php
175203
use Rockett\Pipeline\Processors\TapProcessor;
176204

177-
// Define and instantiate a $logger and a $broadcaster …
178-
179205
$processor = new TapProcessor(
180-
// $beforeEach, called before a stage is piped
181-
static fn ($traveler) => $logger->info('Traveller passing through pipeline:', $traveler->toArray()),
182-
183-
// $afterEach, called after a stage is piped and the output captured
184-
static fn ($traveler) => $broadcaster->broadcast($users, 'Something happened', $traveler)
206+
beforeCallback: fn($traveler) => $logger->info('Processing:', $traveler->toArray()),
207+
afterCallback: fn($traveler) => $metrics->increment('pipeline.stage.completed')
185208
);
186209

187210
$pipeline = (new Pipeline($processor))
@@ -192,32 +215,46 @@ $pipeline = (new Pipeline($processor))
192215
$output = $pipeline->process($traveler);
193216
```
194217

195-
Both of these callables are **optional**. By excluding both, the processor will act in the exact same way as the default `FingersCrossedProcessor`.
196-
197-
If you would like to pass only one callback, then you can use the helper methods:
218+
**At least one callback is required.** You can also use fluent methods:
198219

199220
```php
200-
$processor = (new TapProcessor)->beforeEach(/** callable **/); // or …
201-
$processor = (new TapProcessor)->afterEach(/** callable **/);
221+
$processor = (new TapProcessor)
222+
->beforeEach(fn($traveler) => $logger->debug('Before:', $traveler))
223+
->afterEach(fn($traveler) => $logger->debug('After:', $traveler));
202224
```
203225

204-
You can also chain them as an alternative to using the constructor:
226+
### Combining interruption and tapping
227+
228+
The `InterruptibleTapProcessor` combines both features:
205229

206230
```php
207-
$processor = (new TapProcessor)
208-
->beforeEach(/** callable **/)
209-
->afterEach(/** callable **/);
210-
```
231+
use Rockett\Pipeline\Processors\InterruptibleTapProcessor;
232+
233+
$processor = new InterruptibleTapProcessor(
234+
interruptCallback: fn($traveler) => $traveler->shouldStop(),
235+
beforeCallback: fn($traveler) => $logger->info('Processing stage'),
236+
afterCallback: fn($traveler) => $metrics->increment('stage.completed')
237+
);
211238

212-
However, it is encouraged that you use [named arguments](https://stitcher.io/blog/php-8-named-arguments):
239+
// Or using static factory methods (tap callbacks required)
240+
$processor = InterruptibleTapProcessor::continueUnless(
241+
fn($traveler) => $traveler->hasError(),
242+
beforeCallback: fn($traveler) => $logger->debug('Before stage')
243+
);
213244

214-
```php
215-
$processor = new TapProcessor(
216-
beforeEach: /** optional callable **/,
217-
afterEach: /** optional callable **/,
218-
)
245+
// Or using fluent interface
246+
$processor = InterruptibleTapProcessor::continueWhen(
247+
fn($traveler) => $traveler->isValid(),
248+
afterCallback: fn($traveler) => $logger->debug('After stage')
249+
)->beforeEach(fn($traveler) => $logger->debug('Before stage'));
219250
```
220251

252+
> [!NOTE]
253+
> This will likely become the default processor in a future release.
254+
255+
> [!TIP]
256+
> The `InterruptibleTapProcessor` is particularly useful for complex pipelines where you need both conditional logic and observability.
257+
221258
## Handling Exceptions
222259

223260
This package is completely transparent when it comes exceptions and other throwables – it will not catch an exception or silence an error.
@@ -236,6 +273,12 @@ try {
236273
}
237274
```
238275

276+
## Testing
277+
278+
```bash
279+
composer test
280+
```
281+
239282
## License
240283

241284
Pipeline is licensed under the permissive [MIT license](license.md).

src/Processors/InterruptibleTapProcessor.php

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ public function __construct(
2828
throw new InvalidArgumentException('$callback must be callable');
2929
}
3030

31+
if ($beforeCallback === null && $afterCallback === null) {
32+
throw new InvalidArgumentException(
33+
'At least one of $beforeCallback and $afterCallback must be provided'
34+
);
35+
}
36+
3137
if ($beforeCallback && !is_callable($beforeCallback)) {
3238
throw new InvalidArgumentException('$beforeCallback must be callable');
3339
}
@@ -37,14 +43,28 @@ public function __construct(
3743
}
3844
}
3945

40-
public static function continueUnless(callable $callback): self
41-
{
42-
return new static($callback);
46+
public static function continueUnless(
47+
callable $callback,
48+
callable|null $beforeCallback = null,
49+
callable|null $afterCallback = null
50+
): self {
51+
return new static(
52+
$callback,
53+
$beforeCallback,
54+
$afterCallback
55+
);
4356
}
4457

45-
public static function continueWhen(callable $callback): self
46-
{
47-
return (new static($callback))->withInversedConditioner();
58+
public static function continueWhen(
59+
callable $callback,
60+
callable|null $beforeCallback = null,
61+
callable|null $afterCallback = null
62+
): self {
63+
return (new static(
64+
$callback,
65+
$beforeCallback,
66+
$afterCallback
67+
))->withInversedConditioner();
4868
}
4969

5070
public function beforeEach(callable $callback): self

0 commit comments

Comments
 (0)