Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit 89dedbe

Browse files
add support for background batching messages (#5)
1 parent 13272fa commit 89dedbe

File tree

6 files changed

+243
-23
lines changed

6 files changed

+243
-23
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ php:
44
- 5.6
55
- 7.0
66
- 7.1
7-
- hhvm
87
- nightly
98

109
before_script:

README.md

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,6 @@ A Google Cloud adapter for the [php-pubsub](https://github.com/Superbalist/php-p
1616
composer require superbalist/php-pubsub-google-cloud
1717
```
1818

19-
## gRPC Support
20-
21-
Google Cloud PHP v0.12.0 added support for communication over the gRPC protocol.
22-
23-
> gRPC is great for high-performance, low-latency applications, and is highly recommended in cases where performance and latency are concerns.
24-
25-
The library will automatically choose gRPC over REST if all dependencies are installed.
26-
* [gRPC PECL extension](https://pecl.php.net/package/gRPC)
27-
* [google/proto-client-php composer package](https://github.com/googleapis/gax-php)
28-
* [googleapis/proto-client-php composer package](https://github.com/googleapis/proto-client-php)
29-
30-
```bash
31-
pecl install grpc
32-
33-
composer require google/gax
34-
composer require google/proto-client-php ^0.10.0
35-
```
36-
3719
## Usage
3820

3921
```php
@@ -66,6 +48,44 @@ $adapter->publish('my_channel', 1);
6648
$adapter->publish('my_channel', false);
6749
```
6850

51+
## gRPC Support
52+
53+
Google Cloud PHP v0.12.0 added support for communication over the gRPC protocol.
54+
55+
> gRPC is great for high-performance, low-latency applications, and is highly recommended in cases where performance and latency are concerns.
56+
57+
The library will automatically choose gRPC over REST if all dependencies are installed.
58+
* [gRPC PECL extension](https://pecl.php.net/package/gRPC)
59+
* [google/proto-client-php composer package](https://github.com/googleapis/gax-php)
60+
* [googleapis/proto-client-php composer package](https://github.com/googleapis/proto-client-php)
61+
62+
```bash
63+
pecl install grpc
64+
65+
composer require google/gax
66+
composer require google/proto-client
67+
```
68+
69+
## Background Batch Message Support
70+
71+
Google Cloud v0.33.0 added support for queueing messages and publishing in the background. This is available in
72+
version 5+ of this package which requires a min version of google/cloud ^0.33.0.
73+
74+
You can enable background batch messaging by setting `$backgroundBatching` to `true` when constructing the
75+
`GoogleCloudPubSubAdapter` or by calling `setBackgroundBatching(true)` on an existing adapter.
76+
77+
If the [semaphore](http://php.net/manual/en/book.sem.php) and [pcntl](http://php.net/manual/en/book.pcntl.php) PHP extensions are
78+
enabled AND the `IS_BATCH_DAEMON_RUNNING` ENV var is set to `true`, the library will queue messages to be published by
79+
the [Batch Daemon](https://github.com/GoogleCloudPlatform/google-cloud-php/blob/master/src/Core/Batch/BatchDaemon.php).
80+
The Batch Daemon needs to be manually run as a long-lived background process.
81+
82+
For all other cases, messages will be queued in memory and will be published before the script terminates using a
83+
vendor registered shutdown handler.
84+
85+
**Please Note**
86+
87+
This is marked by google/cloud as an experimental feature & may change before release in backwards-incompatible ways.
88+
6989
## Examples
7090

7191
The library comes with [examples](examples) for the adapter and a [Dockerfile](Dockerfile) for

changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 5.0.0 - 2017-07-25
4+
5+
* Add support for using Google Cloud batch requests (aka background daemon)
6+
37
## 4.0.1 - 2017-07-21
48

59
* Allow for google/cloud ^0.29.0|^0.30.0|^0.31.0|^0.32.0|^0.33.0|^0.34.0|^0.35.0

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"require": {
1212
"php": ">=5.6.0",
1313
"superbalist/php-pubsub": "^2.0",
14-
"google/cloud": "^0.11.0|^0.12.0|^0.13.0|^0.20.0|^0.21.0|^0.22.0|^0.23.0|^0.24.0|^0.25.0|^0.26.0|^0.27.0|^0.28.0|^0.29.0|^0.30.0|^0.31.0|^0.32.0|^0.33.0|^0.34.0|^0.35.0"
14+
"google/cloud": "^0.33.0|^0.34.0|^0.35.0"
1515
},
1616
"autoload": {
1717
"psr-4": {

src/GoogleCloudPubSubAdapter.php

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,30 @@ class GoogleCloudPubSubAdapter implements PubSubAdapterInterface
2929
*/
3030
protected $autoCreateSubscriptions;
3131

32+
/**
33+
* @var bool
34+
*/
35+
protected $backgroundBatching;
36+
3237
/**
3338
* @param PubSubClient $client
3439
* @param string $clientIdentifier
3540
* @param bool $autoCreateTopics
3641
* @param bool $autoCreateSubscriptions
42+
* @param bool $backgroundBatching
3743
*/
3844
public function __construct(
3945
PubSubClient $client,
4046
$clientIdentifier = null,
4147
$autoCreateTopics = true,
42-
$autoCreateSubscriptions = true
48+
$autoCreateSubscriptions = true,
49+
$backgroundBatching = false
4350
) {
4451
$this->client = $client;
4552
$this->clientIdentifier = $clientIdentifier;
4653
$this->autoCreateTopics = $autoCreateTopics;
4754
$this->autoCreateSubscriptions = $autoCreateSubscriptions;
55+
$this->backgroundBatching = $backgroundBatching;
4856
}
4957

5058
/**
@@ -123,6 +131,35 @@ public function areSubscriptionsAutoCreated()
123131
return $this->autoCreateSubscriptions;
124132
}
125133

134+
/**
135+
* Set whether or not background batching is enabled.
136+
*
137+
* This is available from Google Cloud 0.33+ - https://github.com/GoogleCloudPlatform/google-cloud-php/releases/tag/v0.33.0
138+
*
139+
* If the http://php.net/manual/en/book.sem.php and http://php.net/manual/en/book.pcntl.php extensions are enabled
140+
* AND the IS_BATCH_DAEMON_RUNNING ENV var is set to true, the library will queue messages to be published by the
141+
* Batch Daemon (https://github.com/GoogleCloudPlatform/google-cloud-php/blob/master/src/Core/Batch/BatchDaemon.php)
142+
*
143+
* For all other cases, messages will be queued in memory and will be published before the script terminates using
144+
* a vendor registered shutdown handler.
145+
*
146+
* @param bool $backgroundBatching
147+
*/
148+
public function setBackgroundBatching($backgroundBatching)
149+
{
150+
$this->backgroundBatching = $backgroundBatching;
151+
}
152+
153+
/**
154+
* Check whether or not background batching is enabled.
155+
*
156+
* @return bool
157+
*/
158+
public function isBackgroundBatchingEnabled()
159+
{
160+
return $this->backgroundBatching;
161+
}
162+
126163
/**
127164
* Subscribe a handler to a channel.
128165
*
@@ -166,7 +203,13 @@ public function subscribe($channel, callable $handler)
166203
public function publish($channel, $message)
167204
{
168205
$topic = $this->getTopicForChannel($channel);
169-
$topic->publish(['data' => Utils::serializeMessage($message)]);
206+
$payload = Utils::serializeMessage($message);
207+
208+
if ($this->backgroundBatching) {
209+
$topic->batchPublisher()->publish(['data' => $payload]);
210+
} else {
211+
$topic->publish(['data' => $payload]);
212+
}
170213
}
171214

172215
/**
@@ -181,7 +224,15 @@ public function publishBatch($channel, array $messages)
181224
$messages = array_map(function ($message) {
182225
return ['data' => Utils::serializeMessage($message)];
183226
}, $messages);
184-
$topic->publishBatch($messages);
227+
228+
if ($this->backgroundBatching) {
229+
$batchPublisher = $topic->batchPublisher();
230+
foreach ($messages as $message) {
231+
$batchPublisher->publish($message);
232+
}
233+
} else {
234+
$topic->publishBatch($messages);
235+
}
185236
}
186237

187238
/**

tests/GoogleCloudPubSubAdapterTest.php

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Tests;
44

5+
use Google\Cloud\PubSub\BatchPublisher;
56
use Google\Cloud\PubSub\Message;
67
use Google\Cloud\PubSub\PubSubClient;
78
use Google\Cloud\PubSub\Subscription;
@@ -49,6 +50,19 @@ public function testGetSetAutoCreateSubscriptions()
4950
$this->assertFalse($adapter->areSubscriptionsAutoCreated());
5051
}
5152

53+
public function testGetSetBackgroundBatching()
54+
{
55+
$client = Mockery::mock(PubSubClient::class);
56+
$adapter = new GoogleCloudPubSubAdapter($client);
57+
$this->assertFalse($adapter->isBackgroundBatchingEnabled());
58+
59+
$adapter->setBackgroundBatching(true);
60+
$this->assertTrue($adapter->isBackgroundBatchingEnabled());
61+
62+
$adapter = new GoogleCloudPubSubAdapter($client, null, true, true, true);
63+
$this->assertTrue($adapter->isBackgroundBatchingEnabled());
64+
}
65+
5266
public function testPublishWhenTopicMustBeCreated()
5367
{
5468
$topic = Mockery::mock(Topic::class);
@@ -74,6 +88,39 @@ public function testPublishWhenTopicMustBeCreated()
7488
$adapter->publish('channel_name', ['hello' => 'world']);
7589
}
7690

91+
public function testPublishWhenTopicMustBeCreatedAndBackgroundBatchingIsEnabled()
92+
{
93+
$batchPublisher = Mockery::mock(BatchPublisher::class);
94+
95+
$topic = Mockery::mock(Topic::class);
96+
$topic->shouldReceive('exists')
97+
->once()
98+
->andReturn(false);
99+
$topic->shouldReceive('create')
100+
->once();
101+
102+
$topic->shouldReceive('batchPublisher')
103+
->once()
104+
->andReturn($batchPublisher);
105+
106+
$batchPublisher->shouldReceive('publish')
107+
->with([
108+
'data' => '{"hello":"world"}',
109+
])
110+
->once();
111+
112+
$client = Mockery::mock(PubSubClient::class);
113+
$client->shouldReceive('topic')
114+
->with('channel_name')
115+
->once()
116+
->andReturn($topic);
117+
118+
$adapter = new GoogleCloudPubSubAdapter($client);
119+
$adapter->setBackgroundBatching(true);
120+
121+
$adapter->publish('channel_name', ['hello' => 'world']);
122+
}
123+
77124
public function testPublishWhenTopicExists()
78125
{
79126
$topic = Mockery::mock(Topic::class);
@@ -98,6 +145,37 @@ public function testPublishWhenTopicExists()
98145
$adapter->publish('channel_name', ['hello' => 'world']);
99146
}
100147

148+
public function testPublishWhenTopicExistsAndBackgroundBatchingIsEnabled()
149+
{
150+
$batchPublisher = Mockery::mock(BatchPublisher::class);
151+
152+
$topic = Mockery::mock(Topic::class);
153+
$topic->shouldReceive('exists')
154+
->once()
155+
->andReturn(true);
156+
$topic->shouldNotHaveReceived('create');
157+
$topic->shouldReceive('batchPublisher')
158+
->once()
159+
->andReturn($batchPublisher);
160+
161+
$batchPublisher->shouldReceive('publish')
162+
->with([
163+
'data' => '{"hello":"world"}',
164+
])
165+
->once();
166+
167+
$client = Mockery::mock(PubSubClient::class);
168+
$client->shouldReceive('topic')
169+
->with('channel_name')
170+
->once()
171+
->andReturn($topic);
172+
173+
$adapter = new GoogleCloudPubSubAdapter($client);
174+
$adapter->setBackgroundBatching(true);
175+
176+
$adapter->publish('channel_name', ['hello' => 'world']);
177+
}
178+
101179
public function testPublishWhenAutoTopicCreationIsDisabled()
102180
{
103181
$topic = Mockery::mock(Topic::class);
@@ -120,6 +198,35 @@ public function testPublishWhenAutoTopicCreationIsDisabled()
120198
$adapter->publish('channel_name', ['hello' => 'world']);
121199
}
122200

201+
public function testPublishWhenAutoTopicCreationIsDisabledAndBackgroundBatchingIsEnabled()
202+
{
203+
$batchPublisher = Mockery::mock(BatchPublisher::class);
204+
205+
$topic = Mockery::mock(Topic::class);
206+
$topic->shouldNotHaveReceived('exists');
207+
$topic->shouldNotHaveReceived('create');
208+
$topic->shouldReceive('batchPublisher')
209+
->once()
210+
->andReturn($batchPublisher);
211+
212+
$batchPublisher->shouldReceive('publish')
213+
->with([
214+
'data' => '{"hello":"world"}',
215+
])
216+
->once();
217+
218+
$client = Mockery::mock(PubSubClient::class);
219+
$client->shouldReceive('topic')
220+
->with('channel_name')
221+
->once()
222+
->andReturn($topic);
223+
224+
$adapter = new GoogleCloudPubSubAdapter($client, null, false);
225+
$adapter->setBackgroundBatching(true);
226+
227+
$adapter->publish('channel_name', ['hello' => 'world']);
228+
}
229+
123230
public function testPublishBatch()
124231
{
125232
$topic = Mockery::mock(Topic::class);
@@ -148,6 +255,45 @@ public function testPublishBatch()
148255
$adapter->publishBatch('channel_name', $messages);
149256
}
150257

258+
public function testPublishBatchWhenBackgroundBatchingIsEnabled()
259+
{
260+
$batchPublisher = Mockery::mock(BatchPublisher::class);
261+
262+
$topic = Mockery::mock(Topic::class);
263+
$topic->shouldReceive('exists')
264+
->once()
265+
->andReturn(true);
266+
$topic->shouldReceive('batchPublisher')
267+
->once()
268+
->andReturn($batchPublisher);
269+
270+
$batchPublisher->shouldReceive('publish')
271+
->with([
272+
'data' => '{"hello":"world"}',
273+
])
274+
->once();
275+
$batchPublisher->shouldReceive('publish')
276+
->with([
277+
'data' => '"booo!"',
278+
])
279+
->once();
280+
281+
$client = Mockery::mock(PubSubClient::class);
282+
$client->shouldReceive('topic')
283+
->with('channel_name')
284+
->once()
285+
->andReturn($topic);
286+
287+
$adapter = new GoogleCloudPubSubAdapter($client);
288+
$adapter->setBackgroundBatching(true);
289+
290+
$messages = [
291+
['hello' => 'world'],
292+
'booo!',
293+
];
294+
$adapter->publishBatch('channel_name', $messages);
295+
}
296+
151297
public function testSubscribeWhenSubscriptionMustBeCreated()
152298
{
153299
$message1 = new Message(['data' => '{"hello":"world"}'], ['ackId' => 1]);

0 commit comments

Comments
 (0)