Skip to content

Commit a7b4664

Browse files
committed
refactored connection and its factory
1 parent b566404 commit a7b4664

File tree

6 files changed

+122
-88
lines changed

6 files changed

+122
-88
lines changed

src/Bolt/BoltConnection.php

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
use WeakReference;
3434

3535
/**
36-
* @implements ConnectionInterface<V3>
36+
* @implements ConnectionInterface<array{0: V3, 1: AConnection}>
3737
*
3838
* @psalm-import-type BoltMeta from FormatterInterface
3939
*/
@@ -77,20 +77,20 @@ public function __construct(V3 $protocol, AConnection $connection, ConnectionCon
7777

7878
/**
7979
* @psalm-mutation-free
80+
*
81+
* @return array{0: V3, 1: AConnection}
8082
*/
81-
public function getImplementation(): V3
83+
public function getImplementation(): array
8284
{
8385
if (!$this->isOpen()) {
8486
throw new RuntimeException('Connection is closed');
8587
}
8688

87-
return $this->boltProtocol;
89+
return [$this->boltProtocol, $this->connection];
8890
}
8991

9092
/**
9193
* Encryption level can be either '', 's' or 'ssc', which stand for 'no encryption', 'full encryption' and 'self-signed encryption' respectively.
92-
*
93-
* @return string
9494
*/
9595
public function getEncryptionLevel(): string
9696
{
@@ -184,7 +184,7 @@ public function consumeResults(): void
184184
public function reset(): void
185185
{
186186
try {
187-
$this->getImplementation()->reset();
187+
$this->protocol()->reset();
188188
} catch (MessageException $e) {
189189
$this->serverState = 'DEFUNCT';
190190

@@ -206,7 +206,7 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
206206

207207
$extra = $this->buildRunExtra($database, $timeout, $holder);
208208
try {
209-
$this->getImplementation()->begin($extra);
209+
$this->protocol()->begin($extra);
210210
} catch (IgnoredException $e) {
211211
$this->serverState = 'INTERRUPTED';
212212

@@ -229,7 +229,7 @@ public function discard(?int $qid): void
229229
{
230230
try {
231231
$extra = $this->buildResultExtra(null, $qid);
232-
$bolt = $this->getImplementation();
232+
$bolt = $this->protocol();
233233

234234
if ($bolt instanceof V4) {
235235
$result = $bolt->discard($extra);
@@ -265,7 +265,7 @@ public function run(string $text, array $parameters, ?string $database, ?float $
265265
try {
266266
$extra = $this->buildRunExtra($database, $timeout, $holder);
267267

268-
$tbr = $this->getImplementation()->run($text, $parameters, $extra);
268+
$tbr = $this->protocol()->run($text, $parameters, $extra);
269269

270270
if (str_starts_with($this->serverState, 'TX_')) {
271271
$this->serverState = 'TX_STREAMING';
@@ -296,7 +296,7 @@ public function commit(): void
296296
$this->consumeResults();
297297

298298
try {
299-
$this->getImplementation()->commit();
299+
$this->protocol()->commit();
300300
} catch (MessageException $e) {
301301
$this->serverState = 'FAILED';
302302

@@ -320,7 +320,7 @@ public function rollback(): void
320320
$this->consumeResults();
321321

322322
try {
323-
$this->getImplementation()->rollback();
323+
$this->protocol()->rollback();
324324
} catch (MessageException $e) {
325325
$this->serverState = 'FAILED';
326326

@@ -334,6 +334,11 @@ public function rollback(): void
334334
$this->serverState = 'READY';
335335
}
336336

337+
public function protocol(): V3
338+
{
339+
return $this->getImplementation()[0];
340+
}
341+
337342
/**
338343
* Pulls a result set.
339344
*
@@ -345,7 +350,7 @@ public function pull(?int $qid, ?int $fetchSize): array
345350
{
346351
$extra = $this->buildResultExtra($fetchSize, $qid);
347352

348-
$bolt = $this->getImplementation();
353+
$bolt = $this->protocol();
349354
try {
350355
if (!$bolt instanceof V4) {
351356
/** @var non-empty-list<list> $tbr */
@@ -374,7 +379,7 @@ public function __destruct()
374379
if ($this->serverState !== 'FAILED' && $this->isOpen()) {
375380
$this->consumeResults();
376381

377-
$this->getImplementation()->goodbye();
382+
$this->protocol()->goodbye();
378383

379384
$this->serverState = 'DEFUNCT';
380385
unset($this->boltProtocol); // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;

src/Bolt/BoltDriver.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
namespace Laudis\Neo4j\Bolt;
1515

1616
use Exception;
17+
use Laudis\Neo4j\Common\SingleThreadedSemaphore;
18+
use Laudis\Neo4j\Common\SysVSemaphore;
19+
use function extension_loaded;
1720
use function is_string;
1821
use Laudis\Neo4j\Authentication\Authenticate;
1922
use Laudis\Neo4j\Common\Uri;
@@ -78,6 +81,20 @@ public static function create($uri, ?DriverConfiguration $configuration = null,
7881
$configuration ??= DriverConfiguration::default();
7982
$authenticate ??= Authenticate::fromUrl($uri);
8083

84+
// Because interprocess switching of connections between PHP sessions is impossible,
85+
// we have to build a key to limit the amount of open connections, potentially between ALL sessions.
86+
// because of this we have to settle on a configuration basis to limit the connection pool,
87+
// not on an object basis.
88+
// The combination is between the server and the user agent as it most closely resembles an "application"
89+
// connecting to a server. The application thus supports multiple authentication methods, but they have
90+
// to be shared between the same connection pool.
91+
$key = $uri->getHost().':'.$uri->getPort().':'.$config->getUserAgent();
92+
if (extension_loaded('ext-sysvsem')) {
93+
$semaphore = SysVSemaphore::create($key, $config->getMaxPoolSize());
94+
} else {
95+
$semaphore = SingleThreadedSemaphore::create($key, $config->getMaxPoolSize());
96+
}
97+
8198
if ($formatter !== null) {
8299
return new self(
83100
$uri,

src/Bolt/SingleBoltConnectionPool.php

Lines changed: 8 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,18 @@
1313

1414
use Bolt\protocol\V3;
1515
use function explode;
16-
use function extension_loaded;
1716
use Generator;
1817
use Laudis\Neo4j\BoltFactory;
1918
use Laudis\Neo4j\Common\ConnectionConfiguration;
20-
use Laudis\Neo4j\Common\SingleThreadedSemaphore;
21-
use Laudis\Neo4j\Common\SysVSemaphore;
2219
use Laudis\Neo4j\Contracts\AuthenticateInterface;
20+
use Laudis\Neo4j\Contracts\ConnectionFactoryInterface;
2321
use Laudis\Neo4j\Contracts\ConnectionInterface;
2422
use Laudis\Neo4j\Contracts\ConnectionPoolInterface;
2523
use Laudis\Neo4j\Contracts\SemaphoreInterface;
2624
use Laudis\Neo4j\Databags\DatabaseInfo;
27-
use Laudis\Neo4j\Databags\DriverConfiguration;
2825
use Laudis\Neo4j\Databags\SessionConfiguration;
2926
use Laudis\Neo4j\Enum\ConnectionProtocol;
3027
use function microtime;
31-
use Psr\Http\Message\UriInterface;
3228
use function shuffle;
3329

3430
/**
@@ -37,31 +33,17 @@
3733
class SingleBoltConnectionPool implements ConnectionPoolInterface
3834
{
3935
private SemaphoreInterface $semaphore;
40-
private UriInterface $uri;
4136

4237
/** @var list<BoltConnection> */
4338
private array $activeConnections = [];
44-
private DriverConfiguration $config;
4539
private AuthenticateInterface $auth;
40+
private ConnectionFactoryInterface $factory;
4641

47-
public function __construct(UriInterface $uri, DriverConfiguration $config, AuthenticateInterface $auth)
42+
public function __construct(AuthenticateInterface $auth, SemaphoreInterface $semaphore, ConnectionFactoryInterface $factory)
4843
{
49-
// Because interprocess switching of connections between PHP sessions is impossible,
50-
// we have to build a key to limit the amount of open connections, potentially between ALL sessions.
51-
// because of this we have to settle on a configuration basis to limit the connection pool,
52-
// not on an object basis.
53-
// The combination is between the server and the user agent as it most closely resembles an "application"
54-
// connecting to a server. The application thus supports multiple authentication methods, but they have
55-
// to be shared between the same connection pool.
56-
$key = $uri->getHost().':'.$uri->getPort().':'.$config->getUserAgent();
57-
if (extension_loaded('ext-sysvsem')) {
58-
$this->semaphore = SysVSemaphore::create($key, $config->getMaxPoolSize());
59-
} else {
60-
$this->semaphore = SingleThreadedSemaphore::create($key, $config->getMaxPoolSize());
61-
}
62-
63-
$this->uri = $uri;
44+
$this->semaphore = $semaphore;
6445
$this->auth = $auth;
46+
$this->factory = $factory;
6547
}
6648

6749
/**
@@ -92,7 +74,7 @@ public function acquire(SessionConfiguration $config): Generator
9274
}
9375
}
9476

95-
return $this->returnAnyAvailableConnection() ?? $this->createNewConnection($config);
77+
return $this->returnAnyAvailableConnection() ?? $this->factory->createConnection($this->auth, $config);
9678
}
9779

9880
public function release(ConnectionInterface $connection): void
@@ -150,37 +132,14 @@ private function returnAnyAvailableConnection(string $encryptionLevel): ?BoltCon
150132
if ($requiresReconnectConnection) {
151133
$this->release($requiresReconnectConnection);
152134

153-
return $this->createNewConnection()
135+
return $this->createNewConnection();
154136
}
155137

156138
return null;
157139
}
158140

159-
private function createNewConnection(SessionConfiguration $config): BoltConnection
160-
{
161-
$factory = BoltFactory::fromVariables($this->uri, $this->auth, $this->config);
162-
[$bolt, $response] = $factory->build();
163-
164-
$config = new ConnectionConfiguration(
165-
$response['server'],
166-
$this->uri,
167-
explode('/', $response['server'])[1] ?? '',
168-
ConnectionProtocol::determineBoltVersion($bolt),
169-
$config->getAccessMode(),
170-
$this->config,
171-
$config->getDatabase() === null ? null : new DatabaseInfo($config->getDatabase())
172-
);
173-
174-
$tbr = new BoltConnection($factory, $bolt, $config);
175-
176-
$this->activeConnections[] = $tbr;
177-
178-
return $tbr;
179-
}
180-
181141
private function requiresReconnect(BoltConnection $activeConnection, string $requiredEncryptionLevel): bool
182142
{
183-
return $activeConnection->getAuthentication()->toString($this->uri) !== $this->auth->toString($this->uri) ||
184-
$activeConnection->getEncryptionLevel() === $requiredEncryptionLevel;
143+
return
185144
}
186145
}

src/BoltFactory.php

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
namespace Laudis\Neo4j;
1515

1616
use Bolt\Bolt;
17+
use Bolt\connection\AConnection;
1718
use Bolt\connection\Socket;
1819
use Bolt\connection\StreamSocket;
1920
use Bolt\error\ConnectException;
@@ -39,63 +40,59 @@
3940
/**
4041
* Small wrapper around the bolt library to easily guarantee only bolt version 3 and up will be created and authenticated.
4142
*
42-
* @implements ConnectionFactoryInterface<V3>
43+
* @implements ConnectionFactoryInterface<array{0: V3, 1: AConnection}>
4344
*/
4445
final class BoltFactory implements ConnectionFactoryInterface
4546
{
4647
private UriInterface $uri;
47-
private AuthenticateInterface $authenticate;
48+
private AuthenticateInterface $auth;
4849
private DriverConfiguration $config;
50+
private SslConfigurator $sslConfigurator;
4951

5052
/**
5153
* @psalm-external-mutation-free
5254
*/
5355
public function __construct(
5456
UriInterface $uri,
5557
AuthenticateInterface $authenticate,
56-
DriverConfiguration $config
58+
DriverConfiguration $config,
59+
SslConfigurator $sslConfigurator
5760
) {
5861
$this->uri = $uri;
59-
$this->authenticate = $authenticate;
62+
$this->auth = $authenticate;
6063
$this->config = $config;
61-
}
62-
63-
private static function configureSsl(UriInterface $uri, StreamSocket $socket, DriverConfiguration $config): void
64-
{
65-
$options = (new SslConfigurator())->configure($uri, $config);
66-
67-
if ($options !== null) {
68-
$socket->setSslContextOptions($options);
69-
}
64+
$this->sslConfigurator = $sslConfigurator;
7065
}
7166

7267
public function createConnection(AuthenticateInterface $auth, SessionConfiguration $config): ConnectionInterface
7368
{
74-
$ssl = (new SslConfigurator())->configure($this->uri, $this->config);
69+
[$encryptionLevel, $sslConfig] = $this->sslConfigurator->configure($this->uri, $this->config);
7570
$port = $this->uri->getPort() ?? 7687;
76-
if (extension_loaded('sockets') && $ssl === null) {
77-
$socket = new Socket($this->uri->getHost(), $port, TransactionConfiguration::DEFAULT_TIMEOUT);
71+
if (extension_loaded('sockets') && $sslConfig === null) {
72+
$connection = new Socket($this->uri->getHost(), $port, TransactionConfiguration::DEFAULT_TIMEOUT);
7873
} else {
79-
$socket = new StreamSocket($this->uri->getHost(), $port, TransactionConfiguration::DEFAULT_TIMEOUT);
80-
self::configureSsl($this->uri, $socket, $this->config);
74+
$connection = new StreamSocket($this->uri->getHost(), $port, TransactionConfiguration::DEFAULT_TIMEOUT);
75+
if ($sslConfig !== null) {
76+
$connection->setSslContextOptions($sslConfig);
77+
}
8178
}
8279

83-
$bolt = new Bolt($socket);
80+
$bolt = new Bolt($connection);
8481

8582
try {
8683
$bolt->setProtocolVersions(4.4, 4.3, 4.2, 3);
8784
try {
88-
$build = $bolt->build();
85+
$protocol = $bolt->build();
8986
} catch (ConnectException $exception) {
9087
$bolt->setProtocolVersions(4.1, 4.0, 4, 3);
91-
$build = $bolt->build();
88+
$protocol = $bolt->build();
9289
}
9390

94-
if (!$build instanceof V3) {
91+
if (!$protocol instanceof V3) {
9592
throw new RuntimeException('Client only supports bolt version 3 and up.');
9693
}
9794

98-
$response = $auth->authenticateBolt($build, $this->config->getUserAgent());
95+
$response = $auth->authenticateBolt($protocol, $this->config->getUserAgent());
9996
} catch (MessageException $e) {
10097
throw Neo4jException::fromMessageException($e);
10198
}
@@ -104,12 +101,35 @@ public function createConnection(AuthenticateInterface $auth, SessionConfigurati
104101
$response['server'],
105102
$this->uri,
106103
explode('/', $response['server'])[1] ?? '',
107-
ConnectionProtocol::determineBoltVersion($bolt),
104+
ConnectionProtocol::determineBoltVersion($protocol),
105+
$config->getAccessMode(),
106+
$this->config,
107+
$config->getDatabase() === null ? null : new DatabaseInfo($config->getDatabase())
108+
);
109+
110+
return new BoltConnection($protocol, $connection, $config, $this->auth, $encryptionLevel);
111+
}
112+
113+
public function canReuseConnection(ConnectionInterface $connection): bool
114+
{
115+
return $connection->getAuthentication()->toString($this->uri) == $this->auth->toString($this->uri) ||
116+
$connection->getEncryptionLevel() === $this->sslConfigurator->configure($this->uri, $this->config)[0];
117+
}
118+
119+
public function reuseConnection(ConnectionInterface $connection, SessionConfiguration $config): ConnectionInterface
120+
{
121+
$config = new ConnectionConfiguration(
122+
$connection->getServerAgent(),
123+
$this->uri,
124+
$connection->getServerVersion(),
125+
$connection->getProtocol(),
108126
$config->getAccessMode(),
109127
$this->config,
110128
$config->getDatabase() === null ? null : new DatabaseInfo($config->getDatabase())
111129
);
112130

113-
return new BoltConnection($factory, $bolt, $config);
131+
[$protocol, $connectionImpl] = $connection->getImplementation();
132+
133+
return new BoltConnection($protocol, $connectionImpl, $config, $this->auth, $connection->getEncryptionLevel());
114134
}
115135
}

0 commit comments

Comments
 (0)