Skip to content

Commit 3ee5817

Browse files
authored
Merge pull request #11 from maksimru/master
@maksimru - properly encoded delayed option arguments (avoid AMQPProtocolChanne… … da17b7f @maksimru - skip bad payload jobs 2f06374 @maksimru - size blank method f14da84 @maksimru - compatibility fix @maksimru - laravel 5.3 update a89d56b @maksimru - support of heartbeat and timeouts 88bda43
2 parents 6f887a5 + 88bda43 commit 3ee5817

File tree

4 files changed

+159
-148
lines changed

4 files changed

+159
-148
lines changed

composer.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
"license": "GPL-2.0",
66
"require": {
77
"php": ">=5.4.0",
8-
"videlalvaro/php-amqplib": "2.4.*"
8+
"php-amqplib/php-amqplib": "2.6.*"
99
},
1010
"require-dev": {
11-
"laravel/framework": ">=4.0.0",
11+
"laravel/framework": ">=5.3.0",
1212
"squizlabs/php_codesniffer": "1.*",
13-
"orchestra/testbench": "3.0.4",
13+
"orchestra/testbench": "3.3.*",
1414
"phpunit/phpunit": "4.3.*"
1515
},
1616
"autoload": {

src/Connectors/AmqpConnector.php

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Forumhouse\LaravelAmqp\Queue\AMQPQueue;
66
use Illuminate\Queue\Connectors\ConnectorInterface;
7-
use PhpAmqpLib\Connection\AMQPConnection;
7+
use PhpAmqpLib\Connection\AMQPStreamConnection;
88

99
/**
1010
* Class AmqpConnector
@@ -22,12 +22,21 @@ class AmqpConnector implements ConnectorInterface
2222
*/
2323
public function connect(array $config)
2424
{
25-
$connection = new AMQPConnection(
25+
$connection = new AMQPStreamConnection(
2626
$config['host'],
2727
$config['port'],
2828
$config['user'],
2929
$config['password'],
30-
isset($config['vhost']) ? $config['vhost'] : '/'
30+
isset($config['vhost']) ? $config['vhost'] : '/',
31+
false,
32+
'AMQPLAIN',
33+
null,
34+
'en_US',
35+
isset($config['connection_timeout']) ? $config['connection_timeout'] : 3,
36+
isset($config['read_write_timeout']) ? $config['read_write_timeout'] : 3,
37+
null,
38+
isset($config['keepalive']) ? $config['keepalive'] : false,
39+
isset($config['heartbeat']) ? $config['heartbeat'] : 0
3140
);
3241

3342
if (!isset($config['exchange_type'])) {

src/Jobs/AMQPJob.php

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,7 @@ public function __construct($container, $queue, $channel, $amqpMessage)
4242
$this->amqpMessage = $amqpMessage;
4343
$this->channel = $channel;
4444
}
45-
46-
/**
47-
* Fire the job.
48-
*
49-
* @return void
50-
*/
51-
public function fire()
52-
{
53-
$this->resolveAndFire(json_decode($this->amqpMessage->body, true));
54-
}
55-
45+
5646
/**
5747
* Get the raw body string for the job.
5848
*
@@ -63,27 +53,6 @@ public function getRawBody()
6353
return $this->amqpMessage->body;
6454
}
6555

66-
/**
67-
* Delete the job from the queue.
68-
*
69-
* @return void
70-
*/
71-
public function delete()
72-
{
73-
parent::delete();
74-
$this->channel->basic_ack($this->amqpMessage->delivery_info['delivery_tag']);
75-
}
76-
77-
/**
78-
* Get queue name
79-
*
80-
* @return string
81-
*/
82-
public function getQueue()
83-
{
84-
return $this->queue;
85-
}
86-
8756
/**
8857
* Release the job back into the queue.
8958
*
@@ -115,6 +84,17 @@ public function release($delay = 0)
11584
}
11685
}
11786

87+
/**
88+
* Delete the job from the queue.
89+
*
90+
* @return void
91+
*/
92+
public function delete()
93+
{
94+
parent::delete();
95+
$this->channel->basic_ack($this->amqpMessage->delivery_info['delivery_tag']);
96+
}
97+
11898
/**
11999
* Get the number of times the job has been attempted.
120100
*
@@ -127,6 +107,16 @@ public function attempts()
127107
return isset($body['data']['attempts']) ? $body['data']['attempts'] : 0;
128108
}
129109

110+
/**
111+
* Get queue name
112+
*
113+
* @return string
114+
*/
115+
public function getQueue()
116+
{
117+
return $this->queue;
118+
}
119+
130120
/**
131121
* Get the job identifier.
132122
*

0 commit comments

Comments
 (0)