Skip to content

Commit d2b1ce2

Browse files
Pull fresh PubSub message from subscription to get a message with ackId
1 parent fa87035 commit d2b1ce2

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed

src/GcrPubSubQueue.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace RichanFongdasen\GCRWorker;
44

5+
use ErrorException;
56
use Google\Cloud\PubSub\Message;
67
use Kainxspirits\PubSubQueue\Connectors\PubSubConnector;
78
use RichanFongdasen\GCRWorker\Concerns\CreatesPubSubQueue;
@@ -32,4 +33,30 @@ public function acknowledge(Message $message): void
3233
{
3334
$this->pubSub->acknowledge($message);
3435
}
36+
37+
/**
38+
* Pull a specific PubSub message from PubSub topic specified by the given message id.
39+
*
40+
* @param Message $original
41+
* @return Message
42+
* @throws ErrorException
43+
*/
44+
public function pullFreshMessage(Message $original): Message
45+
{
46+
$topic = $this->pubSub->getTopic($this->pubSub->getQueue(null));
47+
$subscription = $topic->subscription($this->pubSub->getSubscriberName());
48+
49+
$messages = $subscription->pull(['returnImmediately' => true, 'maxMessages' => 10]);
50+
51+
foreach ($messages as $message) {
52+
if (!($message instanceof Message)) {
53+
continue;
54+
}
55+
if ($message->id() === $original->id()) {
56+
return $message;
57+
}
58+
}
59+
60+
throw new ErrorException(sprintf('Failed to pull a PubSub message with id "%s"', $original->id()));
61+
}
3562
}

src/GcrPubSubQueueFake.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,15 @@ public function assertMessageHasAcknowledged(string $messageId): void
6363
{
6464
PHPUnit::assertTrue($this->acknowledgedMessages->has($messageId));
6565
}
66+
67+
/**
68+
* Pull a specific PubSub message from PubSub topic specified by the given message id.
69+
*
70+
* @param Message $original
71+
* @return Message
72+
*/
73+
public function pullFreshMessage(Message $original): Message
74+
{
75+
return $original;
76+
}
6677
}

src/Requests/PubSubEventRequest.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Google\Cloud\PubSub\Message;
66
use Illuminate\Foundation\Http\FormRequest;
7+
use RichanFongdasen\GCRWorker\Facade\GcrQueue;
78

89
class PubSubEventRequest extends FormRequest
910
{
@@ -27,7 +28,9 @@ public function getPubSubMessage(): Message
2728
$requestData = $this->all();
2829
$requestData['message']['data'] = base64_decode($requestData['message']['data']);
2930

30-
return new Message($requestData['message']);
31+
$message = new Message($requestData['message']);
32+
33+
return GcrQueue::pullFreshMessage($message);
3134
}
3235

3336
/**

0 commit comments

Comments
 (0)