Skip to content
52 changes: 23 additions & 29 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
class BoltConnection implements ConnectionInterface
{
private BoltMessageFactory $messageFactory;

/**
* @note We are using references to "subscribed results" to maintain backwards compatibility and try and strike
* a balance between performance and ease of use.
Expand Down Expand Up @@ -82,6 +84,7 @@ public function __construct(
private readonly ConnectionConfiguration $config,
private readonly ?Neo4jLogger $logger,
) {
$this->messageFactory = new BoltMessageFactory($this->protocol(), $this->logger);
}

public function getEncryptionLevel(): string
Expand Down Expand Up @@ -196,10 +199,8 @@ public function consumeResults(): void
*/
public function reset(): void
{
$this->logger?->log(LogLevel::DEBUG, 'RESET');
$response = $this->protocol()
->reset()
->getResponse();
$message = $this->messageFactory->createResetMessage();
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
$this->subscribedResults = [];
}
Expand All @@ -214,10 +215,8 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
$this->consumeResults();

$extra = $this->buildRunExtra($database, $timeout, $holder, AccessMode::WRITE());
$this->logger?->log(LogLevel::DEBUG, 'BEGIN', $extra);
$response = $this->protocol()
->begin($extra)
->getResponse();
$message = $this->messageFactory->createBeginMessage($extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
}

Expand All @@ -229,10 +228,9 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
public function discard(?int $qid): void
{
$extra = $this->buildResultExtra(null, $qid);
$this->logger?->log(LogLevel::DEBUG, 'DISCARD', $extra);
$response = $this->protocol()
->discard($extra)
->getResponse();

$message = $this->messageFactory->createDiscardMessage($extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
}

Expand All @@ -252,10 +250,8 @@ public function run(
?AccessMode $mode,
): array {
$extra = $this->buildRunExtra($database, $timeout, $holder, $mode);
$this->logger?->log(LogLevel::DEBUG, 'RUN', $extra);
$response = $this->protocol()
->run($text, $parameters, $extra)
->getResponse();
$message = $this->messageFactory->createRunMessage($text, $parameters, $extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);

/** @var BoltMeta */
Expand All @@ -269,12 +265,10 @@ public function run(
*/
public function commit(): void
{
$this->logger?->log(LogLevel::DEBUG, 'COMMIT');
$this->consumeResults();

$response = $this->protocol()
->commit()
->getResponse();
$message = $this->messageFactory->createCommitMessage();
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
}

Expand All @@ -285,12 +279,10 @@ public function commit(): void
*/
public function rollback(): void
{
$this->logger?->log(LogLevel::DEBUG, 'ROLLBACK');
$this->consumeResults();

$response = $this->protocol()
->rollback()
->getResponse();
$message = $this->messageFactory->createRollbackMessage();
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
}

Expand All @@ -316,8 +308,9 @@ public function pull(?int $qid, ?int $fetchSize): array
$this->logger?->log(LogLevel::DEBUG, 'PULL', $extra);

$tbr = [];
/** @var Response $response */
foreach ($this->protocol()->pull($extra)->getResponses() as $response) {
$message = $this->messageFactory->createPullMessage($extra);

foreach ($message->send()->getResponses() as $response) {
$this->assertNoFailure($response);
$tbr[] = $response->content;
}
Expand All @@ -339,8 +332,8 @@ public function close(): void
$this->consumeResults();
}

$this->logger?->log(LogLevel::DEBUG, 'GOODBYE');
$this->protocol()->goodbye();
$message = $this->messageFactory->createGoodbyeMessage();
$message->send();

unset($this->boltProtocol); // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;
}
Expand Down Expand Up @@ -406,7 +399,8 @@ private function assertNoFailure(Response $response): void
{
if ($response->signature === Signature::FAILURE) {
$this->logger?->log(LogLevel::ERROR, 'FAILURE');
$resetResponse = $this->protocol()->reset()->getResponse();
$message = $this->messageFactory->createResetMessage();
$resetResponse = $message->send()->getResponse();
$this->subscribedResults = [];
if ($resetResponse->signature === Signature::FAILURE) {
throw new Neo4jException([Neo4jError::fromBoltResponse($resetResponse), Neo4jError::fromBoltResponse($response)]);
Expand Down
82 changes: 82 additions & 0 deletions src/Bolt/BoltMessageFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Bolt;

use Bolt\protocol\V4_4;
use Bolt\protocol\V5;
use Bolt\protocol\V5_1;
use Bolt\protocol\V5_2;
use Bolt\protocol\V5_3;
use Bolt\protocol\V5_4;
use Laudis\Neo4j\Bolt\Messages\BoltBeginMessage;
use Laudis\Neo4j\Bolt\Messages\BoltCommitMessage;
use Laudis\Neo4j\Bolt\Messages\BoltDiscardMessage;
use Laudis\Neo4j\Bolt\Messages\BoltGoodbyeMessage;
use Laudis\Neo4j\Bolt\Messages\BoltPullMessage;
use Laudis\Neo4j\Bolt\Messages\BoltResetMessage;
use Laudis\Neo4j\Bolt\Messages\BoltRollbackMessage;
use Laudis\Neo4j\Bolt\Messages\BoltRunMessage;
use Laudis\Neo4j\Common\Neo4jLogger;

/**
* Factory class for creating Bolt protocol messages.
*/
final class BoltMessageFactory
{
public function __construct(
private readonly V4_4|V5|V5_1|V5_2|V5_3|V5_4 $protocol,
private readonly ?Neo4jLogger $logger = null,
) {
}

public function createResetMessage(): BoltResetMessage
{
return new BoltResetMessage($this->protocol, $this->logger);
}

public function createBeginMessage(array $extra): BoltBeginMessage
{
return new BoltBeginMessage($this->protocol, $extra, $this->logger);
}

public function createDiscardMessage(array $extra): BoltDiscardMessage
{
return new BoltDiscardMessage($this->protocol, $extra, $this->logger);
}

public function createRunMessage(string $text, array $parameters, array $extra): BoltRunMessage
{
return new BoltRunMessage($this->protocol, $text, $parameters, $extra, $this->logger);
}

public function createCommitMessage(): BoltCommitMessage
{
return new BoltCommitMessage($this->protocol, $this->logger);
}

public function createRollbackMessage(): BoltRollbackMessage
{
return new BoltRollbackMessage($this->protocol, $this->logger);
}

public function createPullMessage(array $extra): BoltPullMessage
{
return new BoltPullMessage($this->protocol, $extra, $this->logger);
}

public function createGoodbyeMessage(): BoltGoodbyeMessage
{
return new BoltGoodbyeMessage($this->protocol, $this->logger);
}
}
43 changes: 43 additions & 0 deletions src/Bolt/Messages/BoltBeginMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Bolt\Messages;

use Bolt\protocol\V4_4;
use Bolt\protocol\V5;
use Bolt\protocol\V5_1;
use Bolt\protocol\V5_2;
use Bolt\protocol\V5_3;
use Bolt\protocol\V5_4;
use Laudis\Neo4j\Common\Neo4jLogger;
use Laudis\Neo4j\Contracts\BoltMessage;
use Psr\Log\LogLevel;

final class BoltBeginMessage extends BoltMessage
{
public function __construct(
private readonly V4_4|V5|V5_1|V5_2|V5_3|V5_4 $protocol,
private readonly array $extra,
private readonly ?Neo4jLogger $logger,
) {
parent::__construct($protocol);
}

public function send(): BoltBeginMessage
{
$this->logger?->log(LogLevel::DEBUG, 'BEGIN', $this->extra);
$this->protocol->begin($this->extra);

return $this;
}
}
42 changes: 42 additions & 0 deletions src/Bolt/Messages/BoltCommitMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Bolt\Messages;

use Bolt\protocol\V4_4;
use Bolt\protocol\V5;
use Bolt\protocol\V5_1;
use Bolt\protocol\V5_2;
use Bolt\protocol\V5_3;
use Bolt\protocol\V5_4;
use Laudis\Neo4j\Common\Neo4jLogger;
use Laudis\Neo4j\Contracts\BoltMessage;
use Psr\Log\LogLevel;

final class BoltCommitMessage extends BoltMessage
{
public function __construct(
private readonly V4_4|V5|V5_1|V5_2|V5_3|V5_4 $protocol,
private readonly ?Neo4jLogger $logger,
) {
parent::__construct($protocol);
}

public function send(): BoltCommitMessage
{
$this->logger?->log(LogLevel::DEBUG, 'COMMIT');
$this->protocol->commit();

return $this;
}
}
43 changes: 43 additions & 0 deletions src/Bolt/Messages/BoltDiscardMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Bolt\Messages;

use Bolt\protocol\V4_4;
use Bolt\protocol\V5;
use Bolt\protocol\V5_1;
use Bolt\protocol\V5_2;
use Bolt\protocol\V5_3;
use Bolt\protocol\V5_4;
use Laudis\Neo4j\Common\Neo4jLogger;
use Laudis\Neo4j\Contracts\BoltMessage;
use Psr\Log\LogLevel;

final class BoltDiscardMessage extends BoltMessage
{
public function __construct(
private readonly V4_4|V5|V5_1|V5_2|V5_3|V5_4 $protocol,
private readonly array $extra,
private readonly ?Neo4jLogger $logger,
) {
parent::__construct($protocol);
}

public function send(): BoltDiscardMessage
{
$this->logger?->log(LogLevel::DEBUG, 'DISCARD', $this->extra);
$this->protocol->discard($this->extra);

return $this;
}
}
42 changes: 42 additions & 0 deletions src/Bolt/Messages/BoltGoodbyeMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Bolt\Messages;

use Bolt\protocol\V4_4;
use Bolt\protocol\V5;
use Bolt\protocol\V5_1;
use Bolt\protocol\V5_2;
use Bolt\protocol\V5_3;
use Bolt\protocol\V5_4;
use Laudis\Neo4j\Common\Neo4jLogger;
use Laudis\Neo4j\Contracts\BoltMessage;
use Psr\Log\LogLevel;

final class BoltGoodbyeMessage extends BoltMessage
{
public function __construct(
private readonly V4_4|V5|V5_1|V5_2|V5_3|V5_4 $protocol,
private readonly ?Neo4jLogger $logger,
) {
parent::__construct($protocol);
}

public function send(): BoltGoodbyeMessage
{
$this->logger?->log(LogLevel::DEBUG, 'GOODBYE');
$this->protocol->goodbye();

return $this;
}
}
Loading
Loading