Skip to content

Commit d3a441f

Browse files
committed
feat: implements basic recover & exchange unbind
1 parent 990e1df commit d3a441f

File tree

3 files changed

+168
-0
lines changed

3 files changed

+168
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
<?php
2+
3+
use Goopil\RabbitRs\PhpClient;
4+
use Goopil\RabbitRs\AmqpMessage;
5+
6+
// ---------------------------------------------------------------------------------------------
7+
// EXCHANGE UNBIND TESTS
8+
// ---------------------------------------------------------------------------------------------
9+
10+
test('exchangeUnbind stops routing between exchanges', function () {
11+
$c = mq_client(); $ch = $c->openChannel();
12+
13+
$src = 'ex.src.ub.' . bin2hex(random_bytes(3));
14+
$dst = 'ex.dst.ub.' . bin2hex(random_bytes(3));
15+
$q = 'q.x2x.ub.' . bin2hex(random_bytes(3));
16+
17+
expect($ch->exchangeDeclare($src, 'direct'))->toBeTrue();
18+
expect($ch->exchangeDeclare($dst, 'fanout'))->toBeTrue();
19+
expect($ch->queueDeclare($q, ['exclusive' => true, 'auto_delete' => true]))->toBeTrue();
20+
21+
expect($ch->queueBind($q, $dst, ''))->toBeTrue();
22+
expect($ch->exchangeBind($dst, $src, 'rk'))->toBeTrue();
23+
24+
// sanity: routed once
25+
expect($ch->basicPublish($src, 'rk', new AmqpMessage('before-unbind')))->toBeTrue();
26+
$d1 = $ch->basicGet($q); expect($d1?->getBody())->toBe('before-unbind');
27+
28+
// unbind breaks the route
29+
expect($ch->exchangeUnbind($dst, $src, 'rk'))->toBeTrue();
30+
31+
expect($ch->basicPublish($src, 'rk', new AmqpMessage('after-unbind')))->toBeTrue();
32+
$d2 = $ch->basicGet($q); expect($d2)->toBeNull();
33+
34+
$ch->close(); $c->close();
35+
});
36+
37+
test('exchangeUnbind with arguments works correctly', function () {
38+
$c = mq_client(); $ch = $c->openChannel();
39+
40+
$src = 'ex.src.args.' . bin2hex(random_bytes(3));
41+
$dst = 'ex.dst.args.' . bin2hex(random_bytes(3));
42+
$q = 'q.x2x.args.' . bin2hex(random_bytes(3));
43+
44+
expect($ch->exchangeDeclare($src, 'topic'))->toBeTrue();
45+
expect($ch->exchangeDeclare($dst, 'fanout'))->toBeTrue();
46+
expect($ch->queueDeclare($q, ['exclusive' => true, 'auto_delete' => true]))->toBeTrue();
47+
48+
expect($ch->queueBind($q, $dst, ''))->toBeTrue();
49+
50+
// Bind with arguments
51+
expect($ch->exchangeBind($dst, $src, 'test.key', [
52+
'arguments' => ['test-arg' => 'value']
53+
]))->toBeTrue();
54+
55+
// Unbind with same arguments
56+
expect($ch->exchangeUnbind($dst, $src, 'test.key', [
57+
'arguments' => ['test-arg' => 'value']
58+
]))->toBeTrue();
59+
60+
$ch->close(); $c->close();
61+
});
62+
63+
// ---------------------------------------------------------------------------------------------
64+
// BASIC RECOVER TESTS
65+
// ---------------------------------------------------------------------------------------------
66+
67+
test('basicRecover requeues unacked messages', function () {
68+
$c = mq_client(); $ch = $c->openChannel();
69+
70+
$q = 'q.recover.' . bin2hex(random_bytes(3));
71+
expect($ch->queueDeclare($q, ['exclusive' => true, 'auto_delete' => true]))->toBeTrue();
72+
73+
// Publish a few messages
74+
for ($i = 1; $i <= 3; $i++) {
75+
expect($ch->basicPublish('', $q, new AmqpMessage("message-$i")))->toBeTrue();
76+
}
77+
78+
// Get messages without acking them
79+
$deliveries = [];
80+
for ($i = 1; $i <= 3; $i++) {
81+
$delivery = $ch->basicGet($q, ['no_ack' => false]);
82+
expect($delivery)->toBeInstanceOf(\Goopil\RabbitRs\AmqpDelivery::class);
83+
$deliveries[] = $delivery;
84+
}
85+
86+
// Queue should be empty now
87+
$empty = $ch->basicGet($q, ['no_ack' => false]);
88+
expect($empty)->toBeNull();
89+
90+
// Call basic.recover with requeue=true
91+
expect($ch->basicRecover(true))->toBeTrue();
92+
93+
// Messages should be available again
94+
for ($i = 1; $i <= 3; $i++) {
95+
$delivery = $ch->basicGet($q, ['no_ack' => false]);
96+
expect($delivery)->toBeInstanceOf(\Goopil\RabbitRs\AmqpDelivery::class);
97+
expect($delivery->getBody())->toBe("message-$i");
98+
}
99+
100+
$ch->close(); $c->close();
101+
});
102+
103+
test('basicRecover with requeue=false throws NOT_IMPLEMENTED on RabbitMQ', function () {
104+
$c = mq_client(); $ch = $c->openChannel();
105+
106+
$q = 'q.recover.no.requeue.' . bin2hex(random_bytes(3));
107+
expect($ch->queueDeclare($q, ['exclusive' => true, 'auto_delete' => true]))->toBeTrue();
108+
109+
// Publish a message
110+
expect($ch->basicPublish('', $q, new AmqpMessage('test-message')))->toBeTrue();
111+
112+
// Get message without acking it
113+
$delivery = $ch->basicGet($q, ['no_ack' => false]);
114+
expect($delivery)->toBeInstanceOf(\Goopil\RabbitRs\AmqpDelivery::class);
115+
116+
// Queue should be empty now
117+
$empty = $ch->basicGet($q, ['no_ack' => false]);
118+
expect($empty)->toBeNull();
119+
120+
// Call basic.recover with requeue=false - should throw NOT_IMPLEMENTED on RabbitMQ
121+
expect(fn() => $ch->basicRecover(false))->toThrow(Exception::class);
122+
123+
// Channel is closed by broker after NOT_IMPLEMENTED; tolerate close()
124+
try { $ch->close(); } catch (Exception $e) { /* already closed by broker */ }
125+
$c->close();
126+
});
127+
128+
test('basicRecover works with default requeue parameter', function () {
129+
$c = mq_client(); $ch = $c->openChannel();
130+
131+
$q = 'q.recover.default.' . bin2hex(random_bytes(3));
132+
expect($ch->queueDeclare($q, ['exclusive' => true, 'auto_delete' => true]))->toBeTrue();
133+
134+
// Publish a message
135+
expect($ch->basicPublish('', $q, new AmqpMessage('test-message')))->toBeTrue();
136+
137+
// Get message without acking it
138+
$delivery = $ch->basicGet($q, ['no_ack' => false]);
139+
expect($delivery)->toBeInstanceOf(\Goopil\RabbitRs\AmqpDelivery::class);
140+
141+
// Call basic.recover without parameters (should default to requeue=true)
142+
expect($ch->basicRecover())->toBeTrue();
143+
144+
// Message should be available again
145+
$delivery = $ch->basicGet($q, ['no_ack' => false]);
146+
expect($delivery)->toBeInstanceOf(\Goopil\RabbitRs\AmqpDelivery::class);
147+
expect($delivery->getBody())->toBe('test-message');
148+
149+
$ch->close(); $c->close();
150+
});

