Skip to content

Commit e2664e8

Browse files
committed
minor symfony#54606 [Messenger] Messenger: avoid DELETE statement flood when using Doctrine+MySQL backend (giosh94mhz)
This PR was submitted for the 5.4 branch but it was merged into the 7.1 branch instead. Discussion ---------- [Messenger] Messenger: avoid DELETE statement flood when using Doctrine+MySQL backend | Q | A | ------------- | --- | Branch | 7.1 | Bug fix? | no | New feature? | no | Deprecations? | no | Issues | | License | MIT ### The issue I found an issue with the Doctrine bridge of Messenger when using MySQL. In version 4.4 (commit 12271a4 ) the MySQL removal of acknowledged/rejected was implemented by updating the queued message ID with a fake date (i.e. 9999-12-31) and then removing the messages later. The problem is that this DELETE statement is done every time a connection is checked for job, which usually happens every seconds (in my case, even multiple time per seconds since I use multiple consumers and queues). Since I use MySQL binary logging and replica, a quick count of recent logged queries highlighted the issue (it's a quick `cat binlog | sort | uniq -c | sort -n`): ``` 30 UPDATE my_job SET delivered_at = 'NNNN-NN-NN NN:NN:NN' WHERE id = 'NNNNNNN' 3621 DELETE FROM my_job WHERE delivered_at = 'NNNN-NN-NN NN:NN:NN' ``` As you can see, for only 30 messages handles, there were 3600 (1 hour) DELETE statements. ### The fix I've added a private variable to track if an update has been made, as so if a DELETE is required. This effectively stops the flood. I kept the default to `false` to reduce this value to minimum, and after verifying that all invalid records are not queried thanks to `createAvailableMessagesQueryBuilder`. ### Unit tests There's no regression on actual tests, but I haven't found a specific test to update for this. I'm not sure how this could be implemented, so if needed I waiting for hints! Commits ------- 662c8e4 Messenger: avoid DELETE statement flood when using Doctrine+MySQL backend
2 parents e6c875e + 662c8e4 commit e2664e8

File tree

1 file changed

+14
-3
lines changed
  • src/Symfony/Component/Messenger/Bridge/Doctrine/Transport

1 file changed

+14
-3
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class Connection implements ResetInterface
5252
protected ?float $queueEmptiedAt = null;
5353

5454
private bool $autoSetup;
55+
private bool $doMysqlCleanup = false;
5556

5657
/**
5758
* Constructor.
@@ -75,6 +76,7 @@ public function __construct(
7576
public function reset(): void
7677
{
7778
$this->queueEmptiedAt = null;
79+
$this->doMysqlCleanup = false;
7880
}
7981

8082
public function getConfiguration(): array
@@ -152,9 +154,10 @@ public function send(string $body, array $headers, int $delay = 0): string
152154

153155
public function get(): ?array
154156
{
155-
if ($this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
157+
if ($this->doMysqlCleanup && $this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
156158
try {
157159
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']);
160+
$this->doMysqlCleanup = false;
158161
} catch (DriverException $e) {
159162
// Ignore the exception
160163
} catch (TableNotFoundException $e) {
@@ -252,7 +255,11 @@ public function ack(string $id): bool
252255
{
253256
try {
254257
if ($this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
255-
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
258+
if ($updated = $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0) {
259+
$this->doMysqlCleanup = true;
260+
}
261+
262+
return $updated;
256263
}
257264

258265
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
@@ -265,7 +272,11 @@ public function reject(string $id): bool
265272
{
266273
try {
267274
if ($this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
268-
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
275+
if ($updated = $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0) {
276+
$this->doMysqlCleanup = true;
277+
}
278+
279+
return $updated;
269280
}
270281

271282
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;

0 commit comments

Comments
 (0)