22
33namespace Superbalist \PubSub \GoogleCloud ;
44
5+ use Exception ;
56use Google \Cloud \PubSub \Message ;
67use Google \Cloud \PubSub \PubSubClient ;
78use Google \Cloud \PubSub \Subscription ;
@@ -50,13 +51,21 @@ class GoogleCloudPubSubAdapter implements PubSubAdapterInterface
5051 */
5152 protected $ returnImmediatelyPause ;
5253
54+ /**
55+ * @var boolean
56+ */
57+ private $ alwaysThrowException ;
58+
5359 /**
5460 * @param PubSubClient $client
5561 * @param string $clientIdentifier
5662 * @param bool $autoCreateTopics
5763 * @param bool $autoCreateSubscriptions
5864 * @param bool $backgroundBatching
5965 * @param int $maxMessages
66+ * @param bool $returnImmediately
67+ * @param int $returnImmediatelyPause
68+ * @param bool $alwaysThrowException
6069 */
6170 public function __construct (
6271 PubSubClient $ client ,
@@ -66,7 +75,8 @@ public function __construct(
6675 $ backgroundBatching = false ,
6776 $ maxMessages = 1000 ,
6877 $ returnImmediately = false ,
69- $ returnImmediatelyPause = 500000
78+ $ returnImmediatelyPause = 500000 ,
79+ $ alwaysThrowException = false
7080 ) {
7181 $ this ->client = $ client ;
7282 $ this ->clientIdentifier = $ clientIdentifier ;
@@ -76,6 +86,7 @@ public function __construct(
7686 $ this ->maxMessages = $ maxMessages ;
7787 $ this ->returnImmediately = $ returnImmediately ;
7888 $ this ->returnImmediatelyPause = (int ) $ returnImmediatelyPause ;
89+ $ this ->alwaysThrowException = $ alwaysThrowException ;
7990 }
8091
8192 /**
@@ -228,11 +239,31 @@ public function setMaxMessages($maxMessages)
228239 $ this ->maxMessages = $ maxMessages ;
229240 }
230241
242+ /**
243+ * Always throw exception from user callback function
244+ * if set to true message will NOT be acked
245+ *
246+ * @param bool alwaysThrowException
247+ */
248+ public function setAlwaysThrowException ($ alwaysThrowException ) {
249+ $ this ->alwaysThrowException = (bool ) $ alwaysThrowException ;
250+ }
251+
252+ /**
253+ * Return the always throw exception property
254+ *
255+ * @return bool
256+ */
257+ public function getAlwaysThrowException () {
258+ return $ this ->alwaysThrowException ;
259+ }
260+
231261 /**
232262 * Subscribe a handler to a channel.
233263 *
234264 * @param string $channel
235265 * @param callable $handler
266+ * @throws Exception
236267 */
237268 public function subscribe ($ channel , callable $ handler )
238269 {
@@ -260,7 +291,13 @@ public function subscribe($channel, callable $handler)
260291 if ($ payload === 'unsubscribe ' ) {
261292 $ isSubscriptionLoopActive = false ;
262293 } else {
263- call_user_func ($ handler , $ payload );
294+ try {
295+ call_user_func ($ handler , $ payload );
296+ } catch (Exception $ e ) {
297+ if ($ this ->alwaysThrowException ) {
298+ throw $ e ;
299+ }
300+ }
264301 }
265302
266303 $ subscription ->acknowledge ($ message );
0 commit comments