Skip to content

Commit bde2db2

Browse files
committed
array-based solution for the attempts-bug (i.e. attempts-counter is disappearing on failed-jobs)
1 parent 4715acb commit bde2db2

File tree

2 files changed

+523
-502
lines changed

2 files changed

+523
-502
lines changed

src/Jobs/AMQPJob.php

Lines changed: 110 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -15,116 +15,114 @@
1515
*/
1616
class AMQPJob extends Job implements \Illuminate\Contracts\Queue\Job
1717
{
18-
/**
19-
* @var string
20-
*/
21-
protected $queue;
22-
23-
/**
24-
* @var AMQPMessage
25-
*/
26-
protected $amqpMessage;
27-
/**
28-
* @var AMQPChannel
29-
*/
30-
private $channel;
31-
32-
/**
33-
* @param Container $container
34-
* @param string $queue
35-
* @param $channel
36-
* @param string $amqpMessage
37-
*/
38-
public function __construct($container, $queue, $channel, $amqpMessage)
39-
{
40-
$this->container = $container;
41-
$this->queue = $queue;
42-
$this->amqpMessage = $amqpMessage;
43-
$this->channel = $channel;
44-
}
45-
46-
/**
47-
* Get the raw body string for the job.
48-
*
49-
* @return string
50-
*/
51-
public function getRawBody()
52-
{
53-
return $this->amqpMessage->body;
54-
}
55-
56-
/**
57-
* Release the job back into the queue.
58-
*
59-
* @param int $delay
60-
*
61-
* @return void
62-
*/
63-
public function release($delay = 0)
64-
{
65-
$this->delete();
66-
67-
$body = $this->amqpMessage->body;
68-
$body = json_decode($body, true);
69-
70-
$attempts = $this->attempts();
71-
72-
// write attempts to body
73-
$body['data']['attempts'] = $attempts + 1;
74-
75-
$job = $body['job'];
76-
$data = $body['data'];
77-
78-
/** @var QueueContract $queue */
79-
$queue = $this->container['queue']->connection();
80-
if ($delay > 0) {
81-
$queue->later($delay, $job, $data, $this->getQueue());
82-
} else {
83-
$queue->push($job, $data, $this->getQueue());
84-
}
85-
}
86-
87-
/**
88-
* Delete the job from the queue.
89-
*
90-
* @return void
91-
*/
92-
public function delete()
93-
{
94-
parent::delete();
95-
$this->channel->basic_ack($this->amqpMessage->delivery_info['delivery_tag']);
96-
}
97-
98-
/**
99-
* Get the number of times the job has been attempted.
100-
*
101-
* @return int
102-
*/
103-
public function attempts()
104-
{
105-
$body = json_decode($this->amqpMessage->body, true);
106-
107-
return isset($body['data']['attempts']) ? $body['data']['attempts'] : 0;
108-
}
109-
110-
/**
111-
* Get queue name
112-
*
113-
* @return string
114-
*/
115-
public function getQueue()
116-
{
117-
return $this->queue;
118-
}
119-
120-
/**
121-
* Get the job identifier.
122-
*
123-
* @return string
124-
* @throws \OutOfBoundsException
125-
*/
126-
public function getJobId()
127-
{
128-
return $this->amqpMessage->get('message_id');
129-
}
18+
/**
19+
* @var string
20+
*/
21+
protected $queue;
22+
23+
/**
24+
* @var AMQPMessage
25+
*/
26+
protected $amqpMessage;
27+
/**
28+
* @var AMQPChannel
29+
*/
30+
private $channel;
31+
32+
/**
33+
* @param Container $container
34+
* @param string $queue
35+
* @param $channel
36+
* @param string $amqpMessage
37+
*/
38+
public function __construct($container, $queue, $channel, $amqpMessage)
39+
{
40+
$this->container = $container;
41+
$this->queue = $queue;
42+
$this->amqpMessage = $amqpMessage;
43+
$this->channel = $channel;
44+
}
45+
46+
/**
47+
* Get the raw body string for the job.
48+
*
49+
* @return string
50+
*/
51+
public function getRawBody()
52+
{
53+
return $this->amqpMessage->body;
54+
}
55+
56+
/**
57+
* Release the job back into the queue.
58+
*
59+
* @param int $delay
60+
*
61+
* @return void
62+
*/
63+
public function release($delay = 0)
64+
{
65+
$this->delete();
66+
67+
$body = $this->amqpMessage->body;
68+
$body = json_decode($body, true);
69+
70+
$body['attempts'] = $this->attempts() + 1;
71+
$job = $body['job'];
72+
73+
/** @var QueueContract $queue */
74+
$queue = $this->container['queue']->connection();
75+
if ($delay > 0) {
76+
$queue->later($delay, $job, $body, $this->getQueue());
77+
} else {
78+
$queue->push($job, $body, $this->getQueue());
79+
}
80+
81+
// TODO: IS THIS NECESSARY?
82+
parent::release();
83+
}
84+
85+
/**
86+
* Delete the job from the queue.
87+
*
88+
* @return void
89+
*/
90+
public function delete()
91+
{
92+
parent::delete();
93+
$this->channel->basic_ack($this->amqpMessage->delivery_info['delivery_tag']);
94+
}
95+
96+
/**
97+
* Get the number of times the job has been attempted.
98+
*
99+
* @return int
100+
*/
101+
public function attempts()
102+
{
103+
$body = json_decode($this->amqpMessage->body, true);
104+
105+
return isset($body['attempts']) ? $body['attempts'] : 0;
106+
}
107+
108+
/**
109+
* Get queue name
110+
*
111+
* @return string
112+
*/
113+
public function getQueue()
114+
{
115+
return $this->queue;
116+
}
117+
118+
/**
119+
* Get the job identifier.
120+
*
121+
* @return string
122+
* @throws \OutOfBoundsException
123+
*/
124+
public function getJobId()
125+
{
126+
return $this->amqpMessage->get('message_id');
127+
}
130128
}

0 commit comments

Comments
 (0)