Skip to content

Commit a2efd17

Browse files
feat: Simplify acquiring and resuing connections
1 parent 1acd98a commit a2efd17

File tree

7 files changed

+46
-70
lines changed

7 files changed

+46
-70
lines changed

src/Bolt/ConnectionPool.php

Lines changed: 25 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Laudis\Neo4j\Databags\DriverConfiguration;
2525
use Laudis\Neo4j\Databags\SessionConfiguration;
2626

27+
use Laudis\Neo4j\Exception\ConnectionPoolException;
2728
use function method_exists;
2829
use function microtime;
2930

@@ -44,6 +45,7 @@ public function __construct(
4445
private readonly BoltFactory $factory,
4546
private readonly ConnectionRequestData $data,
4647
private readonly ?Neo4jLogger $logger,
48+
private readonly float $acquireConnectionTimeout
4749
) {
4850
}
4951

@@ -63,33 +65,39 @@ public static function create(
6365
$conf->getUserAgent(),
6466
$conf->getSslConfiguration()
6567
),
66-
$conf->getLogger()
68+
$conf->getLogger(),
69+
$conf->getAcquireConnectionTimeout()
6770
);
6871
}
6972

7073
public function acquire(SessionConfiguration $config): Generator
7174
{
72-
$generator = $this->semaphore->wait();
73-
$start = microtime(true);
75+
return (function () use ($config) {
76+
$connection = $this->reuseConnectionIfPossible($config);
77+
if ($connection !== null) {
78+
return $connection;
79+
}
7480

75-
return (function () use ($generator, $start, $config) {
81+
$generator = $this->semaphore->wait();
7682
// If the generator is valid, it means we are waiting to acquire a new connection.
7783
// This means we can use this time to check if we can reuse a connection or should throw a timeout exception.
7884
while ($generator->valid()) {
79-
/** @var bool $continue */
80-
$continue = yield microtime(true) - $start;
81-
$generator->send($continue);
82-
if ($continue === false) {
83-
return null;
84-
}
85+
$waitTime = $generator->current();
86+
if ($waitTime <= $this->acquireConnectionTimeout) {
87+
yield $waitTime;
88+
89+
$connection = $this->reuseConnectionIfPossible($config);
90+
if ($connection !== null) {
91+
return $connection;
92+
}
8593

86-
$connection = $this->returnAnyAvailableConnection($config);
87-
if ($connection !== null) {
88-
return $connection;
94+
$generator->next();
95+
} else {
96+
throw new ConnectionPoolException('Connection acquire timeout reached: ' . $waitTime);
8997
}
9098
}
9199

92-
$connection = $this->returnAnyAvailableConnection($config);
100+
$connection = $this->reuseConnectionIfPossible($config);
93101
if ($connection !== null) {
94102
return $connection;
95103
}
@@ -119,53 +127,15 @@ public function getLogger(): ?Neo4jLogger
119127
return $this->logger;
120128
}
121129

122-
/**
123-
* @return BoltConnection|null
124-
*/
125-
private function returnAnyAvailableConnection(SessionConfiguration $config): ?ConnectionInterface
130+
private function reuseConnectionIfPossible(SessionConfiguration $config): ?ConnectionInterface
126131
{
127-
$streamingConnection = null;
128-
$requiresReconnectConnection = null;
129132
// Ensure random connection reuse before picking one.
130133
shuffle($this->activeConnections);
131-
132134
foreach ($this->activeConnections as $activeConnection) {
133135
// We prefer a connection that is just ready
134-
if ($activeConnection->getServerState() === 'READY') {
135-
if ($this->factory->canReuseConnection($activeConnection, $this->data, $config)) {
136-
return $this->factory->reuseConnection($activeConnection, $config);
137-
} else {
138-
$requiresReconnectConnection = $activeConnection;
139-
}
136+
if ($activeConnection->getServerState() === 'READY' && $this->factory->canReuseConnection($activeConnection, $config)) {
137+
return $this->factory->reuseConnection($activeConnection, $config);
140138
}
141-
142-
// We will store any streaming connections, so we can use that one
143-
// as we can force the subscribed result sets to consume the results
144-
// and become ready again.
145-
// This code will make sure we never get stuck if the user has many
146-
// results open that aren't consumed yet.
147-
// https://github.com/neo4j-php/neo4j-php-client/issues/146
148-
// NOTE: we cannot work with TX_STREAMING as we cannot force the transaction to implicitly close.
149-
if ($streamingConnection === null && $activeConnection->getServerState() === 'STREAMING') {
150-
if ($this->factory->canReuseConnection($activeConnection, $this->data, $config)) {
151-
$streamingConnection = $activeConnection;
152-
if (method_exists($streamingConnection, 'consumeResults')) {
153-
$streamingConnection->consumeResults(); // State should now be ready
154-
}
155-
} else {
156-
$requiresReconnectConnection = $activeConnection;
157-
}
158-
}
159-
}
160-
161-
if ($streamingConnection) {
162-
return $this->factory->reuseConnection($streamingConnection, $config);
163-
}
164-
165-
if ($requiresReconnectConnection) {
166-
$this->release($requiresReconnectConnection);
167-
168-
return $this->factory->createConnection($this->data, $config);
169139
}
170140

171141
return null;

src/Bolt/Messages/BoltCommitMessage.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public function send(): BoltCommitMessage
4040
{
4141
$this->logger?->log(LogLevel::DEBUG, 'COMMIT');
4242
$response = $this->protocol->commit()->getResponse();
43+
$this->protocol->serverState = ServerState::READY;
4344

4445
/** @var array{bookmark?: string} $content */
4546
$content = $response->content;

src/BoltFactory.php

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public function createConnection(ConnectionRequestData $data, SessionConfigurati
7979
return new BoltConnection($protocol, $connection, $data->getAuth(), $data->getUserAgent(), $config, $this->logger);
8080
}
8181

82-
public function canReuseConnection(ConnectionInterface $connection, ConnectionRequestData $data, SessionConfiguration $config): bool
82+
public function canReuseConnection(ConnectionInterface $connection, SessionConfiguration $config): bool
8383
{
8484
if (!$connection->isOpen()) {
8585
return false;
@@ -88,12 +88,7 @@ public function canReuseConnection(ConnectionInterface $connection, ConnectionRe
8888
$databaseInfo = $connection->getDatabaseInfo();
8989
$database = $databaseInfo?->getName();
9090

91-
return $connection->getServerAddress()->getHost() === $data->getUri()->getHost()
92-
&& $connection->getServerAddress()->getPort() === $data->getUri()->getPort()
93-
&& $connection->getAuthentication()->toString($data->getUri()) === $data->getAuth()->toString($data->getUri())
94-
&& $connection->getEncryptionLevel() === $this->sslConfigurationFactory->create($data->getUri(), $data->getSslConfig())[0]
95-
&& $connection->getUserAgent() === $data->getUserAgent()
96-
&& $connection->getAccessMode() === $config->getAccessMode()
91+
return $connection->getAccessMode() === $config->getAccessMode()
9792
&& $database === $config->getDatabase();
9893
}
9994

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Laudis\Neo4j\Exception;
4+
5+
final class ConnectionPoolException extends \RuntimeException
6+
{
7+
8+
}

src/Neo4j/Neo4jConnectionPool.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public function __construct(
7474
private readonly CacheInterface $cache,
7575
private readonly AddressResolverInterface $resolver,
7676
private readonly ?Neo4jLogger $logger,
77+
private readonly float $acquireConnectionTimeout,
7778
) {
7879
}
7980

@@ -96,7 +97,8 @@ public static function create(
9697
),
9798
Cache::getInstance(),
9899
$resolver,
99-
$conf->getLogger()
100+
$conf->getLogger(),
101+
$conf->getAcquireConnectionTimeout()
100102
);
101103
}
102104

@@ -112,7 +114,7 @@ public function createOrGetPool(string $hostname, UriInterface $uri): Connection
112114

113115
$key = $this->createKey($data);
114116
if (!array_key_exists($key, self::$pools)) {
115-
self::$pools[$key] = new ConnectionPool($this->semaphore, $this->factory, $data, $this->logger);
117+
self::$pools[$key] = new ConnectionPool($this->semaphore, $this->factory, $data, $this->logger, $this->acquireConnectionTimeout);
116118
}
117119

118120
return self::$pools[$key];

tests/Integration/EdgeCasesTest.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,17 @@ public function testRunALotOfStatements(): void
102102
}
103103
}
104104

105-
$statements = [];
105+
$count = 0;
106106
foreach ($personIds as $personId) {
107107
foreach ($movieIds as $movieId) {
108-
$statements[] = Statement::create(
108+
$count++;
109+
$this->getSession()->runStatement(Statement::create(
109110
'MATCH (a), (b) WHERE id(a) = $ida AND id(b) = $idb MERGE (a) <-[r:ACTED_IN]- (b) RETURN id(r)',
110111
['ida' => $personId, 'idb' => $movieId]
111-
);
112+
));
112113
}
113114
}
114-
115-
$this->getSession()->runStatements($statements);
116-
self::assertCount(4978, $statements);
115+
self::assertEquals(4978, $count);
117116
}
118117

119118
public function testGettingKeysFromArraylist(): void

tests/Unit/BoltConnectionPoolTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ private function setupPool(Generator $semaphoreGenerator): void
171171
SslConfiguration::default()
172172
),
173173
null,
174+
10.0
174175
);
175176
}
176177
}

0 commit comments

Comments
 (0)