Skip to content

Commit 21e5a71

Browse files
authored
Handle signals (pcntl) when projecting (#585)
* reusable signal handler scope * reusable signal handler scope * global signal termination handler * fix tests * fix tests * review * review * test message * introduce TerminationListener interface
1 parent d753c90 commit 21e5a71

File tree

11 files changed

+233
-108
lines changed

11 files changed

+233
-108
lines changed

packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/BasicMessagingModule.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
use Ecotone\Messaging\Conversion\UuidToString\UuidToStringConverter;
2929
use Ecotone\Messaging\Endpoint\ChannelAdapterConsumerBuilder;
3030
use Ecotone\Messaging\Endpoint\EventDriven\EventDrivenConsumerBuilder;
31+
use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener;
32+
use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener;
3133
use Ecotone\Messaging\Endpoint\PollingConsumer\PollingConsumerBuilder;
3234
use Ecotone\Messaging\Endpoint\PollingMetadata;
3335
use Ecotone\Messaging\Gateway\ConsoleCommandRunner;
@@ -225,6 +227,12 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
225227

226228
$messagingConfiguration->registerServiceDefinition(PollingMetadataConverter::class, new Definition(PollingMetadataConverter::class));
227229

230+
$messagingConfiguration->registerServiceDefinition(
231+
PcntlTerminationListener::class,
232+
new Definition(PcntlTerminationListener::class)
233+
);
234+
$messagingConfiguration->registerServiceDefinition(TerminationListener::class, new Reference(PcntlTerminationListener::class));
235+
228236
$messagingConfiguration->registerServiceDefinition(LicenceDecider::class, new Definition(LicenceDecider::class, [$messagingConfiguration->isRunningForEnterpriseLicence()]));
229237
}
230238

packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
1212
use Ecotone\Messaging\Config\Container\PollingMetadataReference;
1313
use Ecotone\Messaging\Config\Container\Reference;
14+
use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener;
1415
use Ecotone\Messaging\Endpoint\PollingConsumer\InterceptedConsumerRunner;
1516
use Ecotone\Messaging\Endpoint\PollingConsumer\PollingConsumerErrorChannelInterceptor;
1617
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
@@ -66,6 +67,7 @@ public function registerConsumer(MessagingContainerBuilder $builder): void
6667
$messagePoller,
6768
new PollingMetadataReference($this->endpointId),
6869
new Reference(EcotoneClockInterface::class),
70+
new Reference(PcntlTerminationListener::class),
6971
new Reference(LoggingGateway::class),
7072
new Reference(MessagingEntrypoint::class),
7173
new Reference(ExpressionEvaluationService::REFERENCE),
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\Messaging\Endpoint\Interceptor;
6+
7+
/**
8+
* Global termination signal service that maintains a single termination flag
9+
* and handles signal registration for SIGINT, SIGTERM, and SIGQUIT.
10+
*
11+
* licence Apache-2.0
12+
*/
13+
class PcntlTerminationListener implements TerminationListener
14+
{
15+
private bool $terminationRequested = false;
16+
private bool $enabled = false;
17+
18+
/**
19+
* @var array<int, callable|int|string>
20+
*/
21+
private array $originalHandlers = [];
22+
23+
public function __destruct()
24+
{
25+
$this->disable();
26+
}
27+
28+
/**
29+
* Enable signal handling by registering handlers for termination signals.
30+
* If already enabled, resets the termination flag.
31+
*/
32+
public function enable(): void
33+
{
34+
if (! extension_loaded('pcntl')) {
35+
return;
36+
}
37+
38+
if ($this->enabled) {
39+
$this->terminationRequested = false;
40+
return;
41+
}
42+
43+
$this->enabled = true;
44+
$this->terminationRequested = false;
45+
46+
// Store original handlers
47+
foreach ([SIGINT, SIGTERM, SIGQUIT] as $signal) {
48+
$this->originalHandlers[$signal] = pcntl_signal_get_handler($signal);
49+
pcntl_signal($signal, fn (int $signal) => $this->terminationRequested = true);
50+
}
51+
52+
// Enable async signals
53+
pcntl_async_signals(true);
54+
}
55+
56+
/**
57+
* Disable signal handling by restoring original handlers.
58+
*/
59+
public function disable(): void
60+
{
61+
if (! extension_loaded('pcntl')) {
62+
return;
63+
}
64+
65+
if (! $this->enabled) {
66+
return;
67+
}
68+
69+
// Restore original handlers
70+
foreach ($this->originalHandlers as $signal => $handler) {
71+
pcntl_signal($signal, $handler);
72+
}
73+
74+
$this->originalHandlers = [];
75+
$this->enabled = false;
76+
$this->terminationRequested = false;
77+
}
78+
79+
/**
80+
* Check if termination was requested.
81+
*/
82+
public function shouldTerminate(): bool
83+
{
84+
return $this->terminationRequested;
85+
}
86+
87+
/**
88+
* Manually reset the termination flag without disabling handlers.
89+
*/
90+
public function reset(): void
91+
{
92+
$this->terminationRequested = false;
93+
}
94+
}

