16
16
use Bolt \Bolt ;
17
17
use Bolt \connection \StreamSocket ;
18
18
use Exception ;
19
+ use Laudis \Neo4j \Bolt \BoltConnectionPool ;
20
+ use Laudis \Neo4j \Enum \ConnectionProtocol ;
19
21
use function explode ;
20
22
use Laudis \Neo4j \Bolt \BoltDriver ;
21
23
use Laudis \Neo4j \Common \TransactionHelper ;
40
42
final class Neo4jConnectionPool implements ConnectionPoolInterface
41
43
{
42
44
private ?RoutingTable $ table = null ;
45
+ private BoltConnectionPool $ pool ;
46
+
47
+ public function __construct (BoltConnectionPool $ pool )
48
+ {
49
+ $ this ->pool = $ pool ;
50
+ }
43
51
44
52
/**
45
53
* @throws Exception
@@ -51,7 +59,8 @@ public function acquire(
51
59
string $ userAgent ,
52
60
SessionConfiguration $ config
53
61
): ConnectionInterface {
54
- $ table = $ this ->routingTable ($ uri , $ authenticate );
62
+ $ connection = $ this ->pool ->acquire ($ uri , $ authenticate , $ socketTimeout , $ userAgent , $ config );
63
+ $ table = $ this ->routingTable ($ connection );
55
64
$ server = $ this ->getNextServer ($ table , $ config ->getAccessMode ());
56
65
57
66
$ socket = new StreamSocket ($ server ->getHost (), $ server ->getPort () ?? 7687 , $ socketTimeout );
@@ -76,26 +85,28 @@ private function getNextServer(RoutingTable $table, AccessMode $mode): Uri
76
85
}
77
86
78
87
/**
79
- * @param BasicDriver $driver
88
+ * @param ConnectionInterface<Bolt> $driver
80
89
*
81
90
* @throws Exception
82
91
*/
83
- private function routingTable (UriInterface $ uri , AuthenticateInterface $ authenticate ): RoutingTable
92
+ private function routingTable (ConnectionInterface $ connection ): RoutingTable
84
93
{
85
94
if ($ this ->table === null || $ this ->table ->getTtl () < time ()) {
86
- $ session = BoltDriver::create ($ uri , null , $ authenticate )->createSession ();
87
- $ row = $ session ->run (<<<'CYPHER'
88
- CALL dbms.components() YIELD versions UNWIND versions AS version RETURN version
89
- CYPHER
90
- )->first ();
91
- /** @var string */
92
- $ version = $ row ->get ('version ' );
93
-
94
- if (str_starts_with ($ version , '3 ' )) {
95
- $ response = $ session ->run ('CALL dbms.cluster.overview() ' );
96
-
97
- /** @var iterable<array{addresses: list<string>, role:string}> $values */
98
- $ values = [];
95
+ $ bolt = $ connection ->getImplementation ();
96
+ $ protocol = $ connection ->getProtocol ();
97
+ if ($ protocol ->compare (ConnectionProtocol::BOLT_V43 ()) >= 0 ) {
98
+ $ route = $ bolt ->route ()['rt ' ];
99
+ ['servers ' => $ servers , 'ttl ' => $ ttl ] = $ route ;
100
+ } elseif ($ protocol ->compare (ConnectionProtocol::BOLT_V40 ()) >= 0 ) {
101
+ $ bolt ->run ('CALL dbms.routing.getRoutingTable({context: []}) ' );
102
+ ['servers ' => $ servers , 'ttl ' => $ ttl ] = $ bolt ->pullAll ();
103
+ } else {
104
+ $ bolt ->run ('CALL dbms.cluster.overview() ' );
105
+ $ response = $ bolt ->pullAll ();
106
+
107
+ /** @var iterable<array{addresses: list<string>, role:string}> $servers */
108
+ $ servers = [];
109
+ $ ttl = time () + 3600 ;
99
110
foreach ($ response as $ server ) {
100
111
/** @var CypherList<string> $addresses */
101
112
$ addresses = $ server ->get ('addresses ' );
@@ -105,19 +116,10 @@ private function routingTable(UriInterface $uri, AuthenticateInterface $authenti
105
116
*
106
117
* @var array{addresses: list<string>, role:string}
107
118
*/
108
- $ values [] = ['addresses ' => $ addresses ->toArray (), 'role ' => $ server ->get ('role ' )];
119
+ $ servers [] = ['addresses ' => $ addresses ->toArray (), 'role ' => $ server ->get ('role ' )];
109
120
}
110
-
111
- $ this ->table = new RoutingTable ($ values , time () + 3600 );
112
- } else {
113
- $ response = $ session ->run ('CALL dbms.routing.getRoutingTable({context: []}) ' )->first ();
114
- /** @var iterable<array{addresses: list<string>, role:string}> $values */
115
- $ values = $ response ->get ('servers ' );
116
- /** @var int $ttl */
117
- $ ttl = $ response ->get ('ttl ' );
118
-
119
- $ this ->table = new RoutingTable ($ values , time () + $ ttl );
120
121
}
122
+ $ this ->table = new RoutingTable ($ servers , $ ttl );
121
123
}
122
124
123
125
return $ this ->table ;
0 commit comments