src/core/channels/channel.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,16 @@ impl AmqpChannel {
393393
})
394394
}
395395

396+
pub fn basic_recover(&self, requeue: bool) -> Result<()> {
397+
self.ensure_open()?;
398+
let ch = self.clone_channel();
399+
400+
RUNTIME.block_on(async move {
401+
ch.basic_recover(lapin::options::BasicRecoverOptions { requeue }).await?;
402+
Ok(())
403+
})
404+
}
405+
396406
pub fn close(&self) -> Result<()> {
397407
if self.closed.swap(true, Ordering::SeqCst) {
398408
// Already closed; be idempotent

src/php/channel.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,14 @@ impl PhpChannel {
281281
Ok(true)
282282
}
283283

284+
pub fn basic_recover(&self, requeue: Option<bool>) -> PhpResult<bool> {
285+
let requeue = requeue.unwrap_or(true);
286+
self.inner
287+
.basic_recover(requeue)
288+
.map_err(|e| PhpException::default(php_safe(format!("basic_recover failed: {e}"))))?;
289+
Ok(true)
290+
}
291+
284292
pub fn close(&self) -> PhpResult<bool> {
285293
// If the broker already closed the channel with an error (e.g., NOT_FOUND),
286294
// decide whether to surface it or tolerate close() depending on the op.

0 commit comments

Comments
 (0)