Skip to content

Commit 1167a1e

Browse files
committed
made bolt session correctly choose read and write servers when requested
1 parent cf7cd9e commit 1167a1e

File tree

1 file changed

+42
-24
lines changed

1 file changed

+42
-24
lines changed

src/Bolt/Session.php

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Laudis\Neo4j\Databags\SessionConfiguration;
2929
use Laudis\Neo4j\Databags\Statement;
3030
use Laudis\Neo4j\Databags\TransactionConfiguration;
31+
use Laudis\Neo4j\Enum\AccessMode;
3132
use Laudis\Neo4j\Exception\Neo4jException;
3233
use Laudis\Neo4j\Types\CypherList;
3334
use Psr\Http\Message\UriInterface;
@@ -70,7 +71,7 @@ public function __construct(
7071

7172
public function runStatements(iterable $statements, ?TransactionConfiguration $config = null): CypherList
7273
{
73-
return $this->beginInstantTransaction()->runStatements($statements);
74+
return $this->beginInstantTransaction($this->config)->runStatements($statements);
7475
}
7576

7677
public function openTransaction(iterable $statements = null, ?TransactionConfiguration $config = null): UnmanagedTransactionInterface
@@ -90,16 +91,24 @@ public function run(string $statement, iterable $parameters = [], ?TransactionCo
9091

9192
public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null)
9293
{
94+
$config ??= TransactionConfiguration::default();
95+
9396
return TransactionHelper::retry(
94-
fn () => $this->beginTransaction([], $config),
97+
fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE())),
9598
$tsxHandler,
96-
$config ?? TransactionConfiguration::default()
99+
$config
97100
);
98101
}
99102

100103
public function readTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null)
101104
{
102-
return $this->writeTransaction($tsxHandler);
105+
$config ??= TransactionConfiguration::default();
106+
107+
return TransactionHelper::retry(
108+
fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ())),
109+
$tsxHandler,
110+
$config
111+
);
103112
}
104113

105114
public function transaction(callable $tsxHandler, ?TransactionConfiguration $config = null)
@@ -110,22 +119,7 @@ public function transaction(callable $tsxHandler, ?TransactionConfiguration $con
110119
public function beginTransaction(?iterable $statements = null, ?TransactionConfiguration $config = null): UnmanagedTransactionInterface
111120
{
112121
$config ??= TransactionConfiguration::default();
113-
try {
114-
$bolt = $this->acquireBolt($config);
115-
116-
$begin = $bolt->begin(['db' => $this->config->getDatabase()]);
117-
118-
if (!$begin) {
119-
throw new Neo4jException(new Vector([new Neo4jError('', 'Cannot open new transaction')]));
120-
}
121-
} catch (Exception $e) {
122-
if ($e instanceof Neo4jException) {
123-
throw $e;
124-
}
125-
throw new Neo4jException(new Vector([new Neo4jError('', $e->getMessage())]), $e);
126-
}
127-
128-
$tsx = new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $bolt);
122+
$tsx = $this->startTransaction($config, $this->config);
129123

130124
$tsx->runStatements($statements ?? []);
131125

@@ -135,16 +129,40 @@ public function beginTransaction(?iterable $statements = null, ?TransactionConfi
135129
/**
136130
* @return UnmanagedTransactionInterface<T>
137131
*/
138-
private function beginInstantTransaction(): TransactionInterface
132+
private function beginInstantTransaction(SessionConfiguration $config): TransactionInterface
139133
{
140-
return new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $this->acquireBolt(TransactionConfiguration::default()));
134+
return new BoltUnmanagedTransaction(
135+
$this->config->getDatabase(),
136+
$this->formatter,
137+
$this->acquireBolt(TransactionConfiguration::default(), $config)
138+
);
141139
}
142140

143-
private function acquireBolt(TransactionConfiguration $config): Bolt
141+
private function acquireBolt(TransactionConfiguration $config, SessionConfiguration $sessionConfig): Bolt
144142
{
145-
$bolt = new Bolt($this->pool->acquire($this->uri, $this->config->getAccessMode(), $this->auth, $config));
143+
$bolt = new Bolt($this->pool->acquire($this->uri, $sessionConfig->getAccessMode(), $this->auth, $config));
146144
$this->auth->authenticateBolt($bolt, $this->uri, $this->userAgent);
147145

148146
return $bolt;
149147
}
148+
149+
private function startTransaction(TransactionConfiguration $config, SessionConfiguration $sessionConfig): UnmanagedTransactionInterface
150+
{
151+
try {
152+
$bolt = $this->acquireBolt($config, $sessionConfig);
153+
154+
$begin = $bolt->begin(['db' => $this->config->getDatabase()]);
155+
156+
if (!$begin) {
157+
throw new Neo4jException(new Vector([new Neo4jError('', 'Cannot open new transaction')]));
158+
}
159+
} catch (Exception $e) {
160+
if ($e instanceof Neo4jException) {
161+
throw $e;
162+
}
163+
throw new Neo4jException(new Vector([new Neo4jError('', $e->getMessage())]), $e);
164+
}
165+
166+
return new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $bolt);
167+
}
150168
}

0 commit comments

Comments
 (0)