Skip to content

Commit 990e1df

Browse files
committed
feat: implements tx
1 parent 1cc4f8a commit 990e1df

File tree

3 files changed

+246
-0
lines changed

3 files changed

+246
-0
lines changed
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
<?php
2+
3+
use Goopil\RabbitRs\PhpClient;
4+
use Goopil\RabbitRs\AmqpMessage;
5+
6+
// ---------------------------------------------------------------------------------------------
7+
// TRANSACTION TESTS
8+
// ---------------------------------------------------------------------------------------------
9+
10+
test('PhpChannel transaction methods work correctly', function () {
11+
// Description: Tests that PhpChannel transaction methods work correctly.
12+
$c = mq_client();
13+
$ch = $c->openChannel();
14+
15+
// Test tx_select
16+
expect($ch->txSelect())->toBeTrue();
17+
18+
// Test tx_commit
19+
expect($ch->txCommit())->toBeTrue();
20+
21+
// Test tx_rollback
22+
expect($ch->txSelect())->toBeTrue();
23+
expect($ch->txRollback())->toBeTrue();
24+
25+
$ch->close();
26+
$c->close();
27+
});
28+
29+
test('PhpChannel transaction with publish and rollback', function () {
30+
// Description: Tests that messages published in a transaction can be rolled back.
31+
$c = mq_client();
32+
$ch = $c->openChannel();
33+
34+
// Declare a temporary queue for testing
35+
$queue = 'test_transaction_queue_' . uniqid();
36+
$ch->queueDeclare($queue, ['exclusive' => true]);
37+
38+
// First, publish a message without transaction to establish baseline
39+
$message1 = new AmqpMessage('normal_message');
40+
expect($ch->basicPublish('', $queue, $message1))->toBeTrue();
41+
42+
// Give some time for the message to be processed
43+
usleep(100000); // 100ms
44+
45+
// Verify the normal message is delivered
46+
$delivery1 = $ch->basicGet($queue, ['no_ack' => true]);
47+
expect($delivery1)->not->toBeNull();
48+
expect($delivery1->getBody())->toBe('normal_message');
49+
50+
// Start transaction
51+
expect($ch->txSelect())->toBeTrue();
52+
53+
// Publish a message within the transaction
54+
$message2 = new AmqpMessage('transaction_test_message');
55+
expect($ch->basicPublish('', $queue, $message2))->toBeTrue();
56+
57+
// Rollback the transaction
58+
expect($ch->txRollback())->toBeTrue();
59+
60+
// Verify that no additional message was delivered (queue should be empty)
61+
$delivery2 = $ch->basicGet($queue, ['no_ack' => true]);
62+
expect($delivery2)->toBeNull();
63+
64+
$ch->close();
65+
$c->close();
66+
});
67+
68+
test('PhpChannel transaction with publish and commit', function () {
69+
// Description: Tests that messages published in a transaction are delivered after commit.
70+
$c = mq_client();
71+
$ch = $c->openChannel();
72+
73+
// Declare a temporary queue for testing
74+
$queue = 'test_transaction_queue_' . uniqid();
75+
$ch->queueDeclare($queue, ['exclusive' => true]);
76+
77+
// First, verify queue is empty
78+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
79+
expect($delivery)->toBeNull();
80+
81+
// Start transaction
82+
expect($ch->txSelect())->toBeTrue();
83+
84+
// Publish a message within the transaction
85+
$message = new AmqpMessage('transaction_test_message');
86+
expect($ch->basicPublish('', $queue, $message))->toBeTrue();
87+
88+
// Verify message is not yet available (transaction not committed)
89+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
90+
expect($delivery)->toBeNull();
91+
92+
// Commit the transaction
93+
expect($ch->txCommit())->toBeTrue();
94+
95+
// Give some time for the message to be processed
96+
usleep(100000); // 100ms
97+
98+
// Verify that the message was delivered (queue should contain the message)
99+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
100+
expect($delivery)->not->toBeNull();
101+
expect($delivery->getBody())->toBe('transaction_test_message');
102+
103+
$ch->close();
104+
$c->close();
105+
});
106+
107+
test('PhpChannel transaction with multiple messages and rollback', function () {
108+
// Description: Tests that multiple messages published in a transaction can be rolled back.
109+
$c = mq_client();
110+
$ch = $c->openChannel();
111+
112+
// Declare a temporary queue for testing
113+
$queue = 'test_transaction_queue_' . uniqid();
114+
$ch->queueDeclare($queue, ['exclusive' => true]);
115+
116+
// First, verify queue is empty
117+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
118+
expect($delivery)->toBeNull();
119+
120+
// Start transaction
121+
expect($ch->txSelect())->toBeTrue();
122+
123+
// Publish multiple messages within the transaction
124+
for ($i = 0; $i < 5; $i++) {
125+
$message = new AmqpMessage("transaction_message_$i");
126+
expect($ch->basicPublish('', $queue, $message))->toBeTrue();
127+
}
128+
129+
// Verify messages are not yet available (transaction not committed)
130+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
131+
expect($delivery)->toBeNull();
132+
133+
// Rollback the transaction
134+
expect($ch->txRollback())->toBeTrue();
135+
136+
// Verify that no messages were delivered (queue should still be empty)
137+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
138+
expect($delivery)->toBeNull();
139+
140+
$ch->close();
141+
$c->close();
142+
});
143+
144+
test('PhpChannel transaction with multiple messages and commit', function () {
145+
// Description: Tests that multiple messages published in a transaction are delivered after commit.
146+
$c = mq_client();
147+
$ch = $c->openChannel();
148+
149+
// Declare a temporary queue for testing
150+
$queue = 'test_transaction_queue_' . uniqid();
151+
$ch->queueDeclare($queue, ['exclusive' => true]);
152+
153+
// First, verify queue is empty
154+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
155+
expect($delivery)->toBeNull();
156+
157+
// Start transaction
158+
expect($ch->txSelect())->toBeTrue();
159+
160+
// Publish multiple messages within the transaction
161+
$messages = [];
162+
for ($i = 0; $i < 3; $i++) {
163+
$message = new AmqpMessage("transaction_message_$i");
164+
$messages[] = "transaction_message_$i";
165+
expect($ch->basicPublish('', $queue, $message))->toBeTrue();
166+
}
167+
168+
// Verify messages are not yet available (transaction not committed)
169+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
170+
expect($delivery)->toBeNull();
171+
172+
// Commit the transaction
173+
expect($ch->txCommit())->toBeTrue();
174+
175+
// Give some time for the messages to be processed
176+
usleep(200000); // 200ms
177+
178+
// Verify that all messages were delivered
179+
$receivedMessages = [];
180+
for ($i = 0; $i < 3; $i++) {
181+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
182+
expect($delivery)->not->toBeNull();
183+
$receivedMessages[] = $delivery->getBody();
184+
}
185+
186+
// Verify we received all messages
187+
expect(count($receivedMessages))->toBe(3);
188+
189+
// Verify queue is now empty
190+
$delivery = $ch->basicGet($queue, ['no_ack' => true]);
191+
expect($delivery)->toBeNull();
192+
193+
$ch->close();
194+
$c->close();
195+
});

