Skip to content

Commit 10c3a3f

Browse files
Acknowledge PubSub event message
1 parent 1da4a70 commit 10c3a3f

File tree

8 files changed

+232
-44
lines changed

8 files changed

+232
-44
lines changed

composer.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@
5252
"laravel": {
5353
"providers": [
5454
"RichanFongdasen\\GCRWorker\\ServiceProvider"
55-
]
55+
],
56+
"aliases": {
57+
"GcrQueue": "RichanFongdasen\\GCRWorker\\Facade\\GcrQueue"
58+
}
5659
}
5760
},
5861
"scripts": {
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
namespace RichanFongdasen\GCRWorker\Concerns;
4+
5+
use ErrorException;
6+
use Kainxspirits\PubSubQueue\Connectors\PubSubConnector;
7+
use Kainxspirits\PubSubQueue\PubSubQueue;
8+
9+
trait CreatesPubSubQueue
10+
{
11+
/**
12+
* Pub/Sub connector instance.
13+
*
14+
* @var PubSubConnector
15+
*/
16+
protected $connector;
17+
18+
/**
19+
* The base PubSubQueue instance.
20+
*
21+
* @var PubSubQueue
22+
*/
23+
protected $pubSub;
24+
25+
/**
26+
* Create the PubSubQueue instance.
27+
*
28+
* @throws ErrorException
29+
*
30+
* @return PubSubQueue
31+
*/
32+
private function createQueue(): PubSubQueue
33+
{
34+
$queue = $this->connector->connect(config('queue.connections.pubsub', ['project_id' => 'default-project-id']));
35+
36+
if (!($queue instanceof PubSubQueue)) {
37+
throw new ErrorException('Failed to retrieve PubSubQueue instance.');
38+
}
39+
40+
return $queue;
41+
}
42+
43+
/**
44+
* Retrieve the PubSubConnector instance.
45+
*
46+
* @return PubSubConnector
47+
*/
48+
public function getConnector(): PubSubConnector
49+
{
50+
return $this->connector;
51+
}
52+
53+
/**
54+
* Retrieve the PubSubQueue instance.
55+
*
56+
* @return PubSubQueue
57+
*/
58+
public function getPubSubQueue(): PubSubQueue
59+
{
60+
return $this->pubSub;
61+
}
62+
63+
/**
64+
* Initialize the PubSubQueue instance.
65+
*
66+
* @throws ErrorException
67+
*/
68+
protected function initializeQueue(): void
69+
{
70+
$this->pubSub = $this->createQueue();
71+
}
72+
73+
/**
74+
* Set the PubSubConnector instance.
75+
*
76+
* @param PubSubConnector $connector
77+
*/
78+
public function setConnector(PubSubConnector $connector): void
79+
{
80+
$this->connector = $connector;
81+
}
82+
}

src/Facade/GcrQueue.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
namespace RichanFongdasen\GCRWorker\Facade;
4+
5+
use Illuminate\Support\Facades\Facade;
6+
use RichanFongdasen\GCRWorker\GcrPubSubQueue;
7+
use RichanFongdasen\GCRWorker\GcrPubSubQueueFake;
8+
9+
class GcrQueue extends Facade
10+
{
11+
/**
12+
* Replace the bound instance with a fake.
13+
*
14+
* @return GcrPubSubQueueFake
15+
*/
16+
public static function fake(): GcrPubSubQueueFake
17+
{
18+
$fake = app(GcrPubSubQueueFake::class);
19+
static::swap($fake);
20+
21+
return $fake;
22+
}
23+
24+
/**
25+
* Get the registered name of the component.
26+
*
27+
* @return string
28+
*/
29+
protected static function getFacadeAccessor()
30+
{
31+
return GcrPubSubQueue::class;
32+
}
33+
}

src/GcrPubSubQueue.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
namespace RichanFongdasen\GCRWorker;
4+
5+
use Google\Cloud\PubSub\Message;
6+
use Kainxspirits\PubSubQueue\Connectors\PubSubConnector;
7+
use RichanFongdasen\GCRWorker\Concerns\CreatesPubSubQueue;
8+
9+
class GcrPubSubQueue
10+
{
11+
use CreatesPubSubQueue;
12+
13+
/**
14+
* GcrPubSubQueue constructor.
15+
*
16+
* @param PubSubConnector $connector
17+
* @throws \ErrorException
18+
*/
19+
public function __construct(PubSubConnector $connector)
20+
{
21+
$this->setConnector($connector);
22+
$this->initializeQueue();
23+
}
24+
25+
/**
26+
* Acknowledge the given message.
27+
*
28+
* @param Message $message
29+
*/
30+
public function acknowledge(Message $message): void
31+
{
32+
$this->pubSub->acknowledge($message);
33+
}
34+
}

src/GcrPubSubQueueFake.php

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace RichanFongdasen\GCRWorker;
4+
5+
use Google\Cloud\PubSub\Message;
6+
use Illuminate\Support\Collection;
7+
use Kainxspirits\PubSubQueue\Connectors\PubSubConnector;
8+
use PHPUnit\Framework\Assert as PHPUnit;
9+
use RichanFongdasen\GCRWorker\Concerns\CreatesPubSubQueue;
10+
11+
class GcrPubSubQueueFake
12+
{
13+
use CreatesPubSubQueue;
14+
15+
/**
16+
* The collection of acknowledged messages.
17+
*
18+
* @var Collection
19+
*/
20+
protected $acknowledgedMessages;
21+
22+
/**
23+
* GcrPubSubQueue constructor.
24+
*
25+
* @param PubSubConnector $connector
26+
* @throws \ErrorException
27+
*/
28+
public function __construct(PubSubConnector $connector)
29+
{
30+
$this->acknowledgedMessages = collect();
31+
32+
$this->setConnector($connector);
33+
$this->initializeQueue();
34+
}
35+
36+
/**
37+
* Acknowledge the given message.
38+
*
39+
* @param Message $message
40+
*/
41+
public function acknowledge(Message $message): void
42+
{
43+
$this->acknowledgedMessages->put($message->id(), $message);
44+
}
45+
46+
/**
47+
* Assert if the acknowledged messages count equals to the given value.
48+
*
49+
* @param int $count
50+
*/
51+
public function assertAcknowledgedMessagesCount(int $count): void
52+
{
53+
PHPUnit::assertEquals($count, $this->acknowledgedMessages->count());
54+
}
55+
56+
/**
57+
* Assert if the given message id has been acknowledged.
58+
*
59+
* @param string $messageId
60+
*/
61+
public function assertMessageHasAcknowledged(string $messageId): void
62+
{
63+
PHPUnit::assertTrue($this->acknowledgedMessages->has($messageId));
64+
}
65+
}

src/PubSubEvent.php

Lines changed: 6 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,28 @@
22

33
namespace RichanFongdasen\GCRWorker;
44

5-
use ErrorException;
65
use Google\Cloud\PubSub\Message;
76
use Illuminate\Container\Container;
8-
use Kainxspirits\PubSubQueue\Connectors\PubSubConnector;
97
use Kainxspirits\PubSubQueue\Jobs\PubSubJob;
10-
use Kainxspirits\PubSubQueue\PubSubQueue;
8+
use RichanFongdasen\GCRWorker\Facade\GcrQueue;
119

1210
class PubSubEvent
1311
{
14-
/**
15-
* Pub/Sub connector instance.
16-
*
17-
* @var PubSubConnector
18-
*/
19-
protected $connector;
20-
2112
/**
2213
* Laravel IOC Container instance.
2314
*
2415
* @var Container
2516
*/
2617
protected $container;
2718

28-
/**
29-
* Pub/Sub queue instance.
30-
*
31-
* @var PubSubQueue
32-
*/
33-
protected $queue;
34-
3519
/**
3620
* PubSubEvent constructor.
3721
*
38-
* @param Container $container
39-
* @param PubSubConnector $connector
40-
*
41-
* @throws ErrorException
22+
* @param Container $container
4223
*/
43-
public function __construct(Container $container, PubSubConnector $connector)
24+
public function __construct(Container $container)
4425
{
4526
$this->container = $container;
46-
$this->connector = $connector;
47-
$this->queue = $this->getQueue();
4827
}
4928

5029
/**
@@ -58,31 +37,13 @@ protected function createJob(Message $message): PubSubJob
5837
{
5938
return new PubSubJob(
6039
$this->container,
61-
$this->queue,
40+
GcrQueue::getPubSubQueue(),
6241
$message,
6342
config('queue.connections.pubsub.driver'),
6443
config('queue.connections.pubsub.queue')
6544
);
6645
}
6746

68-
/**
69-
* Get the PubSubQueue instance.
70-
*
71-
* @throws ErrorException
72-
*
73-
* @return PubSubQueue
74-
*/
75-
protected function getQueue(): PubSubQueue
76-
{
77-
$queue = $this->connector->connect(config('queue.connections.pubsub'));
78-
79-
if (!($queue instanceof PubSubQueue)) {
80-
throw new ErrorException('Failed to retrieve PubSubQueue instance.');
81-
}
82-
83-
return $queue;
84-
}
85-
8647
/**
8748
* Handle the given Pub/Sub event message.
8849
*
@@ -92,6 +53,8 @@ public function handle(Message $message): void
9253
{
9354
set_time_limit(config('gcr-worker.max_execution_time'));
9455

56+
GcrQueue::acknowledge($message);
57+
9558
$this->createJob($message)->fire();
9659
}
9760
}

tests/Feature/PubSubEventHandlingTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace RichanFongdasen\GCRWorker\Tests\Feature;
44

55
use Illuminate\Support\Facades\Cache;
6+
use RichanFongdasen\GCRWorker\Facade\GcrQueue;
67
use RichanFongdasen\GCRWorker\Tests\TestCase;
78

89
class PubSubEventHandlingTest extends TestCase
@@ -19,6 +20,8 @@ public function it_will_return_403_on_unauthorized_pubsub_request()
1920
/** @test */
2021
public function it_can_handle_pubsub_invocation_as_expected()
2122
{
23+
GcrQueue::fake();
24+
2225
config(['gcr-worker.allow_event_invocation' => true]);
2326

2427
$data = json_decode(file_get_contents(dirname(__DIR__, 2).'/dummies/message.json'), true);
@@ -29,6 +32,9 @@ public function it_can_handle_pubsub_invocation_as_expected()
2932
->assertStatus(200)
3033
->assertJsonFragment(['info' => 'The Pub/Sub queued job has completed.']);
3134

35+
GcrQueue::assertAcknowledgedMessagesCount(1);
36+
GcrQueue::assertMessageHasAcknowledged('1777817206939726');
37+
3238
self::assertEquals('completed', Cache::get('dummy-job-status'));
3339
}
3440
}

tests/TestCase.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace RichanFongdasen\GCRWorker\Tests;
44

55
use Orchestra\Testbench\TestCase as BaseTest;
6+
use RichanFongdasen\GCRWorker\Facade\GcrQueue;
67

78
abstract class TestCase extends BaseTest
89
{
@@ -46,6 +47,7 @@ protected function getPackageAliases($app): array
4647

4748
return [
4849
'Route' => \Illuminate\Support\Facades\Route::class,
50+
'GcrQueue' => GcrQueue::class,
4951
];
5052
}
5153

0 commit comments

Comments
 (0)