Skip to content

Commit f66217b

Browse files
committed
Initial commit to support Casual Clusters
This is little flaky, but I have made successful writes in testing. The flakiness seems to be that I'm not always getting a writable leader. The TTLs are very short on my test database so I'm thinking there might be a race between fetching the routing table and sending the queries. So far this is somewhat magical but does not break the existing API. The implementation parses the statements to see if a write is needed and will get one. Otherwise it uses the default connection as configured in the ClientBuilder.
1 parent efc38fc commit f66217b

File tree

5 files changed

+225
-2
lines changed

5 files changed

+225
-2
lines changed

src/Network/Bolt/BoltDriver.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ public function aquireSession(): SessionInterface
6767
throw new Neo4jException(new Vector([new Neo4jError($e->getMessage(), '')]), $e);
6868
}
6969

70-
$this->session = new BoltSession($this->parsedUrl, $bolt, new BoltCypherFormatter(), $this->injections);
70+
if ($this->injections->isCasualCluster()) {
71+
$routingTable = new RoutingTable($bolt, new BoltCypherFormatter);
72+
$this->session = new CasualClusterSession($routingTable, $this->parsedUrl, $bolt, new BoltCypherFormatter, $this->injections);
73+
} else {
74+
$this->session = new BoltSession($this->parsedUrl, $bolt, new BoltCypherFormatter(), $this->injections);
75+
}
7176

7277
return $this->session;
7378
}

src/Network/Bolt/BoltInjections.php

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,18 @@ final class BoltInjections
4242
private $database;
4343
/** @var LazySSLContextOptions */
4444
private $sslContextOptions;
45+
/** @var bool */
46+
private bool $isCasualCluster;
4547

4648
/**
4749
* @param callable():string|?string $database
4850
* @param LazySSLContextOptions $sslContextOptions
4951
*/
50-
public function __construct($database = null, $sslContextOptions = null)
52+
public function __construct($database = null, $sslContextOptions = null, $isCasualCluster = false)
5153
{
5254
$this->database = $database ?? static function (): string { return 'neo4j'; };
5355
$this->sslContextOptions = $sslContextOptions;
56+
$this->isCasualCluster = $isCasualCluster;
5457
}
5558

5659
/**
@@ -81,6 +84,11 @@ public function withSslContextOptions($options): self
8184
return new self($this->database, $options);
8285
}
8386

87+
public function withCasualCluster(bool $isCasualCluster): self
88+
{
89+
return new self($this->database, $this->sslContextOptions, $isCasualCluster);
90+
}
91+
8492
public function database(): string
8593
{
8694
if (is_callable($this->database)) {
@@ -102,4 +110,9 @@ public function sslContextOptions(): ?array
102110

103111
return $this->sslContextOptions;
104112
}
113+
114+
public function isCasualCluster(): bool
115+
{
116+
return $this->isCasualCluster;
117+
}
105118
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* This file is part of the Laudis Neo4j package.
7+
*
8+
* (c) Laudis technologies <http://laudis.tech>
9+
*
10+
* For the full copyright and license information, please view the LICENSE
11+
* file that was distributed with this source code.
12+
*/
13+
14+
namespace Laudis\Neo4j\Network\Bolt;
15+
16+
use Bolt\Bolt;
17+
use Ds\Vector;
18+
use Laudis\Neo4j\Contracts\SessionInterface;
19+
use Laudis\Neo4j\Contracts\TransactionInterface;
20+
use Laudis\Neo4j\Formatter\BoltCypherFormatter;
21+
22+
class CasualClusterSession implements SessionInterface
23+
{
24+
private RoutingTable $routingTable;
25+
private array $parsedUrl;
26+
private Bolt $bolt;
27+
private BoltCypherFormatter $formatter;
28+
private BoltInjections $injections;
29+
30+
public function __construct(RoutingTable $routingTable, array $parsedUrl, Bolt $bolt, BoltCypherFormatter $formatter, BoltInjections $injections)
31+
{
32+
$this->routingTable = $routingTable;
33+
$this->parsedUrl = $parsedUrl;
34+
$this->bolt = $bolt;
35+
$this->formatter = $formatter;
36+
$this->injections = $injections;
37+
}
38+
39+
public function run(iterable $statements): Vector
40+
{
41+
return $this->preparedSession($statements)->run($statements);
42+
}
43+
44+
public function runOverTransaction(TransactionInterface $transaction, iterable $statements): Vector
45+
{
46+
return $this->preparedSession($statements)->runOverTransaction($transaction, $statements);
47+
}
48+
49+
public function rollbackTransaction(TransactionInterface $transaction): void
50+
{
51+
$this->preparedSession()->rollbackTransaction($transaction);
52+
}
53+
54+
public function commitTransaction(TransactionInterface $transaction, iterable $statements): Vector
55+
{
56+
return $this->preparedSession($statements)->commitTransaction($transaction, $statements);
57+
}
58+
59+
public function openTransaction(?iterable $statements = null): TransactionInterface
60+
{
61+
return $this->preparedSession($statements)->openTransaction($statements);
62+
}
63+
64+
private function needsWriter(array $statements): bool
65+
{
66+
return count(preg_grep(
67+
'/(CREATE|SET|MERGE|DELETE)/m',
68+
array_map(function ($statement) {
69+
return $statement->getText();
70+
}, $statements)
71+
)) > 0;
72+
}
73+
74+
private function preparedSession(?iterable $statements = [])
75+
{
76+
if (!is_null($statements) && $this->needsWriter($this->coerce_statements($statements))) {
77+
$leaders = $this->routingTable->getLeaders();
78+
79+
$parsedUrl = $this->parsedUrl;
80+
$parsedUrl['host'] = $leaders->getHost();
81+
return new BoltSession($parsedUrl, $this->bolt, $this->formatter, $this->injections);
82+
}
83+
84+
return new BoltSession($this->parsedUrl, $this->bolt, $this->formatter, $this->injections);
85+
}
86+
87+
private function coerce_statements(iterable $statements): array
88+
{
89+
if (is_array($statements)) return $statements;
90+
$s = [];
91+
array_push($s, ...$statements);
92+
return $s;
93+
}
94+
}

