|
2 | 2 |
|
3 | 3 | namespace Superbalist\PubSub\GoogleCloud; |
4 | 4 |
|
| 5 | +use Google\Cloud\PubSub\Message; |
5 | 6 | use Google\Cloud\PubSub\PubSubClient; |
6 | 7 | use Superbalist\PubSub\PubSubAdapterInterface; |
7 | 8 | use Superbalist\PubSub\Utils; |
@@ -34,8 +35,12 @@ class GoogleCloudPubSubAdapter implements PubSubAdapterInterface |
34 | 35 | * @param bool $autoCreateTopics |
35 | 36 | * @param bool $autoCreateSubscriptions |
36 | 37 | */ |
37 | | - public function __construct(PubSubClient $client, $clientIdentifier = null, $autoCreateTopics = true, $autoCreateSubscriptions = true) |
38 | | - { |
| 38 | + public function __construct( |
| 39 | + PubSubClient $client, |
| 40 | + $clientIdentifier = null, |
| 41 | + $autoCreateTopics = true, |
| 42 | + $autoCreateSubscriptions = true |
| 43 | + ) { |
39 | 44 | $this->client = $client; |
40 | 45 | $this->clientIdentifier = $clientIdentifier; |
41 | 46 | $this->autoCreateTopics = $autoCreateTopics; |
@@ -134,17 +139,17 @@ public function subscribe($channel, callable $handler) |
134 | 139 | $messages = $subscription->pull(); |
135 | 140 |
|
136 | 141 | foreach ($messages as $message) { |
137 | | - // the cloud library base64 encodes messages |
138 | | - $payload = base64_decode($message['message']['data']); |
139 | | - $payload = Utils::unserializeMessagePayload($payload); |
| 142 | + /** @var Message $message */ |
| 143 | + |
| 144 | + $payload = Utils::unserializeMessagePayload($message->data()); |
140 | 145 |
|
141 | 146 | if ($payload === 'unsubscribe') { |
142 | 147 | $isSubscriptionLoopActive = false; |
143 | 148 | } else { |
144 | 149 | call_user_func($handler, $payload); |
145 | 150 | } |
146 | 151 |
|
147 | | - $subscription->acknowledge($message['ackId']); |
| 152 | + $subscription->acknowledge($message); |
148 | 153 | } |
149 | 154 | } |
150 | 155 | } |
|
0 commit comments