packages/Ecotone/src/Messaging/Endpoint/Interceptor/SignalInterceptor.php

Lines changed: 6 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,79 +4,31 @@
44

55
namespace Ecotone\Messaging\Endpoint\Interceptor;
66

7-
use Ecotone\Messaging\Config\ConfigurationException;
87
use Ecotone\Messaging\Endpoint\ConsumerInterceptor;
98
use Ecotone\Messaging\Endpoint\ConsumerInterceptorTrait;
109

11-
/**
12-
* Class SignalInterceptor
13-
* @package Ecotone\Messaging\Endpoint\Interceptor
14-
* @author Dariusz Gafka <support@simplycodedsoftware.com>
15-
*/
1610
/**
1711
* licence Apache-2.0
1812
*/
1913
class SignalInterceptor implements ConsumerInterceptor
2014
{
2115
use ConsumerInterceptorTrait;
22-
private bool $shouldBeStopped = false;
23-
private ?bool $pcntlAsyncSignalsOriginalState = null;
24-
/**
25-
* @var array<int, callable|int>|null
26-
*/
27-
private ?array $beforeHandlers = null;
2816

29-
/**
30-
* @inheritDoc
31-
*/
17+
public function __construct(private PcntlTerminationListener $pcntlTerminationListener) {
18+
}
19+
3220
public function onStartup(): void
3321
{
34-
if (! extension_loaded('pcntl')) {
35-
throw ConfigurationException::create('pcntl extension need to be loaded in order to catch system signals');
36-
}
37-
38-
$this->pcntlAsyncSignalsOriginalState = pcntl_async_signals();
39-
pcntl_async_signals(true);
40-
41-
$this->beforeHandlers = [
42-
SIGTERM => pcntl_signal_get_handler(SIGTERM),
43-
SIGQUIT => pcntl_signal_get_handler(SIGQUIT),
44-
SIGINT => pcntl_signal_get_handler(SIGINT),
45-
];
46-
47-
pcntl_signal(SIGTERM, $this->stopConsumer(...));
48-
pcntl_signal(SIGQUIT, $this->stopConsumer(...));
49-
pcntl_signal(SIGINT, $this->stopConsumer(...));
22+
$this->pcntlTerminationListener->enable();
5023
}
5124

5225
public function onShutdown(): void
5326
{
54-
if ($this->beforeHandlers !== null) {
55-
foreach ($this->beforeHandlers as $signal => $handler) {
56-
pcntl_signal($signal, $handler);
57-
}
58-
$this->beforeHandlers = null;
59-
}
60-
61-
if ($this->pcntlAsyncSignalsOriginalState !== null) {
62-
pcntl_async_signals($this->pcntlAsyncSignalsOriginalState);
63-
$this->pcntlAsyncSignalsOriginalState = null;
64-
}
27+
$this->pcntlTerminationListener->disable();
6528
}
6629

67-
/**
68-
* @inheritDoc
69-
*/
7030
public function shouldBeStopped(): bool
7131
{
72-
return $this->shouldBeStopped;
73-
}
74-
75-
/**
76-
* @param int $signal
77-
*/
78-
public function stopConsumer(int $signal): void
79-
{
80-
$this->shouldBeStopped = true;
32+
return $this->pcntlTerminationListener->shouldTerminate();
8133
}
8234
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
/*
3+
* licence Enterprise
4+
*/
5+
declare(strict_types=1);
6+
7+
namespace Ecotone\Messaging\Endpoint\Interceptor;
8+
9+
interface TerminationListener
10+
{
11+
public function shouldTerminate(): bool;
12+
}

packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumer.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Ecotone\Messaging\Endpoint\Interceptor\LimitExecutionAmountInterceptor;
1212
use Ecotone\Messaging\Endpoint\Interceptor\LimitMemoryUsageInterceptor;
1313
use Ecotone\Messaging\Endpoint\Interceptor\SignalInterceptor;
14+
use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener;
1415
use Ecotone\Messaging\Endpoint\Interceptor\TimeLimitInterceptor;
1516
use Ecotone\Messaging\Endpoint\PollingMetadata;
1617
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
@@ -104,7 +105,7 @@ public function stop(): void
104105
* @return ConsumerInterceptor[]
105106
* @throws \Ecotone\Messaging\MessagingException
106107
*/
107-
public static function createInterceptorsForPollingMetadata(PollingMetadata $pollingMetadata, LoggerInterface $logger, EcotoneClockInterface $clock): array
108+
public static function createInterceptorsForPollingMetadata(PollingMetadata $pollingMetadata, LoggerInterface $logger, EcotoneClockInterface $clock, PcntlTerminationListener $pcntlTerminationListener): array
108109
{
109110
$interceptors = [];
110111
if ($pollingMetadata->getHandledMessageLimit() > 0) {
@@ -114,7 +115,7 @@ public static function createInterceptorsForPollingMetadata(PollingMetadata $pol
114115
$interceptors[] = new LimitMemoryUsageInterceptor($pollingMetadata->getMemoryLimitInMegabytes());
115116
}
116117
if ($pollingMetadata->isWithSignalInterceptors()) {
117-
$interceptors[] = new SignalInterceptor();
118+
$interceptors[] = new SignalInterceptor($pcntlTerminationListener);
118119
}
119120
if ($pollingMetadata->getExecutionAmountLimit() > 0) {
120121
$interceptors[] = new LimitExecutionAmountInterceptor($pollingMetadata->getExecutionAmountLimit());

packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumerRunner.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Ecotone\Messaging\Endpoint\ConsumerLifecycle;
66
use Ecotone\Messaging\Endpoint\EndpointRunner;
77
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
8+
use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener;
89
use Ecotone\Messaging\Endpoint\PollingMetadata;
910
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
1011
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
@@ -22,12 +23,13 @@
2223
class InterceptedConsumerRunner implements EndpointRunner
2324
{
2425
public function __construct(
25-
private NonProxyGateway $gateway,
26-
private MessagePoller $messagePoller,
27-
private PollingMetadata $defaultPollingMetadata,
28-
private EcotoneClockInterface $clock,
29-
private LoggingGateway $logger,
30-
private MessagingEntrypoint $messagingEntrypoint,
26+
private NonProxyGateway $gateway,
27+
private MessagePoller $messagePoller,
28+
private PollingMetadata $defaultPollingMetadata,
29+
private EcotoneClockInterface $clock,
30+
private PcntlTerminationListener $pcntlTerminationListener,
31+
private LoggingGateway $logger,
32+
private MessagingEntrypoint $messagingEntrypoint,
3133
private ExpressionEvaluationService $expressionEvaluationService,
3234
) {
3335
}
@@ -41,7 +43,7 @@ public function createConsumer(?ExecutionPollingMetadata $executionPollingMetada
4143
{
4244
$this->logger->info('Message Consumer starting to consume messages');
4345
$pollingMetadata = $this->defaultPollingMetadata->applyExecutionPollingMetadata($executionPollingMetadata);
44-
$interceptors = InterceptedConsumer::createInterceptorsForPollingMetadata($pollingMetadata, $this->logger, $this->clock);
46+
$interceptors = InterceptedConsumer::createInterceptorsForPollingMetadata($pollingMetadata, $this->logger, $this->clock, $this->pcntlTerminationListener);
4547
$interceptedGateway = new InterceptedGateway($this->gateway, $interceptors);
4648

4749
$trigger = $this->createTrigger($pollingMetadata);

packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Ecotone\Messaging\Config\Container\Reference;
1616
use Ecotone\Messaging\Endpoint\AcknowledgeConfirmationInterceptor;
1717
use Ecotone\Messaging\Endpoint\InboundChannelAdapterEntrypoint;
18+
use Ecotone\Messaging\Endpoint\Interceptor\PcntlTerminationListener;
1819
use Ecotone\Messaging\Endpoint\MessageHandlerConsumerBuilder;
1920
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
2021
use Ecotone\Messaging\Handler\ChannelResolver;
@@ -108,6 +109,7 @@ public function registerConsumer(MessagingContainerBuilder $builder, MessageHand
108109
$this->compileMessagePoller($builder, $messageHandlerBuilder),
109110
new PollingMetadataReference($endpointId),
110111
new Reference(EcotoneClockInterface::class),
112+
new Reference(PcntlTerminationListener::class),
111113
new Reference(LoggingGateway::class),
112114
new Reference(MessagingEntrypoint::class),
113115
new Reference(ExpressionEvaluationService::REFERENCE),

packages/Ecotone/src/Projecting/Config/ProjectingModule.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Ecotone\Messaging\Config\ModulePackageList;
2020
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
2121
use Ecotone\Messaging\Config\ServiceConfiguration;
22+
use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener;
2223
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
2324
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\HeaderBuilder;
2425
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\ValueBuilder;
@@ -98,6 +99,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
9899
$components[$projectionName][StreamSource::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have stream source configured. Please check your configuration."),
99100
$components[$projectionName][PartitionProvider::class] ?? new Definition(NullPartitionProvider::class),
100101
$projectionName,
102+
new Reference(TerminationListener::class),
101103
$projectionBuilder->batchSize(), // batchSize
102104
$projectionBuilder->automaticInitialization(),
103105
])

0 commit comments

Comments
 (0)