diff --git a/src/Basic/Driver.php b/src/Basic/Driver.php index ca76954d..7b73a876 100644 --- a/src/Basic/Driver.php +++ b/src/Basic/Driver.php @@ -57,4 +57,9 @@ public static function create(string|UriInterface $uri, ?DriverConfiguration $co return new self($driver); } + + public function closeConnections(): void + { + $this->driver->closeConnections(); + } } diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 91c95a8c..4c58f95d 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -143,7 +143,23 @@ public function getAuthentication(): AuthenticateInterface */ public function isOpen(): bool { - return !in_array($this->protocol()->serverState, [ServerState::DISCONNECTED, ServerState::DEFUNCT], true); + return !in_array( + $this->protocol()->serverState, + [ServerState::DISCONNECTED, ServerState::DEFUNCT], + true + ); + } + + /** + * @psalm-mutation-free + */ + public function isStreaming(): bool + { + return in_array( + $this->protocol()->serverState, + [ServerState::STREAMING, ServerState::TX_STREAMING], + true + ); } public function setTimeout(float $timeout): void @@ -154,7 +170,7 @@ public function setTimeout(float $timeout): void public function consumeResults(): void { $this->logger?->log(LogLevel::DEBUG, 'Consuming results'); - if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol()->serverState !== ServerState::TX_STREAMING) { + if (!$this->isStreaming()) { $this->subscribedResults = []; return; @@ -225,8 +241,14 @@ public function discard(?int $qid): void * * @return BoltMeta */ - public function run(string $text, array $parameters, ?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode): array - { + public function run( + string $text, + array $parameters, + ?string $database, + ?float $timeout, + BookmarkHolder $holder, + ?AccessMode $mode + ): array { $extra = $this->buildRunExtra($database, $timeout, $holder, $mode); $this->logger?->log(LogLevel::DEBUG, 'RUN', $extra); $response = $this->protocol() @@ -298,10 +320,15 @@ public function pull(?int $qid, ?int $fetchSize): array } public function __destruct() + { + $this->close(); + } + + public function close(): void { try { if ($this->isOpen()) { - if ($this->protocol()->serverState === ServerState::STREAMING || $this->protocol()->serverState === ServerState::TX_STREAMING) { + if ($this->isStreaming()) { $this->consumeResults(); } diff --git a/src/Bolt/BoltDriver.php b/src/Bolt/BoltDriver.php index db31730b..0fa16706 100644 --- a/src/Bolt/BoltDriver.php +++ b/src/Bolt/BoltDriver.php @@ -109,4 +109,9 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool return true; } + + public function closeConnections(): void + { + $this->pool->close(); + } } diff --git a/src/Bolt/ConnectionPool.php b/src/Bolt/ConnectionPool.php index 905be531..2b2bfa25 100644 --- a/src/Bolt/ConnectionPool.php +++ b/src/Bolt/ConnectionPool.php @@ -46,8 +46,12 @@ public function __construct( private readonly ?Neo4jLogger $logger ) {} - public static function create(UriInterface $uri, AuthenticateInterface $auth, DriverConfiguration $conf, SemaphoreInterface $semaphore): self - { + public static function create( + UriInterface $uri, + AuthenticateInterface $auth, + DriverConfiguration $conf, + SemaphoreInterface $semaphore + ): self { return new self( $semaphore, BoltFactory::create($conf->getLogger()), @@ -165,4 +169,12 @@ private function returnAnyAvailableConnection(SessionConfiguration $config): ?Co return null; } + + public function close(): void + { + foreach ($this->activeConnections as $activeConnection) { + $activeConnection->close(); + } + $this->activeConnections = []; + } } diff --git a/src/Contracts/ConnectionInterface.php b/src/Contracts/ConnectionInterface.php index 3d917d99..63faa619 100644 --- a/src/Contracts/ConnectionInterface.php +++ b/src/Contracts/ConnectionInterface.php @@ -112,4 +112,9 @@ public function getEncryptionLevel(): string; * Returns the user agent handling this connection. */ public function getUserAgent(): string; + + /** + * Closes the connection. + */ + public function close(): void; } diff --git a/src/Contracts/ConnectionPoolInterface.php b/src/Contracts/ConnectionPoolInterface.php index 56978b66..98dd372c 100644 --- a/src/Contracts/ConnectionPoolInterface.php +++ b/src/Contracts/ConnectionPoolInterface.php @@ -44,4 +44,9 @@ public function acquire(SessionConfiguration $config): Generator; * Releases a connection back to the pool. */ public function release(ConnectionInterface $connection): void; + + /** + * Closes all connections in the pool. + */ + public function close(): void; } diff --git a/src/Contracts/DriverInterface.php b/src/Contracts/DriverInterface.php index ea0ca2b3..9d9b0018 100644 --- a/src/Contracts/DriverInterface.php +++ b/src/Contracts/DriverInterface.php @@ -38,4 +38,9 @@ public function createSession(?SessionConfiguration $config = null): SessionInte * Returns true if the driver can make a valid connection with the server. */ public function verifyConnectivity(?SessionConfiguration $config = null): bool; + + /** + * Closes all connections in the pool. + */ + public function closeConnections(): void; } diff --git a/src/Http/HttpConnectionPool.php b/src/Http/HttpConnectionPool.php index ac2fd8a2..2c0e3929 100644 --- a/src/Http/HttpConnectionPool.php +++ b/src/Http/HttpConnectionPool.php @@ -127,4 +127,9 @@ public function release(ConnectionInterface $connection): void { // Nothing to release in the current HTTP Protocol implementation } + + public function close(): void + { + // Nothing to close in the current HTTP Protocol implementation + } } diff --git a/src/Http/HttpDriver.php b/src/Http/HttpDriver.php index 8eb41220..76bcfa8d 100644 --- a/src/Http/HttpDriver.php +++ b/src/Http/HttpDriver.php @@ -198,4 +198,9 @@ private function tsxUrl(SessionConfiguration $config): Resolvable return str_replace('{databaseName}', $database, $tsx); }); } + + public function closeConnections(): void + { + // Nothing to close in the current HTTP Protocol implementation + } } diff --git a/src/Neo4j/Neo4jConnectionPool.php b/src/Neo4j/Neo4jConnectionPool.php index eff1c6c2..0a2bf890 100644 --- a/src/Neo4j/Neo4jConnectionPool.php +++ b/src/Neo4j/Neo4jConnectionPool.php @@ -233,4 +233,13 @@ private function createKey(ConnectionRequestData $data, ?SessionConfiguration $c ':', ], '|', $key); } + + public function close(): void + { + foreach (self::$pools as $pool) { + $pool->close(); + } + self::$pools = []; + $this->cache->clear(); + } } diff --git a/src/Neo4j/Neo4jDriver.php b/src/Neo4j/Neo4jDriver.php index 1dfccbb5..bc2a1695 100644 --- a/src/Neo4j/Neo4jDriver.php +++ b/src/Neo4j/Neo4jDriver.php @@ -111,4 +111,9 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool return true; } + + public function closeConnections(): void + { + $this->pool->close(); + } }