Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.8'

x-definitions:
x-shared-env:
&common-env
Expand Down
27 changes: 6 additions & 21 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Laudis\Neo4j\Contracts\FormatterInterface;
use Laudis\Neo4j\Databags\BookmarkHolder;
use Laudis\Neo4j\Databags\DatabaseInfo;
use Laudis\Neo4j\Databags\Neo4jError;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Enum\ConnectionProtocol;
use Laudis\Neo4j\Exception\Neo4jException;
Expand Down Expand Up @@ -151,6 +150,12 @@ public function setTimeout(float $timeout): void

public function consumeResults(): void
{
if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol()->serverState !== ServerState::TX_STREAMING) {
$this->subscribedResults = [];

return;
}

foreach ($this->subscribedResults as $result) {
$result = $result->get();
if ($result) {
Expand Down Expand Up @@ -185,10 +190,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
{
$this->consumeResults();

if ($this->protocol()->serverState !== ServerState::READY) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'BEGIN\' cannot be handled by a session which isn\'t in the READY state.')]);
}

$extra = $this->buildRunExtra($database, $timeout, $holder, AccessMode::WRITE());
$response = $this->protocol()
->begin($extra)
Expand All @@ -203,10 +204,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
*/
public function discard(?int $qid): void
{
if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'DISCARD\' cannot be handled by a session which isn\'t in the STREAMING|TX_STREAMING state.')]);
}

$extra = $this->buildResultExtra(null, $qid);
$response = $this->protocol()
->discard($extra)
Expand All @@ -223,10 +220,6 @@ public function discard(?int $qid): void
*/
public function run(string $text, array $parameters, ?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode): array
{
if (!in_array($this->protocol()->serverState, [ServerState::READY, ServerState::TX_READY, ServerState::TX_STREAMING], true)) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'RUN\' cannot be handled by a session which isn\'t in the READY|TX_READY|TX_STREAMING state.')]);
}

$extra = $this->buildRunExtra($database, $timeout, $holder, $mode);
$response = $this->protocol()
->run($text, $parameters, $extra)
Expand Down Expand Up @@ -260,10 +253,6 @@ public function rollback(): void
{
$this->consumeResults();

if ($this->protocol()->serverState !== ServerState::TX_READY) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'ROLLBACK\' cannot be handled by a session which isn\'t in the TX_READY state.')]);
}

$response = $this->protocol()
->rollback()
->getResponse();
Expand All @@ -284,10 +273,6 @@ public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4
*/
public function pull(?int $qid, ?int $fetchSize): array
{
if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'PULL\' cannot be handled by a session which isn\'t in the STREAMING|TX_STREAMING state.')]);
}

$extra = $this->buildResultExtra($fetchSize, $qid);

$tbr = [];
Expand Down
73 changes: 46 additions & 27 deletions src/Bolt/BoltUnmanagedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@

namespace Laudis\Neo4j\Bolt;

