Skip to content

Commit b566404

Browse files
committed
introduced hierarchy preference in connection reuse selection
1 parent 00d4c59 commit b566404

File tree

5 files changed

+87
-66
lines changed

5 files changed

+87
-66
lines changed

src/Bolt/BoltConnection.php

Lines changed: 39 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,20 @@
1313

1414
namespace Laudis\Neo4j\Bolt;
1515

16-
use BadMethodCallException;
16+
use Bolt\connection\AConnection;
1717
use Bolt\error\IgnoredException;
1818
use Bolt\error\MessageException;
1919
use Bolt\protocol\V3;
2020
use Bolt\protocol\V4;
21-
use Laudis\Neo4j\BoltFactory;
2221
use Laudis\Neo4j\Common\ConnectionConfiguration;
22+
use Laudis\Neo4j\Contracts\AuthenticateInterface;
2323
use Laudis\Neo4j\Contracts\ConnectionInterface;
2424
use Laudis\Neo4j\Contracts\FormatterInterface;
2525
use Laudis\Neo4j\Databags\BookmarkHolder;
2626
use Laudis\Neo4j\Databags\DatabaseInfo;
27-
use Laudis\Neo4j\Databags\DriverConfiguration;
2827
use Laudis\Neo4j\Enum\AccessMode;
2928
use Laudis\Neo4j\Enum\ConnectionProtocol;
3029
use Laudis\Neo4j\Types\CypherList;
31-
use LogicException;
3230
use Psr\Http\Message\UriInterface;
3331
use RuntimeException;
3432
use function str_starts_with;
@@ -41,13 +39,11 @@
4139
*/
4240
final class BoltConnection implements ConnectionInterface
4341
{
44-
private ?V3 $boltProtocol;
42+
private V3 $boltProtocol;
4543
/** @psalm-readonly */
4644
private ConnectionConfiguration $config;
47-
/** @psalm-readonly */
48-
private BoltFactory $factory;
45+
private string $serverState;
4946

50-
private string $serverState = 'DISCONNECTED';
5147
/**
5248
* @note We are using references to "subscribed results" to maintain backwards compatibility and try and strike
5349
* a balance between performance and ease of use.
@@ -62,32 +58,45 @@ final class BoltConnection implements ConnectionInterface
6258
* @var list<WeakReference<CypherList>>
6359
*/
6460
private array $subscribedResults = [];
61+
private AuthenticateInterface $auth;
62+
private AConnection $connection;
63+
private string $encryptionLevel;
6564

6665
/**
6766
* @psalm-mutation-free
6867
*/
69-
public function __construct(BoltFactory $factory, ?V3 $boltProtocol, ConnectionConfiguration $config)
68+
public function __construct(V3 $protocol, AConnection $connection, ConnectionConfiguration $config, AuthenticateInterface $auth, string $encryptionLevel)
7069
{
71-
$this->factory = $factory;
72-
$this->boltProtocol = $boltProtocol;
70+
$this->boltProtocol = $protocol;
7371
$this->config = $config;
74-
if ($boltProtocol) {
75-
$this->serverState = 'READY';
76-
}
72+
$this->serverState = 'READY';
73+
$this->auth = $auth;
74+
$this->connection = $connection;
75+
$this->encryptionLevel = $encryptionLevel;
7776
}
7877

7978
/**
8079
* @psalm-mutation-free
8180
*/
8281
public function getImplementation(): V3
8382
{
84-
if ($this->boltProtocol === null) {
83+
if (!$this->isOpen()) {
8584
throw new RuntimeException('Connection is closed');
8685
}
8786

8887
return $this->boltProtocol;
8988
}
9089

90+
/**
91+
* Encryption level can be either '', 's' or 'ssc', which stand for 'no encryption', 'full encryption' and 'self-signed encryption' respectively.
92+
*
93+
* @return string
94+
*/
95+
public function getEncryptionLevel(): string
96+
{
97+
return $this->encryptionLevel;
98+
}
99+
91100
/**
92101
* @psalm-mutation-free
93102
*/
@@ -136,6 +145,11 @@ public function getDatabaseInfo(): ?DatabaseInfo
136145
return $this->config->getDatabaseInfo();
137146
}
138147

148+
public function getAuthentication(): AuthenticateInterface
149+
{
150+
return $this->auth;
151+
}
152+
139153
/**
140154
* @psalm-mutation-free
141155
*/
@@ -146,7 +160,7 @@ public function isOpen(): bool
146160

147161
public function setTimeout(float $timeout): void
148162
{
149-
$this->factory->getConnection()->setTimeout($timeout);
163+
$this->connection->setTimeout($timeout);
150164
}
151165

152166
public function consumeResults(): void
@@ -170,7 +184,7 @@ public function consumeResults(): void
170184
public function reset(): void
171185
{
172186
try {
173-
$this->protocol()->reset();
187+
$this->getImplementation()->reset();
174188
} catch (MessageException $e) {
175189
$this->serverState = 'DEFUNCT';
176190

@@ -192,7 +206,7 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
192206

193207
$extra = $this->buildRunExtra($database, $timeout, $holder);
194208
try {
195-
$this->protocol()->begin($extra);
209+
$this->getImplementation()->begin($extra);
196210
} catch (IgnoredException $e) {
197211
$this->serverState = 'INTERRUPTED';
198212

@@ -206,11 +220,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
206220
$this->serverState = 'TX_READY';
207221
}
208222

209-
public function getFactory(): BoltFactory
210-
{
211-
return $this->factory;
212-
}
213-
214223
/**
215224
* Discards a result.
216225
*
@@ -220,7 +229,7 @@ public function discard(?int $qid): void
220229
{
221230
try {
222231
$extra = $this->buildResultExtra(null, $qid);
223-
$bolt = $this->protocol();
232+
$bolt = $this->getImplementation();
224233

225234
if ($bolt instanceof V4) {
226235
$result = $bolt->discard($extra);
@@ -256,7 +265,7 @@ public function run(string $text, array $parameters, ?string $database, ?float $
256265
try {
257266
$extra = $this->buildRunExtra($database, $timeout, $holder);
258267

259-
$tbr = $this->protocol()->run($text, $parameters, $extra);
268+
$tbr = $this->getImplementation()->run($text, $parameters, $extra);
260269

261270
if (str_starts_with($this->serverState, 'TX_')) {
262271
$this->serverState = 'TX_STREAMING';
@@ -287,7 +296,7 @@ public function commit(): void
287296
$this->consumeResults();
288297

289298
try {
290-
$this->protocol()->commit();
299+
$this->getImplementation()->commit();
291300
} catch (MessageException $e) {
292301
$this->serverState = 'FAILED';
293302

@@ -311,7 +320,7 @@ public function rollback(): void
311320
$this->consumeResults();
312321

313322
try {
314-
$this->protocol()->rollback();
323+
$this->getImplementation()->rollback();
315324
} catch (MessageException $e) {
316325
$this->serverState = 'FAILED';
317326

@@ -336,7 +345,7 @@ public function pull(?int $qid, ?int $fetchSize): array
336345
{
337346
$extra = $this->buildResultExtra($fetchSize, $qid);
338347

339-
$bolt = $this->protocol();
348+
$bolt = $this->getImplementation();
340349
try {
341350
if (!$bolt instanceof V4) {
342351
/** @var non-empty-list<list> $tbr */
@@ -360,23 +369,15 @@ public function pull(?int $qid, ?int $fetchSize): array
360369
return $tbr;
361370
}
362371

363-
/**
364-
* @psalm-mutation-free
365-
*/
366-
public function getDriverConfiguration(): DriverConfiguration
367-
{
368-
return $this->config->getDriverConfiguration();
369-
}
370-
371372
public function __destruct()
372373
{
373374
if ($this->serverState !== 'FAILED' && $this->isOpen()) {
374375
$this->consumeResults();
375376

376-
$this->protocol()->goodbye();
377+
$this->getImplementation()->goodbye();
377378

378379
$this->serverState = 'DEFUNCT';
379-
$this->boltProtocol = null; // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;
380+
unset($this->boltProtocol); // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;
380381
}
381382
}
382383

@@ -421,15 +422,6 @@ public function subscribeResult(CypherList $result): void
421422
$this->subscribedResults[] = WeakReference::create($result);
422423
}
423424

424-
private function protocol(): V3
425-
{
426-
if ($this->boltProtocol === null) {
427-
throw new LogicException('Cannot use protocol if it is not created');
428-
}
429-
430-
return $this->boltProtocol;
431-
}
432-
433425
private function interpretResult(array $result): void
434426
{
435427
if (str_starts_with($this->serverState, 'TX_')) {

src/Bolt/SingleBoltConnectionPool.php

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
use Laudis\Neo4j\Enum\ConnectionProtocol;
3030
use function microtime;
3131
use Psr\Http\Message\UriInterface;
32-
use RuntimeException;
3332
use function shuffle;
3433

3534
/**
@@ -66,8 +65,6 @@ public function __construct(UriInterface $uri, DriverConfiguration $config, Auth
6665
}
6766

6867
/**
69-
* @param SessionConfiguration $config
70-
*
7168
* @return Generator<
7269
* int,
7370
* float,
@@ -112,18 +109,21 @@ public function release(ConnectionInterface $connection): void
112109
}
113110
}
114111

115-
116-
117-
private function returnAnyAvailableConnection(): ?BoltConnection
112+
private function returnAnyAvailableConnection(string $encryptionLevel): ?BoltConnection
118113
{
119114
$streamingConnection = null;
115+
$requiresReconnectConnection = null;
120116
// Ensure random connection reuse before picking one.
121117
shuffle($this->activeConnections);
122118

123119
foreach ($this->activeConnections as $activeConnection) {
124120
// We prefer a connection that is just ready
125121
if ($activeConnection->getServerState() === 'READY') {
126-
return $activeConnection;
122+
if (!$this->requiresReconnect($activeConnection, $encryptionLevel)) {
123+
return $activeConnection;
124+
} else {
125+
$requiresReconnectConnection = $activeConnection;
126+
}
127127
}
128128

129129
// We will store any streaming connections, so we can use that one
@@ -134,12 +134,26 @@ private function returnAnyAvailableConnection(): ?BoltConnection
134134
// https://github.com/neo4j-php/neo4j-php-client/issues/146
135135
// NOTE: we cannot work with TX_STREAMING as we cannot force the transaction to implicitly close.
136136
if ($streamingConnection === null && $activeConnection->getServerState() === 'STREAMING') {
137-
$streamingConnection = $activeConnection;
138-
$streamingConnection->consumeResults(); // State should now be ready
137+
if (!$this->requiresReconnect($activeConnection, $encryptionLevel)) {
138+
$streamingConnection = $activeConnection;
139+
$streamingConnection->consumeResults(); // State should now be ready
140+
} else {
141+
$requiresReconnectConnection = $activeConnection;
142+
}
139143
}
140144
}
141145

142-
return $streamingConnection;
146+
if ($streamingConnection) {
147+
return $streamingConnection;
148+
}
149+
150+
if ($requiresReconnectConnection) {
151+
$this->release($requiresReconnectConnection);
152+
153+
return $this->createNewConnection()
154+
}
155+
156+
return null;
143157
}
144158

145159
private function createNewConnection(SessionConfiguration $config): BoltConnection
@@ -163,4 +177,10 @@ private function createNewConnection(SessionConfiguration $config): BoltConnecti
163177

164178
return $tbr;
165179
}
180+
181+
private function requiresReconnect(BoltConnection $activeConnection, string $requiredEncryptionLevel): bool
182+
{
183+
return $activeConnection->getAuthentication()->toString($this->uri) !== $this->auth->toString($this->uri) ||
184+
$activeConnection->getEncryptionLevel() === $requiredEncryptionLevel;
185+
}
166186
}

src/Bolt/SslConfigurator.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020

2121
final class SslConfigurator
2222
{
23-
public function configure(UriInterface $uri, DriverConfiguration $config): ?array
23+
/**
24+
* @param UriInterface $uri
25+
* @param DriverConfiguration $config
26+
*
27+
* @return array{0: ''|'s'|'ssc', 1: array|null}
28+
*/
29+
public function configure(UriInterface $uri, DriverConfiguration $config): array
2430
{
2531
$sslMode = $config->getSslConfiguration()->getMode();
2632
$sslConfig = '';
@@ -35,10 +41,10 @@ public function configure(UriInterface $uri, DriverConfiguration $config): ?arra
3541
}
3642

3743
if (str_starts_with($sslConfig, 's')) {
38-
return $this->enableSsl($uri->getHost(), $sslConfig, $config);
44+
return [$sslConfig, $this->enableSsl($uri->getHost(), $sslConfig, $config)];
3945
}
4046

41-
return null;
47+
return [$sslConfig, null];
4248
}
4349

4450
private function enableSsl(string $host, string $sslConfig, DriverConfiguration $config): ?array

src/BoltFactory.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,11 @@
1414
namespace Laudis\Neo4j;
1515

1616
use Bolt\Bolt;
17-
use Bolt\connection\IConnection;
1817
use Bolt\connection\Socket;
1918
use Bolt\connection\StreamSocket;
2019
use Bolt\error\ConnectException;
2120
use Bolt\error\MessageException;
2221
use Bolt\protocol\V3;
23-
use Exception;
24-
use Laudis\Neo4j\Databags\SessionConfiguration;
2522
use function explode;
2623
use function extension_loaded;
2724
use Laudis\Neo4j\Bolt\BoltConnection;
@@ -32,6 +29,7 @@
3229
use Laudis\Neo4j\Contracts\ConnectionInterface;
3330
use Laudis\Neo4j\Databags\DatabaseInfo;
3431
use Laudis\Neo4j\Databags\DriverConfiguration;
32+
use Laudis\Neo4j\Databags\SessionConfiguration;
3533
use Laudis\Neo4j\Databags\TransactionConfiguration;
3634
use Laudis\Neo4j\Enum\ConnectionProtocol;
3735
use Laudis\Neo4j\Exception\Neo4jException;
@@ -56,14 +54,12 @@ public function __construct(
5654
UriInterface $uri,
5755
AuthenticateInterface $authenticate,
5856
DriverConfiguration $config
59-
)
60-
{
57+
) {
6158
$this->uri = $uri;
6259
$this->authenticate = $authenticate;
6360
$this->config = $config;
6461
}
6562

66-
6763
private static function configureSsl(UriInterface $uri, StreamSocket $socket, DriverConfiguration $config): void
6864
{
6965
$options = (new SslConfigurator())->configure($uri, $config);

src/Contracts/ConnectionInterface.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ interface ConnectionInterface
3434
*/
3535
public function getImplementation();
3636

37+
/**
38+
* Returns the authentication logic attached to this connection.
39+
*
40+
* @psalm-mutation-free
41+
*/
42+
public function getAuthentication(): AuthenticateInterface;
43+
3744
/**
3845
* Returns the agent the servers uses to identify itself.
3946
*

0 commit comments

Comments
 (0)