14
14
namespace Laudis \Neo4j \Neo4j ;
15
15
16
16
use Bolt \Bolt ;
17
+ use Bolt \connection \StreamSocket ;
17
18
use Exception ;
18
19
use Laudis \Neo4j \Bolt \BoltDriver ;
19
20
use Laudis \Neo4j \Common \Uri ;
29
30
/**
30
31
* @psalm-import-type BasicDriver from \Laudis\Neo4j\Contracts\DriverInterface
31
32
*
32
- * @implements ConnectionPoolInterface<Bolt >
33
+ * @implements ConnectionPoolInterface<StreamSocket >
33
34
*/
34
35
final class Neo4jConnectionPool implements ConnectionPoolInterface
35
36
{
36
37
private ?RoutingTable $ table = null ;
37
- /** @var ConnectionPoolInterface<Bolt > */
38
+ /** @var ConnectionPoolInterface<StreamSocket > */
38
39
private ConnectionPoolInterface $ pool ;
39
40
private string $ version ;
40
41
41
42
/**
42
- * @param ConnectionPoolInterface<Bolt > $pool
43
+ * @param ConnectionPoolInterface<StreamSocket > $pool
43
44
*/
44
45
public function __construct (ConnectionPoolInterface $ pool , string $ version )
45
46
{
@@ -50,7 +51,7 @@ public function __construct(ConnectionPoolInterface $pool, string $version)
50
51
/**
51
52
* @throws Exception
52
53
*/
53
- public function acquire (UriInterface $ uri , AccessMode $ mode ): Bolt
54
+ public function acquire (UriInterface $ uri , AccessMode $ mode ): StreamSocket
54
55
{
55
56
$ table = $ this ->routingTable (BoltDriver::create ($ uri ));
56
57
$ server = $ this ->getNextServer ($ table , $ mode );
@@ -83,15 +84,25 @@ private function routingTable(DriverInterface $driver): RoutingTable
83
84
if ($ this ->table === null || $ this ->table ->getTtl () < time ()) {
84
85
$ session = $ driver ->createSession ();
85
86
if (str_starts_with ($ this ->version , '3 ' )) {
86
- $ response = $ session ->run ('CALL dbms.cluster.overview() ' )->first ();
87
+ $ response = $ session ->run ('CALL dbms.cluster.overview() ' );
88
+
89
+ /** @var iterable<array{addresses: list<string>, role:string}> $values */
90
+ $ values = [];
91
+ foreach ($ response as $ server ) {
92
+ /** @psalm-suppress InvalidArrayAssignment */
93
+ $ values [] = ['addresses ' => $ server ->get ('addresses ' ), 'role ' => $ server ->get ('role ' )];
94
+ }
95
+
96
+ $ this ->table = new RoutingTable ($ values , time () + 3600 );
87
97
} else {
88
98
$ response = $ session ->run ('CALL dbms.routing.getRoutingTable({context: []}) ' )->first ();
99
+ /** @var iterable<array{addresses: list<string>, role:string}> $values */
100
+ $ values = $ response ->get ('servers ' );
101
+ /** @var int $ttl */
102
+ $ ttl = $ response ->get ('ttl ' );
103
+
104
+ $ this ->table = new RoutingTable ($ values , time () + $ ttl );
89
105
}
90
- /** @var iterable<array{addresses: list<string>, role:string}> $values */
91
- $ values = $ response ->get ('servers ' );
92
- /** @var int $ttl */
93
- $ ttl = $ response ->get ('ttl ' );
94
- $ this ->table = new RoutingTable ($ values , time () + $ ttl );
95
106
}
96
107
97
108
return $ this ->table ;
0 commit comments