Skip to content

Commit f92eb5d

Browse files
committed
Added auto routing to http
1 parent daa5d6f commit f92eb5d

File tree

7 files changed

+144
-32
lines changed

7 files changed

+144
-32
lines changed

src/Network/Bolt/AutoRoutedSession.php renamed to src/Network/AutoRoutedSession.php

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,20 @@
1111
* file that was distributed with this source code.
1212
*/
1313

14-
namespace Laudis\Neo4j\Network\Bolt;
14+
namespace Laudis\Neo4j\Network;
1515

1616
use Ds\Map;
1717
use Ds\Vector;
1818
use Exception;
1919
use Laudis\Neo4j\ClientBuilder;
2020
use Laudis\Neo4j\Contracts\ClientInterface;
21+
use Laudis\Neo4j\Contracts\Injections;
2122
use Laudis\Neo4j\Contracts\SessionInterface;
2223
use Laudis\Neo4j\Contracts\TransactionInterface;
2324
use Laudis\Neo4j\Databags\Statement;
2425
use Laudis\Neo4j\Enum\RoutingRoles;
26+
use Laudis\Neo4j\Network\Bolt\BoltInjections;
27+
use Laudis\Neo4j\Network\Http\HttpInjections;
2528
use function parse_url;
2629
use function preg_match;
2730
use function random_int;
@@ -32,12 +35,16 @@ final class AutoRoutedSession implements SessionInterface
3235
private SessionInterface $referenceSession;
3336
private ?ClientInterface $client = null;
3437
private ?RoutingTable $table = null;
35-
private BoltInjections $injections;
38+
/** @var BoltInjections|HttpInjections */
39+
private Injections $injections;
3640
private int $maxLeader = 0;
3741
private int $maxFollower = 0;
3842
private array $parsedUrl;
3943

