Skip to content

Commit 5362bdf

Browse files
committed
SendProcess lock service
1 parent d50d840 commit 5362bdf

File tree

3 files changed

+236
-0
lines changed

3 files changed

+236
-0
lines changed

src/Domain/Messaging/Repository/SendProcessRepository.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,75 @@
77
use PhpList\Core\Domain\Common\Repository\AbstractRepository;
88
use PhpList\Core\Domain\Common\Repository\CursorPaginationTrait;
99
use PhpList\Core\Domain\Common\Repository\Interfaces\PaginatableRepositoryInterface;
10+
use PhpList\Core\Domain\Messaging\Model\SendProcess;
1011

1112
class SendProcessRepository extends AbstractRepository implements PaginatableRepositoryInterface
1213
{
1314
use CursorPaginationTrait;
15+
16+
public function deleteByPage(string $page): void
17+
{
18+
$this->createQueryBuilder('sp')
19+
->delete()
20+
->where('sp.page = :page')
21+
->setParameter('page', $page)
22+
->getQuery()
23+
->execute();
24+
}
25+
26+
public function countAliveByPage(string $page): int
27+
{
28+
return (int)$this->createQueryBuilder('sp')
29+
->select('COUNT(sp.id)')
30+
->where('sp.page = :page')
31+
->andWhere('sp.alive > 0')
32+
->setParameter('page', $page)
33+
->getQuery()
34+
->getSingleScalarResult();
35+
}
36+
37+
public function findNewestAlive(string $page): ?SendProcess
38+
{
39+
return $this->createQueryBuilder('sp')
40+
->where('sp.page = :page')
41+
->andWhere('sp.alive > 0')
42+
->setParameter('page', $page)
43+
->orderBy('sp.started', 'DESC')
44+
->setMaxResults(1)
45+
->getQuery()
46+
->getOneOrNullResult();
47+
}
48+
49+
public function markDeadById(int $id): void
50+
{
51+
$this->createQueryBuilder('sp')
52+
->update()
53+
->set('sp.alive', ':zero')
54+
->where('sp.id = :id')
55+
->setParameter('zero', 0)
56+
->setParameter('id', $id)
57+
->getQuery()
58+
->execute();
59+
}
60+
61+
public function incrementAlive(int $id): void
62+
{
63+
$this->createQueryBuilder('sp')
64+
->update()
65+
->set('sp.alive', 'sp.alive + 1')
66+
->where('sp.id = :id')
67+
->setParameter('id', $id)
68+
->getQuery()
69+
->execute();
70+
}
71+
72+
public function getAliveValue(int $id): int
73+
{
74+
return (int)$this->createQueryBuilder('sp')
75+
->select('sp.alive')
76+
->where('sp.id = :id')
77+
->setParameter('id', $id)
78+
->getQuery()
79+
->getSingleScalarResult();
80+
}
1481
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Messaging\Service;
6+
7+
use PhpList\Core\Domain\Messaging\Repository\SendProcessRepository;
8+
use PhpList\Core\Domain\Messaging\Service\Manager\SendProcessManager;
9+
use Psr\Log\LoggerInterface;
10+
use Symfony\Component\String\UnicodeString;
11+
12+
class LockService
13+
{
14+
public function __construct(
15+
private readonly SendProcessRepository $repo,
16+
private readonly SendProcessManager $manager,
17+
private readonly LoggerInterface $logger,
18+
private readonly int $staleAfterSeconds = 600,
19+
private readonly int $sleepSeconds = 20,
20+
private readonly int $maxWaitCycles = 10
21+
) {}
22+
23+
/**
24+
* Acquire a per-page lock (phpList getPageLock behavior).
25+
*
26+
* @return int|null inserted row id when acquired; null if we gave up
27+
*/
28+
public function acquirePageLock(
29+
string $page,
30+
bool $force = false,
31+
bool $isCli = false,
32+
bool $multiSend = false,
33+
int $maxSendProcesses = 1,
34+
?string $clientIp = null,
35+
): ?int {
36+
$page = $this->sanitizePage($page);
37+
$max = $isCli ? ($multiSend ? max(1, $maxSendProcesses) : 1) : 1;
38+
39+
if ($force) {
40+
$this->logger->info('Force set, killing other send processes (deleting lock rows).');
41+
$this->repo->deleteByPage($page);
42+
}
43+
44+
$waited = 0;
45+
while (true) {
46+
$count = $this->repo->countAliveByPage($page);
47+
$running = $this->manager->findNewestAliveWithAge($page);
48+
49+
if ($count >= $max) {
50+
$age = (int)($running['age'] ?? 0);
51+
52+
if ($age > $this->staleAfterSeconds && isset($running['id'])) {
53+
$this->repo->markDeadById((int)$running['id']);
54+
55+
continue;
56+
}
57+
58+
$this->logger->info(sprintf(
59+
'A process for this page is already running and it was still alive %d seconds ago',
60+
$age
61+
));
62+
63+
if ($isCli) {
64+
$this->logger->info("Running commandline, quitting. We'll find out what to do in the next run.");
65+
return null;
66+
}
67+
68+
$this->logger->info('Sleeping for 20 seconds, aborting will quit');
69+
sleep($this->sleepSeconds);
70+
71+
if (++$waited > $this->maxWaitCycles) {
72+
$this->logger->info('We have been waiting too long, I guess the other process is still going ok');
73+
return null;
74+
}
75+
76+
continue;
77+
}
78+
79+
$processIdentifier = $isCli
80+
? (php_uname('n') ?: 'localhost') . ':' . getmypid()
81+
: ($clientIp ?? '0.0.0.0');
82+
83+
$sendProcess = $this->manager->create($page, $processIdentifier);
84+
85+
return $sendProcess->getId();
86+
}
87+
}
88+
89+
public function keepLock(int $processId): void
90+
{
91+
$this->repo->incrementAlive($processId);
92+
}
93+
94+
public function checkLock(int $processId): int
95+
{
96+
return $this->repo->getAliveValue($processId);
97+
}
98+
99+
public function release(int $processId): void
100+
{
101+
$this->repo->markDeadById($processId);
102+
}
103+
104+
private function sanitizePage(string $page): string
105+
{
106+
$u = new UnicodeString($page);
107+
$clean = preg_replace('/\W/', '', (string)$u);
108+
return $clean === '' ? 'default' : $clean;
109+
}
110+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpList\Core\Domain\Messaging\Service\Manager;
6+
7+
use DateTime;
8+
use Doctrine\ORM\EntityManagerInterface;
9+
use PhpList\Core\Domain\Messaging\Model\SendProcess;
10+
use PhpList\Core\Domain\Messaging\Repository\SendProcessRepository;
11+
12+
class SendProcessManager
13+
{
14+
private SendProcessRepository $repository;
15+
private EntityManagerInterface $entityManager;
16+
17+
public function __construct(SendProcessRepository $repository, EntityManagerInterface $entityManager)
18+
{
19+
$this->repository = $repository;
20+
$this->entityManager = $entityManager;
21+
}
22+
23+
public function create(string $page, string $processIdentifier): SendProcess
24+
{
25+
$sendProcess = new SendProcess();
26+
$sendProcess->setStartedDate(new DateTime('now'));
27+
$sendProcess->setAlive(1);
28+
$sendProcess->setIpaddress($processIdentifier);
29+
$sendProcess->setPage($page);
30+
31+
$this->entityManager->persist($sendProcess);
32+
$this->entityManager->flush();
33+
34+
return $sendProcess;
35+
}
36+
37+
38+
/**
39+
* @return array{id:int, age:int}|null
40+
*/
41+
public function findNewestAliveWithAge(string $page): ?array
42+
{
43+
$row = $this->repository->findNewestAlive($page);
44+
45+
if (!$row instanceof SendProcess) {
46+
return null;
47+
}
48+
49+
$modified = $row->getUpdatedAt();
50+
$age = $modified
51+
? max(0, time() - (int)$modified->format('U'))
52+
: 0;
53+
54+
return [
55+
'id' => $row->getId(),
56+
'age' => $age,
57+
];
58+
}
59+
}

0 commit comments

Comments
 (0)