@@ -26,19 +26,22 @@ public function __construct(CloudTasksClient $client, Request $request, OpenIdVe
2626 */
2727 public function handle ($ task = null )
2828 {
29- $ this ->authorizeRequest ();
30-
3129 $ task = $ task ?: $ this ->captureTask ();
3230
33- $ this ->listenForEvents ();
31+ $ command = unserialize ($ task ['data ' ]['command ' ]);
32+ $ connection = $ command ->connection ?? 'cloudtasks ' ;
33+
34+ $ this ->authorizeRequest ($ connection );
35+
36+ $ this ->listenForEvents ($ connection );
3437
35- $ this ->handleTask ($ task );
38+ $ this ->handleTask ($ connection , $ task );
3639 }
3740
3841 /**
3942 * @throws CloudTasksException
4043 */
41- public function authorizeRequest ()
44+ public function authorizeRequest ($ connection )
4245 {
4346 if (!$ this ->request ->hasHeader ('Authorization ' )) {
4447 throw new CloudTasksException ('Missing [Authorization] header ' );
@@ -49,7 +52,7 @@ public function authorizeRequest()
4952
5053 $ decodedToken = $ this ->publicKey ->decodeOpenIdToken ($ openIdToken , $ kid );
5154
52- $ this ->validateToken ($ decodedToken );
55+ $ this ->validateToken ($ connection , $ decodedToken );
5356 }
5457
5558 /**
@@ -58,13 +61,13 @@ public function authorizeRequest()
5861 * @param $openIdToken
5962 * @throws CloudTasksException
6063 */
61- protected function validateToken ($ openIdToken )
64+ protected function validateToken ($ connection , $ openIdToken )
6265 {
6366 if (!in_array ($ openIdToken ->iss , ['https://accounts.google.com ' , 'accounts.google.com ' ])) {
6467 throw new CloudTasksException ('The given OpenID token is not valid ' );
6568 }
6669
67- if ($ openIdToken ->aud != Config::handler ()) {
70+ if ($ openIdToken ->aud != Config::handler ($ connection )) {
6871 throw new CloudTasksException ('The given OpenID token is not valid ' );
6972 }
7073
@@ -93,11 +96,11 @@ private function captureTask()
9396 return $ task ;
9497 }
9598
96- private function listenForEvents ()
99+ private function listenForEvents ($ connection )
97100 {
98- app ('events ' )->listen (JobFailed::class, function ($ event ) {
101+ app ('events ' )->listen (JobFailed::class, function ($ event ) use ( $ connection ) {
99102 app ('queue.failer ' )->log (
100- ' cloudtasks ' , $ event ->job ->getQueue (),
103+ $ connection , $ event ->job ->getQueue (),
101104 $ event ->job ->getRawBody (), $ event ->exception
102105 );
103106 });
@@ -107,24 +110,24 @@ private function listenForEvents()
107110 * @param $task
108111 * @throws CloudTasksException
109112 */
110- private function handleTask ($ task )
113+ private function handleTask ($ connection , $ task )
111114 {
112115 $ job = new CloudTasksJob ($ task );
113116
114117 $ job ->setAttempts (request ()->header ('X-CloudTasks-TaskRetryCount ' ) + 1 );
115118 $ job ->setQueue (request ()->header ('X-Cloudtasks-Queuename ' ));
116- $ job ->setMaxTries ($ this ->getQueueMaxTries ($ job ));
119+ $ job ->setMaxTries ($ this ->getQueueMaxTries ($ connection , $ job ));
117120
118121 $ worker = $ this ->getQueueWorker ();
119122
120- $ worker ->process (' cloudtasks ' , $ job , new WorkerOptions ());
123+ $ worker ->process ($ connection , $ job , new WorkerOptions ());
121124 }
122125
123- private function getQueueMaxTries (CloudTasksJob $ job )
126+ private function getQueueMaxTries ($ connection , CloudTasksJob $ job )
124127 {
125128 $ queueName = $ this ->client ->queueName (
126- Config::project (),
127- Config::location (),
129+ Config::project ($ connection ),
130+ Config::location ($ connection ),
128131 $ job ->getQueue ()
129132 );
130133
0 commit comments