40-
public function __construct(SessionInterface $referenceSession, BoltInjections $injections, array $parsedUrl)
44+
/**
45+
* @param BoltInjections|HttpInjections $injections
46+
*/
47+
public function __construct(SessionInterface $referenceSession, Injections $injections, array $parsedUrl)
4148
{
4249
$this->referenceSession = $referenceSession;
4350
$this->injections = $injections;
@@ -81,20 +88,20 @@ private function setupClient(): ClientInterface
8188
$values = $response->get('servers');
8289
/** @var int $ttl */
8390
$ttl = $response->get('ttl');
91+
if ($this->injections instanceof HttpInjections) {
92+
$values = $this->translateTableToHttp($values);
93+
}
8494
$this->table = new RoutingTable($values, time() + $ttl);
8595

8696
$builder = ClientBuilder::create();
8797
$leaders = $this->table->getWithRole(RoutingRoles::LEADER());
8898
$followers = $this->table->getWithRole(RoutingRoles::FOLLOWER());
8999
$injections = $this->injections->withAutoRouting(false);
90100

91-
foreach ($leaders as $i => $leader) {
92-
$builder = $builder->addBoltConnection('leader-'.$i, $this->rebuildUrl($leader), $injections);
93-
$this->maxLeader = $i;
94-
}
95-
foreach ($followers as $i => $follower) {
96-
$builder = $builder->addBoltConnection('follower-'.$i, $this->rebuildUrl($follower), $injections);
97-
$this->maxFollower = $i;
101+
if ($injections instanceof BoltInjections) {
102+
$builder = $this->buildBoltConnections($leaders, $builder, $injections, $followers);
103+
} else {
104+
$builder = $this->buildHttpConnections($leaders, $builder, $injections, $followers);
98105
}
99106

100107
$this->client = $builder->build();
@@ -183,9 +190,9 @@ public function commitTransaction(TransactionInterface $transaction, iterable $s
183190
return $transaction->commit($statements);
184191
}
185192

186-
private function rebuildUrl(string $url): string
193+
private function rebuildUrl(array $parsedUrl): string
187194
{
188-
$parts = array_merge($this->parsedUrl, parse_url($url));
195+
$parts = array_merge($this->parsedUrl, $parsedUrl);
189196

190197
return (isset($parts['scheme']) ? "{$parts['scheme']}:" : '').
191198
((isset($parts['user']) || isset($parts['host'])) ? '//' : '').
@@ -198,4 +205,69 @@ private function rebuildUrl(string $url): string
198205
(isset($parts['query']) ? "?{$parts['query']}" : '').
199206
(isset($parts['fragment']) ? "#{$parts['fragment']}" : '');
200207
}
208+
209+
/**
210+
* @param Vector<string> $leaders
211+
* @param Vector<string> $followers
212+
*/
213+
private function buildBoltConnections(
214+
Vector $leaders,
215+
ClientBuilder $builder,
216+
BoltInjections $injections,
217+
Vector $followers
218+
): ClientBuilder {
219+
foreach ($leaders as $i => $leader) {
220+
$builder = $builder->addBoltConnection('leader-'.$i, $this->rebuildUrl(parse_url($leader)), $injections);
221+
$this->maxLeader = $i;
222+
}
223+
foreach ($followers as $i => $follower) {
224+
$builder = $builder->addBoltConnection('follower-'.$i, $this->rebuildUrl(parse_url($follower)), $injections);
225+
$this->maxFollower = $i;
226+
}
227+
228+
return $builder;
229+
}
230+
231+
/**
232+
* @param Vector<string> $leaders
233+
* @param Vector<string> $followers
234+
*/
235+
private function buildHttpConnections(
236+
Vector $leaders,
237+
ClientBuilder $builder,
238+
HttpInjections $injections,
239+
Vector $followers
240+
): ClientBuilder {
241+
foreach ($leaders as $i => $leader) {
242+
$builder = $builder->addHttpConnection('leader-'.$i, $this->rebuildUrl(parse_url($leader)), $injections);
243+
$this->maxLeader = $i;
244+
}
245+
foreach ($followers as $i => $follower) {
246+
$builder = $builder->addHttpConnection('follower-'.$i, $this->rebuildUrl(parse_url($follower)), $injections);
247+
$this->maxFollower = $i;
248+
}
249+
250+
return $builder;
251+
}
252+
253+
/**
254+
* @param iterable<array{addresses: list<string>, role:string}> $servers
255+
*
256+
* @return iterable<array{addresses: list<string>, role:string}>
257+
*/
258+
private function translateTableToHttp(iterable $servers): iterable
259+
{
260+
/** @var list<array{addresses: list<string>, role:string}> */
261+
$tbr = [];
262+
263+
foreach ($servers as $server) {
264+
$row = ['addresses' => [], 'role' => $server['role']];
265+
foreach ($server['addresses'] as $address) {
266+
$row['addresses'][] = $this->rebuildUrl(['host' => parse_url($address, PHP_URL_HOST)]);
267+
}
268+
$tbr[] = $row;
269+
}
270+
271+
return $tbr;
272+
}
201273
}

src/Network/Bolt/BoltDriver.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
use Laudis\Neo4j\Databags\Neo4jError;
2424
use Laudis\Neo4j\Exception\Neo4jException;
2525
use Laudis\Neo4j\Formatter\BoltCypherFormatter;
26+
use Laudis\Neo4j\Network\AutoRoutedSession;
2627

2728
/**
2829
* @psalm-type ParsedUrl = array{fragment?: string, host: string, pass: string, path?: string, port?: int, query?: string, scheme?: string, user: string}

src/Network/Http/HttpDriver.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Laudis\Neo4j\Databags\RequestData;
2020
use Laudis\Neo4j\Formatter\HttpCypherFormatter;
2121
use Laudis\Neo4j\HttpDriver\RequestFactory;
22+
use Laudis\Neo4j\Network\AutoRoutedSession;
2223
use Laudis\Neo4j\Network\VersionDiscovery;
2324
use Psr\Http\Client\ClientExceptionInterface;
2425

@@ -76,6 +77,9 @@ public function aquireSession(): SessionInterface
7677
$formatter,
7778
$requestData
7879
);
80+
if ($this->injections->hasAutoRouting()) {
81+
$this->session = new AutoRoutedSession($this->session, $this->injections, $this->parsedUrl);
82+
}
7983

8084
return $this->session;
8185
}

src/Network/Http/HttpSession.php

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@
1717
use JsonException;
1818
use Laudis\Neo4j\Contracts\SessionInterface;
1919
use Laudis\Neo4j\Contracts\TransactionInterface;
20+
use Laudis\Neo4j\Databags\Neo4jError;
2021
use Laudis\Neo4j\Databags\RequestData;
2122
use Laudis\Neo4j\Exception\Neo4jException;
2223
use Laudis\Neo4j\Formatter\HttpCypherFormatter;
2324
use Laudis\Neo4j\HttpDriver\RequestFactory;
2425
use Laudis\Neo4j\HttpDriver\Transaction;
2526
use Psr\Http\Client\ClientExceptionInterface;
2627
use Psr\Http\Client\ClientInterface;
27-
use Psr\Http\Message\StreamInterface;
28+
use Psr\Http\Message\ResponseInterface;
2829

2930
/**
3031
* @psalm-import-type CypherResponseSet from \Laudis\Neo4j\Formatter\HttpCypherFormatter
@@ -52,7 +53,7 @@ public function run(iterable $statements): Vector
5253
{
5354
$request = $this->factory->post($this->data, $statements);
5455
$response = $this->client->sendRequest($request);
55-
$data = $this->interpretResponse($response->getBody());
56+
$data = $this->interpretResponse($response);
5657

5758
return $this->formatter->formatResponse($data);
5859
}
@@ -66,7 +67,7 @@ public function openTransaction(iterable $statements = null): TransactionInterfa
6667
$request = $this->factory->openTransaction($this->data);
6768
$response = $this->client->sendRequest($request);
6869
/** @var array{commit: string} $data */
69-
$data = $this->interpretResponse($response->getBody());
70+
$data = $this->interpretResponse($response);
7071

7172
return new Transaction($this, preg_replace('/\/commit/u', '', $data['commit']));
7273
}
@@ -81,7 +82,7 @@ public function commitTransaction(TransactionInterface $transaction, iterable $s
8182
$commit = $transaction->getDomainIdentifier().'/commit';
8283
$request = $this->factory->post($this->data->withEndpoint($commit), $statements);
8384
$response = $this->client->sendRequest($request);
84-
$data = $this->interpretResponse($response->getBody());
85+
$data = $this->interpretResponse($response);
8586

8687
return $this->formatter->formatResponse($data);
8788
}
@@ -95,7 +96,7 @@ public function rollbackTransaction(TransactionInterface $transaction): void
9596
{
9697
$request = $this->factory->delete($this->data->withEndpoint($transaction->getDomainIdentifier()));
9798
$response = $this->client->sendRequest($request);
98-
$this->interpretResponse($response->getBody());
99+
$this->interpretResponse($response);
99100
}
100101

