diff --git a/docker-compose.yml b/docker-compose.yml index 6dbf3a8e..df0d5660 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.8' - x-definitions: x-shared-env: &common-env diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 8bf528a0..dba4e9a2 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -28,7 +28,6 @@ use Laudis\Neo4j\Contracts\FormatterInterface; use Laudis\Neo4j\Databags\BookmarkHolder; use Laudis\Neo4j\Databags\DatabaseInfo; -use Laudis\Neo4j\Databags\Neo4jError; use Laudis\Neo4j\Enum\AccessMode; use Laudis\Neo4j\Enum\ConnectionProtocol; use Laudis\Neo4j\Exception\Neo4jException; @@ -151,6 +150,12 @@ public function setTimeout(float $timeout): void public function consumeResults(): void { + if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol()->serverState !== ServerState::TX_STREAMING) { + $this->subscribedResults = []; + + return; + } + foreach ($this->subscribedResults as $result) { $result = $result->get(); if ($result) { @@ -185,10 +190,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder { $this->consumeResults(); - if ($this->protocol()->serverState !== ServerState::READY) { - throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'BEGIN\' cannot be handled by a session which isn\'t in the READY state.')]); - } - $extra = $this->buildRunExtra($database, $timeout, $holder, AccessMode::WRITE()); $response = $this->protocol() ->begin($extra) @@ -203,10 +204,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder */ public function discard(?int $qid): void { - if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) { - throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'DISCARD\' cannot be handled by a session which isn\'t in the STREAMING|TX_STREAMING state.')]); - } - $extra = $this->buildResultExtra(null, $qid); $response = $this->protocol() ->discard($extra) @@ -223,10 +220,6 @@ public function discard(?int $qid): void */ public function run(string $text, array $parameters, ?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode): array { - if (!in_array($this->protocol()->serverState, [ServerState::READY, ServerState::TX_READY, ServerState::TX_STREAMING], true)) { - throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'RUN\' cannot be handled by a session which isn\'t in the READY|TX_READY|TX_STREAMING state.')]); - } - $extra = $this->buildRunExtra($database, $timeout, $holder, $mode); $response = $this->protocol() ->run($text, $parameters, $extra) @@ -260,10 +253,6 @@ public function rollback(): void { $this->consumeResults(); - if ($this->protocol()->serverState !== ServerState::TX_READY) { - throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'ROLLBACK\' cannot be handled by a session which isn\'t in the TX_READY state.')]); - } - $response = $this->protocol() ->rollback() ->getResponse(); @@ -284,10 +273,6 @@ public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4 */ public function pull(?int $qid, ?int $fetchSize): array { - if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) { - throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'PULL\' cannot be handled by a session which isn\'t in the STREAMING|TX_STREAMING state.')]); - } - $extra = $this->buildResultExtra($fetchSize, $qid); $tbr = []; diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index cf649e68..9a547551 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -13,14 +13,15 @@ namespace Laudis\Neo4j\Bolt; -use Laudis\Neo4j\Common\TransactionHelper; +use Bolt\enum\ServerState; use Laudis\Neo4j\Contracts\FormatterInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\BookmarkHolder; use Laudis\Neo4j\Databags\SessionConfiguration; use Laudis\Neo4j\Databags\Statement; use Laudis\Neo4j\Databags\TransactionConfiguration; -use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\Enum\TransactionState; +use Laudis\Neo4j\Exception\ClientException; use Laudis\Neo4j\ParameterHelper; use Laudis\Neo4j\Types\AbstractCypherSequence; use Laudis\Neo4j\Types\CypherList; @@ -40,9 +41,7 @@ */ final class BoltUnmanagedTransaction implements UnmanagedTransactionInterface { - private bool $isRolledBack = false; - - private bool $isCommitted = false; + private TransactionState $state = TransactionState::ACTIVE; /** * @param FormatterInterface $formatter @@ -61,8 +60,25 @@ public function __construct( private readonly BookmarkHolder $bookmarkHolder ) {} + /** + * @throws ClientException|Throwable + */ public function commit(iterable $statements = []): CypherList { + if ($this->isFinished()) { + if ($this->state === TransactionState::TERMINATED) { + throw new ClientException("Can't commit, transaction has been terminated"); + } + + if ($this->state === TransactionState::COMMITTED) { + throw new ClientException("Can't commit, transaction has already been committed"); + } + + if ($this->state === TransactionState::ROLLED_BACK) { + throw new ClientException("Can't commit, transaction has already been rolled back"); + } + } + // Force the results to pull all the results. // After a commit, the connection will be in the ready state, making it impossible to use PULL $tbr = $this->runStatements($statements)->each(static function ($list) { @@ -72,15 +88,29 @@ public function commit(iterable $statements = []): CypherList }); $this->connection->commit(); - $this->isCommitted = true; + $this->state = TransactionState::COMMITTED; return $tbr; } public function rollback(): void { + if ($this->isFinished()) { + if ($this->state === TransactionState::TERMINATED) { + throw new ClientException("Can't rollback, transaction has been terminated"); + } + + if ($this->state === TransactionState::COMMITTED) { + throw new ClientException("Can't rollback, transaction has already been committed"); + } + + if ($this->state === TransactionState::ROLLED_BACK) { + throw new ClientException("Can't rollback, transaction has already been rolled back"); + } + } + $this->connection->rollback(); - $this->isRolledBack = true; + $this->state = TransactionState::ROLLED_BACK; } /** @@ -99,6 +129,11 @@ public function runStatement(Statement $statement) $parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol()); $start = microtime(true); + $serverState = $this->connection->protocol()->serverState; + if (in_array($serverState, [ServerState::STREAMING, ServerState::TX_STREAMING])) { + $this->connection->consumeResults(); + } + try { $meta = $this->connection->run( $statement->getText(), @@ -109,7 +144,7 @@ public function runStatement(Statement $statement) $this->config->getAccessMode() ); } catch (Throwable $e) { - $this->isRolledBack = true; + $this->state = TransactionState::TERMINATED; throw $e; } $run = microtime(true); @@ -139,36 +174,18 @@ public function runStatements(iterable $statements): CypherList return new CypherList($tbr); } - /** - * @throws Neo4jException - * - * @return never - */ - private function handleMessageException(Neo4jException $e): void - { - $exception = $e->getErrors()[0]; - if (!($exception->getClassification() === 'ClientError' && $exception->getCategory() === 'Request')) { - $this->connection->reset(); - } - if (!$this->isFinished() && in_array($exception->getClassification(), TransactionHelper::ROLLBACK_CLASSIFICATIONS)) { - $this->isRolledBack = true; - } - - throw $e; - } - public function isRolledBack(): bool { - return $this->isRolledBack; + return $this->state === TransactionState::ROLLED_BACK || $this->state === TransactionState::TERMINATED; } public function isCommitted(): bool { - return $this->isCommitted; + return $this->state == TransactionState::COMMITTED; } public function isFinished(): bool { - return $this->isRolledBack() || $this->isCommitted(); + return $this->state != TransactionState::ACTIVE; } } diff --git a/src/Enum/TransactionEffect.php b/src/Enum/TransactionEffect.php deleted file mode 100644 index 9d898538..00000000 --- a/src/Enum/TransactionEffect.php +++ /dev/null @@ -1,40 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Laudis\Neo4j\Enum; - -use JsonSerializable; -use Laudis\TypedEnum\TypedEnum; - -/** - * Defines the access mode of a connection. - * - * @method static self ROLLBACK() - * @method static self NONE() - * - * @extends TypedEnum - * - * @psalm-immutable - * - * @psalm-suppress MutableDependency - */ -final class TransactionEffect extends TypedEnum implements JsonSerializable -{ - private const ROLLBACK = 'rollback'; - private const WRITE = 'none'; - - public function jsonSerialize(): string - { - return $this->getValue(); - } -} diff --git a/src/Enum/TransactionState.php b/src/Enum/TransactionState.php new file mode 100644 index 00000000..6df70917 --- /dev/null +++ b/src/Enum/TransactionState.php @@ -0,0 +1,40 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Enum; + +/** + * The state of a transaction. + */ +enum TransactionState +{ + /** + * The transaction is running with no explicit success or failure marked. + */ + case ACTIVE; + + /** + * This transaction has been terminated because of a fatal connection error. + */ + case TERMINATED; + + /** + * This transaction has successfully committed. + */ + case COMMITTED; + + /** + * This transaction has been rolled back. + */ + case ROLLED_BACK; +} diff --git a/src/Exception/ClientException.php b/src/Exception/ClientException.php new file mode 100644 index 00000000..81639c5a --- /dev/null +++ b/src/Exception/ClientException.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Exception; + +use RuntimeException; +use Throwable; + +/** + * Exception when a Client Error occurs. + * + * @psalm-immutable + * + * @psalm-suppress MutableDependency + */ +final class ClientException extends RuntimeException +{ + public function __construct(string $message, ?Throwable $previous = null) + { + parent::__construct($message, 0, $previous); + } +} diff --git a/tests/Integration/TransactionIntegrationTest.php b/tests/Integration/TransactionIntegrationTest.php index 8fdbbdf3..4d5f0a59 100644 --- a/tests/Integration/TransactionIntegrationTest.php +++ b/tests/Integration/TransactionIntegrationTest.php @@ -14,6 +14,7 @@ namespace Laudis\Neo4j\Tests\Integration; use Laudis\Neo4j\Databags\Statement; +use Laudis\Neo4j\Exception\ClientException; use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\Tests\EnvironmentAwareIntegrationTest; use PHPUnit\Framework\Attributes\DoesNotPerformAssertions; @@ -213,61 +214,60 @@ public function testCommitValidFilledWithInvalidStatement(): void } // TODO commit on READY state cause stuck neo4j connection on older version and disconnect at newer -// public function testCommitInvalid(): void -// { -// $tsx = $this->getSession()->beginTransaction(); -// $tsx->commit(); -// -// self::assertTrue($tsx->isFinished()); -// self::assertFalse($tsx->isRolledBack()); -// self::assertTrue($tsx->isCommitted()); -// -// $exception = false; -// try { -// $tsx->commit(); -// } catch (Throwable) { -// $exception = true; -// } -// self::assertTrue($exception); -// -// self::assertTrue($tsx->isFinished()); -// self::assertTrue($tsx->isRolledBack()); -// self::assertFalse($tsx->isCommitted()); -// } + public function testCommitInvalid(): void + { + $tsx = $this->getSession()->beginTransaction(); + $tsx->commit(); + + self::assertTrue($tsx->isFinished()); + self::assertFalse($tsx->isRolledBack()); + self::assertTrue($tsx->isCommitted()); + + $exception = null; + try { + $tsx->commit(); + } catch (ClientException $e) { + $exception = $e; + } + self::assertTrue($exception instanceof ClientException); + + self::assertTrue($tsx->isFinished()); + self::assertFalse($tsx->isRolledBack()); + self::assertTrue($tsx->isCommitted()); + } public function testRollbackValid(): void { - $this->markTestSkipped('Skipped due to ConnectionTimeoutException'); -// $tsx = $this->getSession()->beginTransaction(); -// $tsx->rollback(); -// -// self::assertTrue($tsx->isFinished()); -// self::assertTrue($tsx->isRolledBack()); -// self::assertFalse($tsx->isCommitted()); + $tsx = $this->getSession()->beginTransaction(); + $tsx->rollback(); + + self::assertTrue($tsx->isFinished()); + self::assertTrue($tsx->isRolledBack()); + self::assertFalse($tsx->isCommitted()); } // TODO rollback on READY state cause stuck neo4j connection on older version and disconnect at newer -// public function testRollbackInvalid(): void -// { -// $tsx = $this->getSession()->beginTransaction(); -// $tsx->rollback(); -// -// self::assertTrue($tsx->isFinished()); -// self::assertTrue($tsx->isRolledBack()); -// self::assertFalse($tsx->isCommitted()); -// -// $exception = false; -// try { -// $tsx->rollback(); -// } catch (Throwable) { -// $exception = true; -// } -// self::assertTrue($exception); -// -// self::assertTrue($tsx->isFinished()); -// self::assertTrue($tsx->isRolledBack()); -// self::assertFalse($tsx->isCommitted()); -// } + public function testRollbackInvalid(): void + { + $tsx = $this->getSession()->beginTransaction(); + $tsx->rollback(); + + self::assertTrue($tsx->isFinished()); + self::assertTrue($tsx->isRolledBack()); + self::assertFalse($tsx->isCommitted()); + + $exception = null; + try { + $tsx->rollback(); + } catch (ClientException $e) { + $exception = $e; + } + self::assertTrue($exception instanceof ClientException); + + self::assertTrue($tsx->isFinished()); + self::assertTrue($tsx->isRolledBack()); + self::assertFalse($tsx->isCommitted()); + } // /** // * TODO - rework this test