13
13
14
14
namespace Laudis \Neo4j \Bolt ;
15
15
16
+ use function array_flip ;
16
17
use Bolt \Bolt ;
17
18
use Bolt \connection \StreamSocket ;
18
19
use Exception ;
19
20
use function explode ;
20
- use Laudis \Neo4j \Common \TransactionHelper ;
21
+ use const FILTER_VALIDATE_IP ;
22
+ use function filter_var ;
23
+ use Laudis \Neo4j \Common \BoltConnection ;
21
24
use Laudis \Neo4j \Contracts \AuthenticateInterface ;
22
25
use Laudis \Neo4j \Contracts \ConnectionInterface ;
23
26
use Laudis \Neo4j \Contracts \ConnectionPoolInterface ;
27
+ use Laudis \Neo4j \Databags \DatabaseInfo ;
24
28
use Laudis \Neo4j \Databags \SessionConfiguration ;
29
+ use Laudis \Neo4j \Enum \ConnectionProtocol ;
30
+ use Laudis \Neo4j \Neo4j \RoutingTable ;
25
31
use Psr \Http \Message \UriInterface ;
26
- use function str_starts_with ;
27
32
28
33
/**
29
34
* @implements ConnectionPoolInterface<Bolt>
30
35
*/
31
36
final class BoltConnectionPool implements ConnectionPoolInterface
32
37
{
38
+ /** @var array<string, list<ConnectionInterface<Bolt>>> */
39
+ private static array $ connectionCache = [];
40
+
33
41
/**
34
42
* @throws Exception
35
43
*/
@@ -38,24 +46,92 @@ public function acquire(
38
46
AuthenticateInterface $ authenticate ,
39
47
float $ socketTimeout ,
40
48
string $ userAgent ,
41
- SessionConfiguration $ config
49
+ SessionConfiguration $ config ,
50
+ ?RoutingTable $ table = null ,
51
+ ?UriInterface $ server = null
42
52
): ConnectionInterface {
43
- $ host = $ uri ->getHost ();
44
- $ socket = new StreamSocket ($ host , $ uri ->getPort () ?? 7687 , $ socketTimeout );
53
+ $ connectingTo = $ server ?? $ uri ;
54
+ $ key = $ connectingTo ->getHost ().': ' .($ connectingTo ->getPort () ?? '7687 ' );
55
+ if (!isset ($ this ->connectionCache [$ key ])) {
56
+ self ::$ connectionCache [$ key ] = [];
57
+ }
58
+
59
+ foreach (self ::$ connectionCache [$ key ] as $ connection ) {
60
+ if ($ connection ->isOpen ()) {
61
+ return $ connection ;
62
+ }
63
+ }
64
+
65
+ $ socket = new StreamSocket ($ connectingTo ->getHost (), $ connectingTo ->getPort () ?? 7687 , $ socketTimeout );
66
+
67
+ $ this ->configureSsl ($ uri , $ connectingTo , $ socket , $ table );
68
+
69
+ $ bolt = new Bolt ($ socket );
70
+ $ authenticate ->authenticateBolt ($ bolt , $ connectingTo , $ userAgent );
71
+
72
+ /**
73
+ * @var array{'name': 0, 'version': 1, 'edition': 2}
74
+ * @psalm-suppress all
75
+ */
76
+ $ fields = array_flip ($ bolt ->run (<<<'CYPHER'
77
+ CALL dbms.components()
78
+ YIELD name, versions, edition
79
+ UNWIND versions AS version
80
+ RETURN name, version, edition
81
+ CYPHER)['fields ' ]);
45
82
46
- $ this ->configureSsl ($ uri , $ host , $ socket );
83
+ /** @var array{0: array{0: string, 1: string, 2: string}} $results */
84
+ $ results = $ bolt ->pullAll ();
47
85
48
- return TransactionHelper::connectionFromSocket ($ socket , $ uri , $ userAgent , $ authenticate , $ config );
86
+ $ connection = new BoltConnection (
87
+ $ bolt ,
88
+ $ socket ,
89
+ $ results [0 ][$ fields ['name ' ]].'- ' .$ results [0 ][$ fields ['edition ' ]].'/ ' .$ results [0 ][$ fields ['version ' ]],
90
+ $ connectingTo ,
91
+ $ results [0 ][$ fields ['version ' ]],
92
+ ConnectionProtocol::determineBoltVersion ($ bolt ),
93
+ $ config ->getAccessMode (),
94
+ new DatabaseInfo ($ config ->getDatabase ())
95
+ );
96
+
97
+ self ::$ connectionCache [$ key ][] = $ connection ;
98
+
99
+ return $ connection ;
49
100
}
50
101
51
- private function configureSsl (UriInterface $ uri , string $ host , StreamSocket $ socket ): void
102
+ private function configureSsl (UriInterface $ uri , UriInterface $ server , StreamSocket $ socket, ? RoutingTable $ table ): void
52
103
{
53
104
$ scheme = $ uri ->getScheme ();
54
105
$ explosion = explode ('+ ' , $ scheme , 2 );
55
106
$ sslConfig = $ explosion [1 ] ?? '' ;
56
107
57
108
if (str_starts_with ('s ' , $ sslConfig )) {
58
- TransactionHelper::enableSsl ($ host , $ sslConfig , $ socket );
109
+ // We have to pass a different host when working with ssl on aura.
110
+ // There is a strange behaviour where if we pass the uri host on a single
111
+ // instance aura deployment, we need to pass the original uri for the
112
+ // ssl configuration to be valid.
113
+ if ($ table && $ table ->getWithRole ()->count () > 1 ) {
114
+ $ this ->enableSsl ($ server ->getHost (), $ sslConfig , $ socket );
115
+ } else {
116
+ $ this ->enableSsl ($ uri ->getHost (), $ sslConfig , $ socket );
117
+ }
118
+ }
119
+ }
120
+
121
+ private function enableSsl (string $ host , string $ sslConfig , StreamSocket $ sock ): void
122
+ {
123
+ $ options = [
124
+ 'verify_peer ' => true ,
125
+ 'peer_name ' => $ host ,
126
+ ];
127
+ if (!filter_var ($ host , FILTER_VALIDATE_IP )) {
128
+ $ options ['SNI_enabled ' ] = true ;
129
+ }
130
+ if ($ sslConfig === 's ' ) {
131
+ $ sock ->setSslContextOptions ($ options );
132
+ } elseif ($ sslConfig === 'ssc ' ) {
133
+ $ options ['allow_self_signed ' ] = true ;
134
+ $ sock ->setSslContextOptions ($ options );
59
135
}
60
136
}
61
137
}
0 commit comments