11
11
12
12
namespace Laudis \Neo4j \Bolt ;
13
13
14
- use Bolt \protocol \V3 ;
15
- use function explode ;
16
14
use Generator ;
17
- use Laudis \Neo4j \BoltFactory ;
18
- use Laudis \Neo4j \Common \ConnectionConfiguration ;
19
15
use Laudis \Neo4j \Contracts \AuthenticateInterface ;
20
16
use Laudis \Neo4j \Contracts \ConnectionFactoryInterface ;
21
17
use Laudis \Neo4j \Contracts \ConnectionInterface ;
22
18
use Laudis \Neo4j \Contracts \ConnectionPoolInterface ;
23
19
use Laudis \Neo4j \Contracts \SemaphoreInterface ;
24
- use Laudis \Neo4j \Databags \DatabaseInfo ;
25
20
use Laudis \Neo4j \Databags \SessionConfiguration ;
26
- use Laudis \Neo4j \Enum \ConnectionProtocol ;
21
+ use Laudis \Neo4j \Databags \SslConfiguration ;
22
+ use function method_exists ;
27
23
use function microtime ;
28
24
use function shuffle ;
29
25
30
26
/**
31
- * @implements ConnectionPoolInterface<V3>
27
+ * @template T
28
+ * @implements ConnectionPoolInterface<T>
32
29
*/
33
- class SingleBoltConnectionPool implements ConnectionPoolInterface
30
+ class ConnectionPool implements ConnectionPoolInterface
34
31
{
35
32
private SemaphoreInterface $ semaphore ;
36
33
37
- /** @var list<BoltConnection > */
34
+ /** @var list<ConnectionInterface<T> > */
38
35
private array $ activeConnections = [];
39
36
private AuthenticateInterface $ auth ;
37
+ /** @var ConnectionFactoryInterface<T> */
40
38
private ConnectionFactoryInterface $ factory ;
39
+ private string $ userAgent ;
40
+ private SslConfiguration $ sslConfiguration ;
41
41
42
- public function __construct (AuthenticateInterface $ auth , SemaphoreInterface $ semaphore , ConnectionFactoryInterface $ factory )
42
+ /**
43
+ * @param ConnectionFactoryInterface<T> $factory
44
+ */
45
+ public function __construct (AuthenticateInterface $ auth , string $ userAgent , SslConfiguration $ sslConfiguration , SemaphoreInterface $ semaphore , ConnectionFactoryInterface $ factory )
43
46
{
44
47
$ this ->semaphore = $ semaphore ;
45
48
$ this ->auth = $ auth ;
46
49
$ this ->factory = $ factory ;
50
+ $ this ->userAgent = $ userAgent ;
51
+ $ this ->sslConfiguration = $ sslConfiguration ;
47
52
}
48
53
49
- /**
50
- * @return Generator<
51
- * int,
52
- * float,
53
- * bool,
54
- * BoltConnection|null
55
- * >
56
- */
57
54
public function acquire (SessionConfiguration $ config ): Generator
58
55
{
59
56
$ generator = $ this ->semaphore ->wait ();
@@ -68,19 +65,19 @@ public function acquire(SessionConfiguration $config): Generator
68
65
return null ;
69
66
}
70
67
71
- $ connection = $ this ->returnAnyAvailableConnection ();
68
+ $ connection = $ this ->returnAnyAvailableConnection ($ config );
72
69
if ($ connection !== null ) {
73
70
return $ connection ;
74
71
}
75
72
}
76
73
77
- return $ this ->returnAnyAvailableConnection () ?? $ this ->factory ->createConnection ($ this ->auth , $ config );
74
+ return $ this ->returnAnyAvailableConnection ($ config ) ??
75
+ $ this ->factory ->createConnection ($ this ->userAgent , $ this ->sslConfiguration , $ config , $ this ->auth );
78
76
}
79
77
80
78
public function release (ConnectionInterface $ connection ): void
81
79
{
82
80
$ this ->semaphore ->post ();
83
- $ connection ->close ();
84
81
85
82
foreach ($ this ->activeConnections as $ i => $ activeConnection ) {
86
83
if ($ connection === $ activeConnection ) {
@@ -91,7 +88,10 @@ public function release(ConnectionInterface $connection): void
91
88
}
92
89
}
93
90
94
- private function returnAnyAvailableConnection (string $ encryptionLevel ): ?BoltConnection
91
+ /**
92
+ * @return ConnectionInterface<T>|null
93
+ */
94
+ private function returnAnyAvailableConnection (SessionConfiguration $ config ): ?ConnectionInterface
95
95
{
96
96
$ streamingConnection = null ;
97
97
$ requiresReconnectConnection = null ;
@@ -101,8 +101,8 @@ private function returnAnyAvailableConnection(string $encryptionLevel): ?BoltCon
101
101
foreach ($ this ->activeConnections as $ activeConnection ) {
102
102
// We prefer a connection that is just ready
103
103
if ($ activeConnection ->getServerState () === 'READY ' ) {
104
- if ($ this ->factory ->canReuseConnection ($ activeConnection )) {
105
- return $ this ->factory ->reuseConnection ($ activeConnection );
104
+ if ($ this ->factory ->canReuseConnection ($ activeConnection, $ this -> userAgent , $ this -> sslConfiguration , $ this -> auth )) {
105
+ return $ this ->factory ->reuseConnection ($ activeConnection, $ config );
106
106
} else {
107
107
$ requiresReconnectConnection = $ activeConnection ;
108
108
}
@@ -116,23 +116,25 @@ private function returnAnyAvailableConnection(string $encryptionLevel): ?BoltCon
116
116
// https://github.com/neo4j-php/neo4j-php-client/issues/146
117
117
// NOTE: we cannot work with TX_STREAMING as we cannot force the transaction to implicitly close.
118
118
if ($ streamingConnection === null && $ activeConnection ->getServerState () === 'STREAMING ' ) {
119
- if ($ this ->factory ->canReuseConnection ($ activeConnection )) {
119
+ if ($ this ->factory ->canReuseConnection ($ activeConnection, $ this -> userAgent , $ this -> sslConfiguration , $ this -> auth )) {
120
120
$ streamingConnection = $ activeConnection ;
121
- $ streamingConnection ->consumeResults (); // State should now be ready
121
+ if (method_exists ($ streamingConnection , 'consumeResults ' )) {
122
+ $ streamingConnection ->consumeResults (); // State should now be ready
123
+ }
122
124
} else {
123
125
$ requiresReconnectConnection = $ activeConnection ;
124
126
}
125
127
}
126
128
}
127
129
128
130
if ($ streamingConnection ) {
129
- return $ this ->factory ->reuseConnection ($ streamingConnection );
131
+ return $ this ->factory ->reuseConnection ($ streamingConnection, $ config );
130
132
}
131
133
132
134
if ($ requiresReconnectConnection ) {
133
135
$ this ->release ($ requiresReconnectConnection );
134
136
135
- return $ this ->factory ->createConnection ($ this ->auth );
137
+ return $ this ->factory ->createConnection ($ this ->userAgent , $ this -> sslConfiguration , $ config , $ this -> auth );
136
138
}
137
139
138
140
return null ;
0 commit comments