Skip to content

Commit 5e69dd9

Browse files
author
fedor.f
committed
publish message with custom queue options : flags | attributes
1 parent 5cd846e commit 5e69dd9

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,24 @@ public function testItCanDisableTheSetup()
233233
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
234234
$connection->publish('body');
235235
}
236+
237+
public function testPublishWithQueueOptions()
238+
{
239+
$factory = new TestAmqpFactory(
240+
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
241+
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
242+
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
243+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
244+
);
245+
$headers = [
246+
'type' => '*',
247+
];
248+
$amqpExchange->expects($this->once())->method('publish')
249+
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => $headers]);
250+
251+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[flags]=1', [], true, $factory);
252+
$connection->publish('body', $headers);
253+
}
236254
}
237255

238256
class TestAmqpFactory extends AmqpFactory

Transport/AmqpExt/Connection.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,10 @@ public function publish(string $body, array $headers = []): void
133133
$this->setup();
134134
}
135135

136-
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, AMQP_NOPARAM, ['headers' => $headers]);
136+
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
137+
$attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
138+
139+
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
137140
}
138141

139142
/**

0 commit comments

Comments
 (0)