src/Network/Bolt/RoutingTable.php

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* This file is part of the Laudis Neo4j package.
7+
*
8+
* (c) Laudis technologies <http://laudis.tech>
9+
*
10+
* For the full copyright and license information, please view the LICENSE
11+
* file that was distributed with this source code.
12+
*/
13+
14+
namespace Laudis\Neo4j\Network\Bolt;
15+
16+
use Bolt\Bolt;
17+
use Ds\Map;
18+
use Laudis\Neo4j\Formatter\BoltCypherFormatter;
19+
20+
class RoutingTable
21+
{
22+
private $roles = [
23+
'leader' => 'WRITE',
24+
'follower' => 'READ',
25+
'router' => 'ROUTE',
26+
];
27+
28+
private Map $routingTable;
29+
private ?int $ttl = null;
30+
private Bolt $bolt;
31+
32+
public function __construct(Bolt $bolt, BoltCypherFormatter $formatter)
33+
{
34+
$this->bolt = $bolt;
35+
$this->formatter = $formatter;
36+
$this->loadRoutingTable();
37+
}
38+
39+
public function getLeaders(): ?Server
40+
{
41+
foreach ($this->getServers() as $server) {
42+
if ($server['role'] === $this->roles['leader']) {
43+
return new Server($server['addresses'], $server['role']);
44+
}
45+
}
46+
47+
return null;
48+
}
49+
50+
public function getFollowers(): ?Server
51+
{
52+
foreach ($this->getServers() as $server) {
53+
if ($server['role'] === $this->roles['follower']) {
54+
return new Server($server['addresses'], $server['roler']);
55+
}
56+
}
57+
}
58+
59+
private function getServers(): array
60+
{
61+
return $this->loadRoutingTable()->get('servers');
62+
}
63+
64+
private function loadRoutingTable(): Map
65+
{
66+
if (is_null($this->ttl) || $this->ttl > time()) {
67+
$meta = $this->bolt->run('CALL dbms.routing.getRoutingTable({})');
68+
$results = $this->bolt->pull();
69+
$response = $this->formatter->formatResult($meta, $results)->first();
70+
$this->ttl = time() + $response->get('ttl');
71+
$this->routingTable = $response;
72+
}
73+
74+
return $this->routingTable;
75+
}
76+
}

src/Network/Bolt/Server.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* This file is part of the Laudis Neo4j package.
7+
*
8+
* (c) Laudis technologies <http://laudis.tech>
9+
*
10+
* For the full copyright and license information, please view the LICENSE
11+
* file that was distributed with this source code.
12+
*/
13+
14+
namespace Laudis\Neo4j\Network\Bolt;
15+
16+
class Server
17+
{
18+
private array $addresses;
19+
private string $role;
20+
21+
public function __construct(array $addresses, string $role)
22+
{
23+
$this->addresses = $addresses;
24+
$this->role = $role;
25+
}
26+
27+
public function getHost(): string
28+
{
29+
if (count($this->addresses) > 1) {
30+
return array_rand($this->addresses);
31+
}
32+
33+
return $this->addresses[0];
34+
}
35+
}

0 commit comments

Comments
 (0)