forked from krowinski/php-mysql-replication
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMySQLReplicationFactory.php
More file actions
132 lines (114 loc) · 3.93 KB
/
MySQLReplicationFactory.php
File metadata and controls
132 lines (114 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
<?php
declare(strict_types=1);
namespace MySQLReplication;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use MySQLReplication\BinLog\BinLogServerInfo;
use MySQLReplication\BinLog\BinLogSocketConnect;
use MySQLReplication\Cache\ArrayCache;
use MySQLReplication\Config\Config;
use MySQLReplication\Event\Event;
use MySQLReplication\Event\RowEvent\RowEventBuilder;
use MySQLReplication\Event\RowEvent\RowEventFactory;
use MySQLReplication\Repository\MySQLRepository;
use MySQLReplication\Repository\RepositoryInterface;
use MySQLReplication\Socket\Socket;
use MySQLReplication\Socket\SocketInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Psr\SimpleCache\CacheInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
/**
 * Assembles the collaborators needed to consume replication events and exposes
 * the entry points for running the consume loop.
 *
 * The constructor wires a repository, cache, socket, logger and event dispatcher
 * into a BinLogSocketConnect and an Event consumer. Every collaborator except the
 * Config is optional; a default implementation is created for each one that is
 * not supplied.
 */
class MySQLReplicationFactory
{
    /**
     * DBAL connection created by the constructor when no repository was injected;
     * remains null when the caller supplied its own repository.
     */
    private ?Connection $connection = null;

    /** Dispatcher that subscribers are registered on and that is handed to the Event consumer. */
    private EventDispatcherInterface $eventDispatcher;

    /** Consumer invoked once per iteration of the run loops. */
    private Event $event;

    /** Source of the BinLogServerInfo returned by getServerInfo(). */
    private BinLogSocketConnect $binLogSocketConnect;

    /**
     * @param Config $config Validated via validate() before anything else is built.
     * @param RepositoryInterface|null $repository When null, a Doctrine DBAL connection
     *        (driver "pdo_mysql") is created from the config's user, password, host,
     *        port and charset and wrapped in a MySQLRepository.
     * @param CacheInterface|null $cache Defaults to an ArrayCache sized by $config->tableCacheSize.
     * @param EventDispatcherInterface|null $eventDispatcher Defaults to a new EventDispatcher.
     * @param SocketInterface|null $socket Defaults to a new Socket.
     * @param LoggerInterface|null $logger Defaults to a NullLogger.
     */
    public function __construct(
        Config $config,
        ?RepositoryInterface $repository = null,
        ?CacheInterface $cache = null,
        ?EventDispatcherInterface $eventDispatcher = null,
        ?SocketInterface $socket = null,
        ?LoggerInterface $logger = null
    ) {
        $config->validate();

        if ($repository === null) {
            $this->connection = DriverManager::getConnection(
                [
                    'user' => $config->user,
                    'password' => $config->password,
                    'host' => $config->host,
                    'port' => $config->port,
                    'driver' => 'pdo_mysql',
                    'charset' => $config->charset,
                ]
            );
            $repository = new MySQLRepository($this->connection);
        }

        // Null-coalescing rather than the truthiness-based `?:`, so every optional
        // dependency is defaulted on null only, matching the strict repository check above.
        $cache ??= new ArrayCache($config->tableCacheSize);
        $logger ??= new NullLogger();
        $socket ??= new Socket();
        $this->eventDispatcher = $eventDispatcher ?? new EventDispatcher();

        $this->binLogSocketConnect = new BinLogSocketConnect($repository, $socket, $logger, $config);

        $this->event = new Event(
            $this->binLogSocketConnect,
            new RowEventFactory(
                new RowEventBuilder(
                    $repository,
                    $cache,
                    $config,
                    $this->binLogSocketConnect->getBinLogServerInfo(),
                    $logger
                )
            ),
            $this->eventDispatcher,
            $cache,
            $config,
            $this->binLogSocketConnect->getBinLogServerInfo()
        );
    }

    /**
     * Adds a subscriber to the event dispatcher used by this factory.
     */
    public function registerSubscriber(EventSubscriberInterface $eventSubscribers): void
    {
        $this->eventDispatcher->addSubscriber($eventSubscribers);
    }

    /**
     * Removes a previously added subscriber from the event dispatcher.
     */
    public function unregisterSubscriber(EventSubscriberInterface $eventSubscribers): void
    {
        $this->eventDispatcher->removeSubscriber($eventSubscribers);
    }

    /**
     * Returns the DBAL connection created by the constructor, or null when a
     * repository was injected and therefore no connection was created here.
     */
    public function getDbConnection(): ?Connection
    {
        return $this->connection;
    }

    /**
     * Calls consume() in an endless loop.
     *
     * The loop has no exit condition of its own; it only ends if consume()
     * throws. Use runWithStopCheck() when the loop must be stoppable.
     */
    public function run(): void
    {
        /** @phpstan-ignore-next-line */
        while (1) {
            $this->consume();
        }
    }

    /**
     * Run replication, checking $shouldStop before each iteration so the process can be stopped gracefully.
     *
     * The callback is evaluated before every consume() call, including the first,
     * so a callback that already returns true results in no consumption at all.
     *
     * @param callable $shouldStop Returns true if the process should stop
     */
    public function runWithStopCheck(callable $shouldStop): void
    {
        while (!$shouldStop()) {
            $this->consume();
        }
    }

    /**
     * Performs a single consume step by delegating to the Event consumer.
     */
    public function consume(): void
    {
        $this->event->consume();
    }

    /**
     * Returns the server info exposed by the underlying BinLogSocketConnect.
     */
    public function getServerInfo(): BinLogServerInfo
    {
        return $this->binLogSocketConnect->getBinLogServerInfo();
    }
}