Skip to content

Commit ddaf028

Browse files
committed
adding more stubs and BackpressureQueue impl and test
1 parent 1963d35 commit ddaf028

File tree

11 files changed

+348
-33
lines changed

11 files changed

+348
-33
lines changed

composer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
},
2929
"require-dev": {
3030
"amphp/sync": "^2.3",
31+
"illuminate/cache": "^12.47",
3132
"phpstan/phpstan": "^2.1",
3233
"phpunit/phpunit": "12.5.4",
3334
"react/async": "^4.3",
@@ -38,6 +39,7 @@
3839
"squizlabs/php_codesniffer": "^4.0",
3940
"symfony/framework-bundle": "^8.0",
4041
"symfony/mercure": "^0.7.2",
42+
"symfony/rate-limiter": "^8.0",
4143
"testcontainers/testcontainers": "^1.0"
4244
},
4345
"suggest": {

src/Bridge/Laravel/Seal/LaravelAtomicLockSeal.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
namespace Clegginabox\Airlock\Bridge\Laravel\Seal;
66

7+
/**
8+
* Wraps Illuminate\Cache\Lock
9+
*/
710
class LaravelAtomicLockSeal
811
{
912
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Clegginabox\Airlock\Bridge\Laravel\Seal;
6+
7+
/**
8+
* Wraps Illuminate\Cache\RateLimiter
9+
*/
10+
class LaravelRateLimiterSeal {}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
6+
namespace Clegginabox\Airlock\Bridge\Symfony\Seal;
7+
8+
class SymfonyRateLimiterSeal {}

src/HealthCheckerInterface.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
namespace Clegginabox\Airlock;
4+
5+
interface HealthCheckerInterface
6+
{
7+
/**
8+
* @return float 0.0 (dead) to 1.0 (fully healthy)
9+
*/
10+
public function getScore(): float;
11+
}

src/Queue/BackpressureQueue.php

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Clegginabox\Airlock\Queue;
6+
7+
use Clegginabox\Airlock\HealthCheckerInterface;
8+
9+
/**
10+
* BackpressureQueue - Adaptive admission based on downstream health.
11+
*
12+
* THE CONCEPT:
13+
* ------------
14+
* Normal queues admit at a fixed rate. Backpressure queues adapt based on
15+
* signals from downstream systems. If your database is slow, admit fewer
16+
* people. If it recovers, speed up again.
17+
*
18+
* Think of it like a tap that adjusts water flow based on how fast the sink
19+
* is draining. If the sink is backing up, reduce the flow.
20+
*
21+
* CORE IDEA:
22+
* ----------
23+
* This is a DECORATOR around another queue. It doesn't replace FIFO/Lottery
24+
* logic - it wraps it and adds a "should we even try right now?" check.
25+
*
26+
* BackpressureQueue
27+
* └── wraps RedisFifoQueue (or any other queue)
28+
*
29+
* HEALTH SIGNALS - where does backpressure info come from?
30+
* --------------------------------------------------------
31+
* Option A: External health checker (injected dependency)
32+
* - A service that polls your DB, API, etc. and returns a 0.0-1.0 health score
33+
* - Queue asks "what's the health?" before each peek/pop
34+
*
35+
* Option B: Redis key convention
36+
* - Something else writes to a Redis key like "backpressure:health" (0.0-1.0)
37+
* - Queue reads that key directly
38+
* - Simple, decoupled - a cron job or the app itself updates the key
39+
*
40+
* Option C: Self-measuring (advanced)
41+
* - Track how long admitted users take to release()
42+
* - If latency spikes, reduce admission rate
43+
* - Requires feedback loop from the Seal back to the Queue
44+
*
45+
* WHAT DOES "BACKPRESSURE" ACTUALLY CHANGE?
46+
* -----------------------------------------
47+
* When health is low, you have choices:
48+
*
49+
* 1. THROTTLE PEEK/POP
50+
* - peek() returns null even if queue has people
51+
* - "Sorry, system is busy, try again shortly"
52+
* - Simple but blunt
53+
*
54+
* 2. PROBABILISTIC ADMISSION
55+
* - health = 0.3 means 30% chance peek() returns the real head
56+
* - Randomly skip turns based on health score
57+
* - Smoother degradation
58+
*
59+
* 3. DELAYED ADMISSION
60+
* - peek() returns head only if enough time has passed since last admission
61+
* - Low health = longer delays between admissions
62+
* - Rate limiting based on health
63+
*
64+
*
65+
* HEALTH CHECKER INTERFACE (you'll need this):
66+
* --------------------------------------------
67+
*
68+
* interface HealthCheckerInterface
69+
* {
70+
* /** @return float 0.0 (dead) to 1.0 (fully healthy)
71+
* public function getScore(): float;
72+
* }
73+
*
74+
* // Simple Redis-based implementation:
75+
* class RedisHealthChecker implements HealthCheckerInterface
76+
* {
77+
* public function __construct(private Redis $redis, private string $key) {}
78+
*
79+
* public function getScore(): float
80+
* {
81+
* return (float) ($this->redis->get($this->key) ?? 1.0);
82+
* }
83+
* }
84+
*
85+
* WHO SETS THE HEALTH SCORE?
86+
* --------------------------
87+
* That's outside this class. Could be:
88+
* - A middleware that tracks response times and writes to Redis
89+
* - A cron job that pings your DB and updates the key
90+
* - Your app's exception handler (errors spike = reduce health)
91+
* - An external monitoring tool via webhook
92+
*
93+
* QUESTIONS TO CONSIDER:
94+
* ----------------------
95+
* 1. Should getPosition() reflect backpressure? (Probably not - your position
96+
* in line doesn't change, just how fast the line moves)
97+
*
98+
* 2. What about pop()? Should it also respect backpressure or just peek()?
99+
* (Usually just peek - once you're peeked/selected, you should be admitted)
100+
*
101+
* 3. Should there be a "circuit breaker" mode where health < X completely
102+
* stops admission vs gradual degradation?
103+
*
104+
* 4. How do you prevent oscillation? (Health drops -> fewer users -> health
105+
* recovers -> flood of users -> health drops again)
106+
* Consider: smoothing/averaging, hysteresis, ramp-up delays
107+
*/
108+
class BackpressureQueue implements QueueInterface
109+
{
110+
public function __construct(
111+
private QueueInterface $inner,
112+
private HealthCheckerInterface $healthChecker,
113+
private float $minHealthToAdmit = 0.2,
114+
) {}
115+
116+
// Your implementation here
117+
public function add(string $identifier, int $priority = 0): int
118+
{
119+
return $this->inner->add($identifier, $priority);
120+
}
121+
122+
public function remove(string $identifier): void
123+
{
124+
$this->inner->remove($identifier);
125+
}
126+
127+
public function peek(): ?string
128+
{
129+
$score = $this->healthChecker->getScore();
130+
131+
if ($score < $this->minHealthToAdmit) {
132+
return null;
133+
}
134+
135+
return $this->inner->peek();
136+
}
137+
138+
public function getPosition(string $identifier): ?int
139+
{
140+
return $this->inner->getPosition($identifier);
141+
}
142+
}

src/Seal/WindowRateSeal.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
declare(strict_types=1);
44

5-
65
namespace Clegginabox\Airlock\Seal;
76

7+
/**
8+
* A rate-limiting seal that grants up to N admits per window
9+
*/
810
class WindowRateSeal {}

tests/Factory/RedisFactory.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Clegginabox\Airlock\Tests\Factory;
6+
7+
use Redis;
8+
9+
class RedisFactory
10+
{
11+
public static function create(): Redis
12+
{
13+
$url = $_ENV['REDIS_URL'] ?? throw new \RuntimeException('REDIS_URL not set in bootstrap');
14+
15+
$parsed = parse_url($url);
16+
17+
$redis = new Redis();
18+
$redis->connect(
19+
$parsed['host'] ?? '127.0.0.1',
20+
$parsed['port'] ?? 6379,
21+
);
22+
23+
return $redis;
24+
}
25+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Clegginabox\Airlock\Tests\Integration\Queue;
6+
7+
use Clegginabox\Airlock\HealthCheckerInterface;
8+
use Clegginabox\Airlock\Queue\BackpressureQueue;
9+
use Clegginabox\Airlock\Queue\RedisFifoQueue;
10+
use Clegginabox\Airlock\Tests\Factory\RedisFactory;
11+
use PHPUnit\Framework\TestCase;
12+
use Redis;
13+
14+
class BackpressureQueueTest extends TestCase
15+
{
16+
private Redis $redis;
17+
18+
private RedisFifoQueue $fifoQueue;
19+
20+
protected function setUp(): void
21+
{
22+
parent::setUp();
23+
24+
$this->redis = RedisFactory::create();
25+
$this->redis->flushAll();
26+
27+
$this->fifoQueue = new RedisFifoQueue(
28+
$this->redis,
29+
'test:queue:list',
30+
'test:queue:set'
31+
);
32+
}
33+
34+
public function testItAllowsPeekWhenHealthIsGood(): void
35+
{
36+
$healthCheck = new class implements HealthCheckerInterface {
37+
public function getScore(): float
38+
{
39+
return 1;
40+
}
41+
};
42+
43+
$backpressureQueue = new BackpressureQueue(
44+
$this->fifoQueue,
45+
$healthCheck,
46+
0.5
47+
);
48+
49+
$backpressureQueue->add('user_A');
50+
$this->assertSame('user_A', $backpressureQueue->peek());
51+
}
52+
53+
public function testPeekReturnsNullWhenHealthIsBad(): void
54+
{
55+
$healthCheck = new class implements HealthCheckerInterface {
56+
public function getScore(): float
57+
{
58+
return 0;
59+
}
60+
};
61+
62+
$backpressureQueue = new BackpressureQueue(
63+
$this->fifoQueue,
64+
$healthCheck,
65+
0.5
66+
);
67+
68+
$backpressureQueue->add('user_A');
69+
$this->assertNull($backpressureQueue->peek());
70+
}
71+
72+
/**
73+
* At threshold = allow entry
74+
*/
75+
public function testPeekAtExactThreshold(): void
76+
{
77+
$healthCheck = new class implements HealthCheckerInterface {
78+
public function getScore(): float
79+
{
80+
return 0.5;
81+
}
82+
};
83+
84+
$backpressureQueue = new BackpressureQueue(
85+
$this->fifoQueue,
86+
$healthCheck,
87+
0.5
88+
);
89+
90+
$backpressureQueue->add('user_A');
91+
$this->assertEquals('user_A', $backpressureQueue->peek());
92+
}
93+
94+
public function testAddAndRemoveStillDelegate(): void
95+
{
96+
$healthCheck = new class implements HealthCheckerInterface {
97+
public function getScore(): float
98+
{
99+
return 1;
100+
}
101+
};
102+
103+
$backpressureQueue = new BackpressureQueue(
104+
$this->fifoQueue,
105+
$healthCheck,
106+
0.5
107+
);
108+
109+
$backpressureQueue->add('user_A');
110+
$backpressureQueue->add('user_B');
111+
$backpressureQueue->add('user_C');
112+
113+
$backpressureQueue->remove('user_A');
114+
$this->assertSame('user_B', $backpressureQueue->peek());
115+
}
116+
117+
public function testHealthIsCheckedOnEveryPeek(): void
118+
{
119+
$healthCheck = new class implements HealthCheckerInterface {
120+
public float $score = 1.0;
121+
public function getScore(): float
122+
{
123+
return $this->score;
124+
}
125+
};
126+
127+
$backpressureQueue = new BackpressureQueue(
128+
$this->fifoQueue,
129+
$healthCheck,
130+
0.5
131+
);
132+
133+
$backpressureQueue->add('user_A');
134+
135+
$this->assertSame('user_A', $backpressureQueue->peek());
136+
137+
$healthCheck->score = 0.0;
138+
$this->assertNull($backpressureQueue->peek());
139+
}
140+
}

0 commit comments

Comments
 (0)