Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit 7201f7e

Browse files
ack messages individually after callable returns successfully
1 parent cdb487d commit 7201f7e

File tree

3 files changed

+24
-23
lines changed

3 files changed

+24
-23
lines changed

changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 1.0.1 - ?
4+
5+
* Ack messages individually after callable returns successfully
6+
7+
* Initial release
8+
39
## 1.0.0 - 2016-09-05
410

511
* Initial release

src/GoogleCloudPubSubAdapter.php

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,31 +44,20 @@ public function subscribe($channel, callable $handler)
4444
$isSubscriptionLoopActive = true;
4545

4646
while ($isSubscriptionLoopActive) {
47-
$ackIds = [];
48-
$payloads = [];
49-
5047
$messages = $subscription->pull();
5148

5249
foreach ($messages as $message) {
53-
$ackIds[] = $message['ackId'];
54-
5550
// the cloud library base64 encodes messages
5651
$payload = base64_decode($message['message']['data']);
5752
$payload = Utils::unserializeMessagePayload($payload);
5853

5954
if ($payload === 'unsubscribe') {
6055
$isSubscriptionLoopActive = false;
6156
} else {
62-
$payloads[] = $payload;
57+
call_user_func($handler, $payload);
6358
}
64-
}
65-
66-
if (!empty($ackIds)) {
67-
$subscription->acknowledgeBatch($ackIds);
68-
}
6959

70-
foreach ($payloads as $payload) {
71-
call_user_func($handler, $payload);
60+
$subscription->acknowledge($message['ackId']);
7261
}
7362
}
7463
}

tests/GoogleCloudPubSubAdapterTest.php

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,17 @@ public function testSubscribeWhenSubscriptionMustBeCreated()
101101
$subscription->shouldReceive('pull')
102102
->once()
103103
->andReturn($messageBatch1);
104+
$subscription->shouldReceive('acknowledge')
105+
->with(1)
106+
->once();
107+
$subscription->shouldReceive('acknowledge')
108+
->with(2)
109+
->once();
104110
$subscription->shouldReceive('pull')
105111
->once()
106112
->andReturn($messageBatch2);
107-
$subscription->shouldReceive('acknowledgeBatch')
108-
->with([1, 2])
109-
->once();
110-
$subscription->shouldReceive('acknowledgeBatch')
111-
->with([3])
113+
$subscription->shouldReceive('acknowledge')
114+
->with(3)
112115
->once();
113116

114117
$topic = Mockery::mock(Topic::class);
@@ -173,14 +176,17 @@ public function testSubscribeWhenSubscriptionExists()
173176
$subscription->shouldReceive('pull')
174177
->once()
175178
->andReturn($messageBatch1);
179+
$subscription->shouldReceive('acknowledge')
180+
->with(1)
181+
->once();
182+
$subscription->shouldReceive('acknowledge')
183+
->with(2)
184+
->once();
176185
$subscription->shouldReceive('pull')
177186
->once()
178187
->andReturn($messageBatch2);
179-
$subscription->shouldReceive('acknowledgeBatch')
180-
->with([1, 2])
181-
->once();
182-
$subscription->shouldReceive('acknowledgeBatch')
183-
->with([3])
188+
$subscription->shouldReceive('acknowledge')
189+
->with(3)
184190
->once();
185191

186192
$topic = Mockery::mock(Topic::class);

0 commit comments

Comments
 (0)