src/core/channels/channel.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,36 @@ impl AmqpChannel {
363363
res
364364
}
365365

366+
pub fn tx_select(&self) -> Result<()> {
367+
self.ensure_open()?;
368+
let ch = self.clone_channel();
369+
370+
RUNTIME.block_on(async move {
371+
ch.tx_select().await?;
372+
Ok(())
373+
})
374+
}
375+
376+
pub fn tx_commit(&self) -> Result<()> {
377+
self.ensure_open()?;
378+
let ch = self.clone_channel();
379+
380+
RUNTIME.block_on(async move {
381+
ch.tx_commit().await?;
382+
Ok(())
383+
})
384+
}
385+
386+
pub fn tx_rollback(&self) -> Result<()> {
387+
self.ensure_open()?;
388+
let ch = self.clone_channel();
389+
390+
RUNTIME.block_on(async move {
391+
ch.tx_rollback().await?;
392+
Ok(())
393+
})
394+
}
395+
366396
pub fn close(&self) -> Result<()> {
367397
if self.closed.swap(true, Ordering::SeqCst) {
368398
// Already closed; be idempotent

src/php/channel.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,27 @@ impl PhpChannel {
260260
Ok(true)
261261
}
262262

263+
pub fn tx_select(&self) -> PhpResult<bool> {
264+
self.inner
265+
.tx_select()
266+
.map_err(|e| PhpException::default(php_safe(format!("tx_select failed: {e}"))))?;
267+
Ok(true)
268+
}
269+
270+
pub fn tx_commit(&self) -> PhpResult<bool> {
271+
self.inner
272+
.tx_commit()
273+
.map_err(|e| PhpException::default(php_safe(format!("tx_commit failed: {e}"))))?;
274+
Ok(true)
275+
}
276+
277+
pub fn tx_rollback(&self) -> PhpResult<bool> {
278+
self.inner
279+
.tx_rollback()
280+
.map_err(|e| PhpException::default(php_safe(format!("tx_rollback failed: {e}"))))?;
281+
Ok(true)
282+
}
283+
263284
pub fn close(&self) -> PhpResult<bool> {
264285
// If the broker already closed the channel with an error (e.g., NOT_FOUND),
265286
// decide whether to surface it or tolerate close() depending on the op.

0 commit comments

Comments
 (0)