99use Illuminate \Queue \InvalidPayloadException ;
1010use Illuminate \Queue \Queue ;
1111use PhpAmqpLib \Channel \AMQPChannel ;
12- use PhpAmqpLib \Connection \AMQPConnection ;
1312use PhpAmqpLib \Connection \AMQPStreamConnection ;
1413use PhpAmqpLib \Message \AMQPMessage ;
1514use PhpAmqpLib \Wire \AMQPTable ;
@@ -30,7 +29,7 @@ class AMQPQueue extends Queue implements QueueContract
3029 const EXCHANGE_TYPE_FANOUT = 'fanout ' ;
3130
3231 /**
33- * @var AMQPConnection Connection to amqp compatible server
32+ * @var AMQPStreamConnection Connection to amqp compatible server
3433 */
3534 protected $ connection ;
3635
@@ -164,7 +163,7 @@ public function getCustomMessageOptions(){
164163 public function push ($ job , $ data = '' , $ queue = null )
165164 {
166165 $ queue = $ this ->prepareQueue ($ queue );
167- $ payload = new AMQPMessage ($ this ->createPayload ($ job , $ data ), $ this ->messageProperties );
166+ $ payload = new AMQPMessage ($ this ->createPayload ($ job , $ queue , $ data ), $ this ->messageProperties );
168167 $ this ->channel ->basic_publish ($ payload , $ this ->exchangeName , $ this ->getRoutingKey ($ queue ));
169168
170169 return true ;
@@ -185,7 +184,7 @@ public function push($job, $data = '', $queue = null)
185184 public function addMessageToBatch ($ job , $ data = '' , $ queue = null )
186185 {
187186 $ queue = $ this ->prepareQueue ($ queue );
188- $ payload = new AMQPMessage ($ this ->createPayload ($ job , $ data ), $ this ->messageProperties );
187+ $ payload = new AMQPMessage ($ this ->createPayload ($ job , $ queue , $ data ), $ this ->messageProperties );
189188 $ this ->channel ->batch_basic_publish ($ payload , $ this ->exchangeName , $ this ->getRoutingKey ($ queue ));
190189
191190 return true ;
@@ -313,7 +312,7 @@ public function later($delay, $job, $data = '', $queue = null)
313312 $ queue = $ this ->prepareQueue ($ queue );
314313 $ delayedQueueName = $ this ->declareDelayedQueue ($ queue , $ delay );
315314
316- $ payload = new AMQPMessage ($ this ->createPayload ($ job , $ data ), $ this ->messageProperties );
315+ $ payload = new AMQPMessage ($ this ->createPayload ($ job , $ queue , $ data ), $ this ->messageProperties );
317316 $ this ->channel ->basic_publish ($ payload , $ this ->exchangeName , $ delayedQueueName );
318317 return true ;
319318 }
@@ -431,15 +430,16 @@ private function prepareQueue($queue)
431430 * Create a payload string from the given job and data.
432431 *
433432 * @param string $job
433+ * @param string $queue
434434 * @param mixed $data
435435 * @return string
436436 *
437437 * @throws \Illuminate\Queue\InvalidPayloadException
438438 */
439- protected function createPayload ($ job , $ data = '' )
439+ protected function createPayload ($ job , $ queue , $ data = '' )
440440 {
441441 $ data = is_array ($ data ) ? array_merge ($ data , $ this ->getCustomMessageOptions ()) : $ this ->getCustomMessageOptions ();
442- $ payload = json_encode ($ this ->createPayloadArray ($ job , $ data ));
442+ $ payload = json_encode ($ this ->createPayloadArray ($ job , $ queue , $ data ));
443443 if (JSON_ERROR_NONE !== json_last_error ()) {
444444 throw new InvalidPayloadException (
445445 'Unable to JSON encode payload. Error code: ' .json_last_error ()
@@ -453,20 +453,28 @@ protected function createPayload($job, $data = '')
453453 * Create a payload for an object-based queue handler.
454454 *
455455 * @param mixed $job
456+ * @param string $queue
456457 * @return array
457458 */
458- protected function createObjectPayload ($ job )
459+ protected function createObjectPayload ($ job, $ queue )
459460 {
460- return [
461+ $ payload = $ this -> withCreatePayloadHooks ( $ queue , [
461462 'displayName ' => $ this ->getDisplayName ($ job ),
462463 'job ' => 'Illuminate\Queue\CallQueuedHandler@call ' ,
463464 'maxTries ' => $ job ->tries ?? null ,
464465 'timeout ' => $ job ->timeout ?? null ,
465466 'timeoutAt ' => $ this ->getJobExpiration ($ job ),
467+ 'data ' => [
468+ 'commandName ' => $ job ,
469+ 'command ' => $ job ,
470+ ],
471+ ]);
472+
473+ return array_merge ($ payload , [
466474 'data ' => array_merge ([
467475 'commandName ' => get_class ($ job ),
468476 'command ' => serialize (clone $ job ),
469477 ],$ this ->getCustomMessageOptions ()),
470- ];
478+ ]) ;
471479 }
472480}
0 commit comments