use Bolt\enum\ServerState;
use Laudis\Neo4j\Common\TransactionHelper;
use Laudis\Neo4j\Contracts\FormatterInterface;
use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface;
use Laudis\Neo4j\Databags\BookmarkHolder;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Databags\Statement;
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Enum\TransactionState;
use Laudis\Neo4j\Exception\ClientException;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\ParameterHelper;
use Laudis\Neo4j\Types\AbstractCypherSequence;
Expand All @@ -40,9 +43,7 @@
*/
final class BoltUnmanagedTransaction implements UnmanagedTransactionInterface
{
private bool $isRolledBack = false;

private bool $isCommitted = false;
private TransactionState $state = TransactionState::ACTIVE;

/**
* @param FormatterInterface<T> $formatter
Expand All @@ -61,8 +62,25 @@ public function __construct(
private readonly BookmarkHolder $bookmarkHolder
) {}

/**
* @throws ClientException|Throwable
*/
public function commit(iterable $statements = []): CypherList
{
if ($this->isFinished()) {
if ($this->state === TransactionState::TERMINATED) {
throw new ClientException("Can't commit, transaction has been terminated");
}

if ($this->state === TransactionState::COMMITTED) {
throw new ClientException("Can't commit, transaction has already been committed");
}

if ($this->state === TransactionState::ROLLED_BACK) {
throw new ClientException("Can't commit, transaction has already been rolled back");
}
}

// Force the results to pull all the results.
// After a commit, the connection will be in the ready state, making it impossible to use PULL
$tbr = $this->runStatements($statements)->each(static function ($list) {
Expand All @@ -72,15 +90,29 @@ public function commit(iterable $statements = []): CypherList
});

$this->connection->commit();
$this->isCommitted = true;
$this->state = TransactionState::COMMITTED;

return $tbr;
}

public function rollback(): void
{
if ($this->isFinished()) {
if ($this->state === TransactionState::TERMINATED) {
throw new ClientException("Can't rollback, transaction has been terminated");
}

if ($this->state === TransactionState::COMMITTED) {
throw new ClientException("Can't rollback, transaction has already been committed");
}

if ($this->state === TransactionState::ROLLED_BACK) {
throw new ClientException("Can't rollback, transaction has already been rolled back");
}
}

$this->connection->rollback();
$this->isRolledBack = true;
$this->state = TransactionState::ROLLED_BACK;
}

/**
Expand All @@ -99,6 +131,11 @@ public function runStatement(Statement $statement)
$parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol());
$start = microtime(true);

$serverState = $this->connection->protocol()->serverState;
if (in_array($serverState, [ServerState::STREAMING, ServerState::TX_STREAMING])) {
$this->connection->consumeResults();
}

try {
$meta = $this->connection->run(
$statement->getText(),
Expand All @@ -109,7 +146,7 @@ public function runStatement(Statement $statement)
$this->config->getAccessMode()
);
} catch (Throwable $e) {
$this->isRolledBack = true;
$this->state = TransactionState::TERMINATED;
throw $e;
}
$run = microtime(true);
Expand Down Expand Up @@ -139,36 +176,18 @@ public function runStatements(iterable $statements): CypherList
return new CypherList($tbr);
}

/**
* @throws Neo4jException
*
* @return never
*/
private function handleMessageException(Neo4jException $e): void
{
$exception = $e->getErrors()[0];
if (!($exception->getClassification() === 'ClientError' && $exception->getCategory() === 'Request')) {
$this->connection->reset();
}
if (!$this->isFinished() && in_array($exception->getClassification(), TransactionHelper::ROLLBACK_CLASSIFICATIONS)) {
$this->isRolledBack = true;
}

throw $e;
}

public function isRolledBack(): bool
{
return $this->isRolledBack;
return $this->state === TransactionState::ROLLED_BACK || $this->state === TransactionState::TERMINATED;
}

public function isCommitted(): bool
{
return $this->isCommitted;
return $this->state == TransactionState::COMMITTED;
}

public function isFinished(): bool
{
return $this->isRolledBack() || $this->isCommitted();
return $this->state != TransactionState::ACTIVE;
}
}
40 changes: 0 additions & 40 deletions src/Enum/TransactionEffect.php

This file was deleted.

40 changes: 40 additions & 0 deletions src/Enum/TransactionState.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Enum;

/**
* The state of a transaction.
*/
enum TransactionState
{
/**
* The transaction is running with no explicit success or failure marked.
*/
case ACTIVE;

/**
* This transaction has been terminated because of a fatal connection error.
*/
case TERMINATED;

/**
* This transaction has successfully committed.
*/
case COMMITTED;

/**
* This transaction has been rolled back.
*/
case ROLLED_BACK;
}
32 changes: 32 additions & 0 deletions src/Exception/ClientException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Exception;

use RuntimeException;
use Throwable;

/**
* Exception when a Client Error occurs.
*
* @psalm-immutable
*
* @psalm-suppress MutableDependency
*/
final class ClientException extends RuntimeException
{
public function __construct(string $message, ?Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}
}
Loading
Loading