Skip to content

Commit 52aeff1

Browse files
ISSUE #1675: Postpone webhooks process when other process is importing data
1 parent 2db5779 commit 52aeff1

File tree

6 files changed

+57
-14
lines changed

6 files changed

+57
-14
lines changed

composer.lock

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/reference.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,17 @@
10981098
* doctrine?: DoctrineConfig,
10991099
* doctrine_migrations?: DoctrineMigrationsConfig,
11001100
* twig?: TwigConfig,
1101+
* "when@dev"?: array{
1102+
* imports?: ImportsConfig,
1103+
* parameters?: ParametersConfig,
1104+
* services?: ServicesConfig,
1105+
* framework?: FrameworkConfig,
1106+
* flysystem?: FlysystemConfig,
1107+
* monolog?: MonologConfig,
1108+
* doctrine?: DoctrineConfig,
1109+
* doctrine_migrations?: DoctrineMigrationsConfig,
1110+
* twig?: TwigConfig,
1111+
* },
11011112
* "when@prod"?: array{
11021113
* imports?: ImportsConfig,
11031114
* parameters?: ParametersConfig,
@@ -1200,6 +1211,7 @@ public static function config(array $config): array
12001211
* deprecated?: array{package:string, version:string, message?:string},
12011212
* }
12021213
* @psalm-type RoutesConfig = array{
1214+
* "when@dev"?: array<string, RouteConfig|ImportConfig|AliasConfig>,
12031215
* "when@prod"?: array<string, RouteConfig|ImportConfig|AliasConfig>,
12041216
* "when@test"?: array<string, RouteConfig|ImportConfig|AliasConfig>,
12051217
* ...<string, RouteConfig|ImportConfig|AliasConfig>

src/Application/importDataAndBuildAppCronAction.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use App\Domain\Strava\Webhook\WebhookEventRepository;
1515
use App\Infrastructure\CQRS\Command\Bus\CommandBus;
1616
use App\Infrastructure\Daemon\Cron\RunnableCronAction;
17+
use App\Infrastructure\Daemon\Mutex\LockIsAlreadyAcquired;
1718
use App\Infrastructure\Daemon\Mutex\LockName;
1819
use App\Infrastructure\Daemon\Mutex\Mutex;
1920
use App\Infrastructure\DependencyInjection\Mutex\WithMutex;
@@ -66,7 +67,12 @@ public function run(SymfonyStyle $output): void
6667
public function runForWebhooks(SymfonyStyle $output): void
6768
{
6869
$this->migrationRunner->run($output);
69-
$this->mutex->acquireLock('importDataAndBuildAppCronAction');
70+
try {
71+
$this->mutex->acquireLock('processWebhooks');
72+
} catch (LockIsAlreadyAcquired) {
73+
// Another process is importing data, postpone webhook processing.
74+
return;
75+
}
7076

7177
if (!$webhookEvents = $this->webhookEventRepository->grab()) {
7278
// No webhooks to process.

src/Console/DetectCorruptedActivitiesConsoleCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public function __construct(
5252
protected function execute(InputInterface $input, OutputInterface $output): int
5353
{
5454
$output = new SymfonyStyle($input, $output);
55-
$this->mutex->acquireLock('BuildAppConsoleCommand');
55+
$this->mutex->acquireLock('DetectCorruptedActivitiesConsoleCommand');
5656

5757
$this->outputConsoleIntro($output);
5858

src/Infrastructure/Daemon/Mutex/Mutex.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public function acquireLock(string $lockAcquiredBy): void
4949
}
5050

5151
$data = Json::decode($row);
52-
$heartbeat = $data[self::HEARTBEAT_KEY];
52+
$heartbeat = $data[self::HEARTBEAT_KEY] ?? null;
5353
$isStale = $now - $heartbeat > self::STALE_THRESHOLD_IN_SECONDS;
5454

5555
if ($isStale) {

tests/Application/importDataAndBuildAppCronActionTest.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,31 @@ public function testRunForWebhooks(): void
8282
$this->assertMatchesJsonSnapshot(Json::encode($this->commandBus->getDispatchedCommands()));
8383
}
8484

85+
public function testRunForWebhooksWhenLockIsAlreadyAcquired(): void
86+
{
87+
$output = new SpySymfonyStyleOutput(new StringInput('input'), new NullOutput());
88+
89+
$event = WebhookEvent::create(
90+
objectId: '1',
91+
objectType: 'activity',
92+
aspectType: WebhookAspectType::CREATE,
93+
payload: [],
94+
);
95+
$this->getContainer()->get(WebhookEventRepository::class)->add($event);
96+
97+
$this->getConnection()->executeStatement(
98+
'INSERT INTO KeyValue (`key`, `value`) VALUES (:key, :value)',
99+
['key' => 'lock.importDataOrBuildApp', 'value' => '{"lockAcquiredBy": "test", "heartbeat": 1764806400}']
100+
);
101+
102+
$this->migrationRunner
103+
->expects($this->once())
104+
->method('run');
105+
106+
$this->importAndBuildAppCronAction->runForWebhooks($output);
107+
$this->assertEmpty($this->commandBus->getDispatchedCommands());
108+
}
109+
85110
#[\Override]
86111
protected function setUp(): void
87112
{

0 commit comments

Comments
 (0)