Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
beStrictAboutOutputDuringTests="true"
>
<testsuites>
<testsuite name="Unit tests">
<directory suffix="Test.php">./tests/Unit</directory>
</testsuite>
<testsuite name="Functional tests">
<directory suffix="Test.php">./tests/Functional</directory>
</testsuite>
Expand Down
83 changes: 76 additions & 7 deletions src/Decorators/SymfonySession.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,33 @@
namespace Neo4j\Neo4jBundle\Decorators;

use Laudis\Neo4j\Basic\Session;
use Laudis\Neo4j\Common\TransactionHelper;
use Laudis\Neo4j\Contracts\ConnectionPoolInterface;
use Laudis\Neo4j\Contracts\CypherSequence;
use Laudis\Neo4j\Contracts\SessionInterface;
use Laudis\Neo4j\Databags\Bookmark;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Databags\Statement;
use Laudis\Neo4j\Databags\SummarizedResult;
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Types\CypherList;
use Neo4j\Neo4jBundle\EventHandler;
use Neo4j\Neo4jBundle\Factories\SymfonyDriverFactory;

final class SymfonySession implements SessionInterface
{
private const MAX_RETRIES = 3;
private const ROLLBACK_CLASSIFICATIONS = ['ClientError', 'TransientError', 'DatabaseError'];

public function __construct(
private readonly Session $session,
private readonly EventHandler $handler,
private readonly SymfonyDriverFactory $factory,
private readonly string $alias,
private readonly string $schema,
private readonly SessionConfiguration $config,
private readonly ConnectionPoolInterface $pool,
) {
}

Expand Down Expand Up @@ -76,10 +85,7 @@ public function beginTransaction(?iterable $statements = null, ?TransactionConfi
#[\Override]
public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null)
{
return TransactionHelper::retry(
fn () => $this->beginTransaction(config: $config),
$tsxHandler
);
return $this->retryTransaction($tsxHandler, $config, read: false);
}

/**
Expand All @@ -92,8 +98,7 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration
#[\Override]
public function readTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null)
{
// TODO: create read transaction here.
return $this->writeTransaction($tsxHandler, $config);
return $this->retryTransaction($tsxHandler, $config, read: true);
}

/**
Expand All @@ -114,4 +119,68 @@ public function getLastBookmark(): Bookmark
{
return $this->session->getLastBookmark();
}

/**
* Custom retry transaction logic to replace TransactionHelper.
*
* @template HandlerResult
*
* @param callable(SymfonyTransaction):HandlerResult $tsxHandler
*
* @return HandlerResult
*/
private function retryTransaction(callable $tsxHandler, ?TransactionConfiguration $config, bool $read)
{
$attempt = 0;

while (true) {
++$attempt;
$transaction = null;

try {
$sessionConfig = $this->config->withAccessMode($read ? AccessMode::READ() : AccessMode::WRITE());
$transaction = $this->startTransaction($config, $sessionConfig);

$result = $tsxHandler($transaction);

self::triggerLazyResult($result);
$transaction->commit();

return $result;
} catch (Neo4jException $e) {
if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) {
$transaction->rollback();
}

if ('NotALeader' === $e->getTitle()) {
$this->pool->close();
} elseif ('TransientError' !== $e->getClassification()) {
throw $e;
}

if ($attempt >= self::MAX_RETRIES) {
throw $e;
}

usleep(100_000);
}
}
}

private static function triggerLazyResult(mixed $tbr): void
{
if ($tbr instanceof CypherSequence) {
$tbr->preload();
}
}

private function startTransaction(?TransactionConfiguration $config, SessionConfiguration $sessionConfig): SymfonyTransaction
{
return $this->factory->createTransaction(
session: $this->session,
config: $config,
alias: $this->alias,
schema: $this->schema
);
}
}
1 change: 0 additions & 1 deletion src/DependencyInjection/Neo4jExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Neo4j\Neo4jBundle\EventHandler;
use Neo4j\Neo4jBundle\EventListener\Neo4jProfileListener;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
Expand Down
17 changes: 17 additions & 0 deletions src/Factories/SymfonyDriverFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public function createSession(
factory: $this,
alias: $alias,
schema: $schema,
config: $config ?? new SessionConfiguration(),
pool: $this->getPoolFromDriver($driver),
);
}

Expand All @@ -70,6 +72,21 @@ public function createDriver(
);
}

private function getPoolFromDriver(Driver $driver): \Laudis\Neo4j\Contracts\ConnectionPoolInterface
{
// Use reflection to access the private pool property from the underlying driver
$reflection = new \ReflectionClass($driver);
$driverProperty = $reflection->getProperty('driver');
$driverProperty->setAccessible(true);
$underlyingDriver = $driverProperty->getValue($driver);

$underlyingReflection = new \ReflectionClass($underlyingDriver);
$poolProperty = $underlyingReflection->getProperty('pool');
$poolProperty->setAccessible(true);

return $poolProperty->getValue($underlyingDriver);
}

private function generateTransactionId(): string
{
if ($this->uuidFactory) {
Expand Down
Loading
Loading