101102
/**
@@ -104,10 +105,14 @@ public function rollbackTransaction(TransactionInterface $transaction): void
104105
*
105106
* @return CypherResponseSet
106107
*/
107-
private function interpretResponse(StreamInterface $stream): array
108+
private function interpretResponse(ResponseInterface $response): array
108109
{
110+
$contents = $response->getBody()->getContents();
111+
if ($response->getStatusCode() >= 400) {
112+
throw new Neo4jException(new Vector([new Neo4jError((string) $response->getStatusCode(), $contents)]));
113+
}
109114
/** @var CypherResponseSet $body */
110-
$body = json_decode($stream->getContents(), true, 512, JSON_THROW_ON_ERROR);
115+
$body = json_decode($contents, true, 512, JSON_THROW_ON_ERROR);
111116
$errors = $this->formatter->filterError($body);
112117
if (!$errors->isEmpty()) {
113118
throw new Neo4jException($errors);
@@ -124,7 +129,7 @@ public function runOverTransaction(TransactionInterface $transaction, iterable $
124129
{
125130
$request = $this->factory->post($this->data, $statements);
126131
$response = $this->client->sendRequest($request);
127-
$data = $this->interpretResponse($response->getBody());
132+
$data = $this->interpretResponse($response);
128133

129134
return $this->formatter->formatResponse($data);
130135
}

src/Network/Bolt/RoutingTable.php renamed to src/Network/RoutingTable.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* file that was distributed with this source code.
1212
*/
1313

14-
namespace Laudis\Neo4j\Network\Bolt;
14+
namespace Laudis\Neo4j\Network;
1515

1616
use Ds\Vector;
1717
use Laudis\Neo4j\Enum\RoutingRoles;

src/Network/VersionDiscovery.php

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313

1414
namespace Laudis\Neo4j\Network;
1515

16+
use Ds\Vector;
1617
use JsonException;
18+
use Laudis\Neo4j\Databags\Neo4jError;
1719
use Laudis\Neo4j\Databags\RequestData;
20+
use Laudis\Neo4j\Exception\Neo4jException;
1821
use Laudis\Neo4j\HttpDriver\RequestFactory;
1922
use Psr\Http\Client\ClientExceptionInterface;
2023
use Psr\Http\Client\ClientInterface;
@@ -26,6 +29,8 @@
2629
* bolt_direct: string,
2730
* neo4j_version: string,
2831
* neo4j_edition: string,
32+
* db/cluster?: string,
33+
* dbms/cluster?: string,
2934
* data?: string
3035
* }
3136
* @psalm-type DiscoveryResultLegacy = array{
@@ -66,7 +71,7 @@ public function discoverTransactionUrl(RequestData $data, string $database): str
6671
$version = $discovery['neo4j_version'] ?? null;
6772

6873
if ($version === null) {
69-
$discovery = $this->discovery($data->withEndpoint($discovery['data'] ?? $data->getEndpoint().'/db/data'));
74+
$discovery = $this->discovery($data->withEndpoint($discovery['data'] ?? ($data->getEndpoint().'/db/data')));
7075
}
7176
$tsx = $discovery['transaction'];
7277

@@ -77,15 +82,18 @@ public function discoverTransactionUrl(RequestData $data, string $database): str
7782
* @throws ClientExceptionInterface
7883
* @throws JsonException
7984
*
80-
* @return DiscoveryResult|DiscoveryResultLegacy $discovery
85+
* @return DiscoveryResult|DiscoveryResultLegacy
8186
*/
8287
private function discovery(RequestData $data): array
8388
{
8489
$response = $this->client->sendRequest($this->requestFactory->createRequest($data, 'GET'));
8590

86-
/** @var DiscoveryResultLegacy|DiscoveryResult $result */
87-
$result = json_decode($response->getBody()->getContents(), true, 512, JSON_THROW_ON_ERROR);
91+
$contents = $response->getBody()->getContents();
92+
if ($response->getStatusCode() >= 400) {
93+
throw new Neo4jException(new Vector([new Neo4jError((string) $response->getStatusCode(), $contents)]));
94+
}
8895

89-
return $result;
96+
/** @var DiscoveryResultLegacy|DiscoveryResult */
97+
return json_decode($contents, true, 512, JSON_THROW_ON_ERROR);
9098
}
9199
}

tests/Integration/ClusterIntegrationTest.php

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
/*
46
* This file is part of the Laudis Neo4j package.
57
*
@@ -14,6 +16,7 @@
1416
use Laudis\Neo4j\ClientBuilder;
1517
use Laudis\Neo4j\Contracts\ClientInterface;
1618
use Laudis\Neo4j\Network\Bolt\BoltInjections;
19+
use Laudis\Neo4j\Network\Http\HttpInjections;
1720
use PHPUnit\Framework\TestCase;
1821

1922
final class ClusterIntegrationTest extends TestCase
@@ -23,19 +26,38 @@ final class ClusterIntegrationTest extends TestCase
2326
protected function setUp(): void
2427
{
2528
parent::setUp();
26-
$injections = BoltInjections::create()->withAutoRouting(true);
29+
$boltInjections = BoltInjections::create()->withAutoRouting(true);
30+
$httpInjections = HttpInjections::create()->withAutoRouting(true);
2731
$this->client = ClientBuilder::create()
28-
->addBoltConnection('cluster', 'bolt://neo4j:test@core1', $injections)
32+
->addBoltConnection('cluster-bolt', 'bolt://neo4j:test@core1', $boltInjections)
33+
->addHttpConnection('cluster-http', 'http://neo4j:test@core1', $httpInjections)
2934
->build();
3035
}
3136

32-
public function testAcceptance(): void
37+
/**
38+
* @dataProvider aliasProvider
39+
*/
40+
public function testAcceptance(string $connection): void
41+
{
42+
self::assertEquals(1, $this->client->run('RETURN 1 as x', [], $connection)->first()->get('x'));
43+
}
44+
45+
/**
46+
* @dataProvider aliasProvider
47+
*/
48+
public function testWrite(string $connection): void
3349
{
34-
self::assertEquals(1, $this->client->run('RETURN 1 as x')->first()->get('x'));
50+
self::assertEquals([], $this->client->run('MERGE (x:X) RETURN x', [], $connection)->first()->get('x'));
3551
}
3652

37-
public function testWrite(): void
53+
/**
54+
* @return list<array{0: string}>
55+
*/
56+
public function aliasProvider(): array
3857
{
39-
self::assertEquals([], $this->client->run('MERGE (x:X) RETURN x')->first()->get('x'));
58+
return [
59+
['cluster-bolt'],
60+
['cluster-http'],
61+
];
4062
}
4163
}

0 commit comments

Comments
 (0)